class Runner:
    """
    Base class that prepares, launches and parses runs of an external program.

    The class keeps four pieces of configuration: the program ``path``, the
    input directory ``in_dir``, the output directory ``out_dir`` and a command
    prefix ``pre`` that is put in front of every executed script.

    The base :meth:`_parse_file` returns ``None``; it is presumably meant to
    be overridden by concrete runners.
    """

    def __init__(self,
                 path: str,
                 in_dir: str = None,
                 out_dir: str = None,
                 pre=None):
        """
        Store the runner configuration.

        Args:
            path (str): Path of the runner.
            in_dir (str): Input directory. Falls back to the module-level
                ``get_input_dir()`` when None.
            out_dir (str): Output directory. Falls back to the module-level
                ``get_output_dir()`` when None.
            pre: Command prefix. Falls back to the module-level ``get_pre()``
                when None.
        """
        self.path = path
        # The bare names below resolve to the module-level helpers, not to
        # the methods of the same name defined on this class.
        self.in_dir = get_input_dir() if in_dir is None else in_dir
        self.out_dir = get_output_dir() if out_dir is None else out_dir
        self.pre = get_pre() if pre is None else pre

    def orders(self) -> List[Order]:
        """List of supported Orders in this runner."""
        return [e.value for e in Order]

    def get_name(self) -> str:
        """Returns the name of the runner."""
        return type(self).__name__

    def _check_path(self) -> bool:
        """Checks if the passed path is valid. The base version always accepts."""
        return True

    def _prepare(self, p: Input, **kwargs) -> RunParam:
        """
        Build the run parameters (file names and skip flag) for one input.

        The file names are derived from a hash over all attributes of `p`
        plus the runner name.

        Note:
            This writes a ``runner`` entry into ``p.__dict__``, i.e. the
            passed input object gains/overwrites a ``runner`` attribute.

        Args:
            p (:class:`hepi.Input`): Input parameters.
            **kwargs: ``skip`` (bool) enables skipping of runs whose output
                file already exists and is valid. Defaults to True.

        Returns:
            :class:`RunParam`: Paths of the script, input and output file,
            the skip flag and the hashed name.
        """
        # Default mirrors the `skip=True` default of `_prepare_all` and `run`.
        skip_requested = kwargs.get("skip", True)
        d = p.__dict__
        d["runner"] = type(self).__name__
        name = namehash("_".join(
            str(key) + "_" + str(value)
            for key, value in d.items()).replace("/", "-"))
        # Paths are built by plain concatenation, so the output directory
        # is expected to end with a path separator.
        base = self.get_output_dir() + name
        out_file = base + ".out"
        skip = False
        if (skip_requested and os.path.isfile(out_file)
                and self._is_valid(out_file, p, d)):
            print("skip", end='')
            skip = True
        return RunParam(execute=base + ".sh",
                        in_file=base + ".in",
                        out_file=out_file,
                        skip=skip,
                        name=name)

    def _prepare_all(self,
                     params: List[Input],
                     skip=True,
                     **kwargs) -> List[RunParam]:
        """
        Prepare every input of `params`.

        Args:
            params (:obj:`list` of :class:`hepi.Input`): Inputs to prepare.
            skip (bool): Forwarded to :meth:`_prepare`.
            **kwargs: Forwarded to :meth:`_prepare`.

        Returns:
            :obj:`list` of :class:`RunParam`: One entry per input, or an
            empty list as soon as one input fails the input check.
        """
        ret = []
        for p in params:
            # NOTE(review): `_check_input` is not defined in this class as
            # shown here; presumably provided by subclasses - confirm.
            if not self._check_input(p):
                warnings.warn("Check input failed.")
                return []
            ret.append(self._prepare(p, skip=skip, **kwargs))
        return ret

    def run(self,
            params: List[Input],
            skip=True,
            parse=True,
            parallel=True,
            sleep=0,
            run=True,
            **kwargs):
        """
        Run the passed list of parameters.

        Args:
            params (:obj:`list` of :class:`hepi.Input`): All parameters that should be executed/queued.
            skip (bool): True means stored runs will be skipped. Else they are overwritten.
            parse (bool): Parse the results.
                This is not the preferred cluster/parallel mode, as there the function only queues the job.
            parallel (bool): Run jobs in parallel.
            sleep (int): Sleep seconds after starting job.
                None selects 0 when `parse` is True and 5 otherwise.
            run (bool): Actually start/queue runner.

        Returns:
            :obj:`dict` : combined dictionary of results and parameters. Each member therein is a list.
            The dictionary is empty if `parse` is set to False.
        """
        if not self._check_path():
            # Only a warning: execution continues with the configured path.
            warnings.warn("The path is not valid for " + self.get_name())
        rps = self._prepare_all(params, parse=parse, skip=skip, **kwargs)
        print("Running: " + str(len(params)) + " jobs")
        if sleep is None:
            sleep = 0 if parse else 5
        if run:
            self._run(rps,
                      wait=parse,
                      parallel=parallel,
                      sleep=sleep,
                      **kwargs)
        if not parse:
            return {}
        outs = LD2DL(rps)["out_file"]
        results = self.parse(outs)
        rdl = LD2DL(results)
        pdl = LD2DL(params)
        # On duplicate keys the parameter entries win over the result entries.
        return {**rdl, **pdl}

    def _run(self,
             rps: List[RunParam],
             wait=True,
             parallel=True,
             sleep=0,
             **kwargs):
        """
        Runs Runner per :class:`RunParams`.

        Args:
            rps (:obj:`list` of :class:`RunParams`): Extended run parameters.
            wait (bool): Wait for parallel runs to finish.
            parallel (bool): Run jobs in parallel.
            sleep (int): Sleep seconds after starting subprocess.
            **kwargs: Accepted for call compatibility; not used here.

        Returns:
            :obj:`list` of int: return codes of the started jobs if `wait`
            is True, otherwise an empty list.
        """
        # get cluster or niceness prefix
        template = self.get_pre() + " " + "{}"
        processes = []
        for rp in rps:
            if rp.skip:
                continue
            command = template.format(rp.execute)
            # The prefix and the script form one shell command line.
            process = subprocess.Popen(command, shell=True)
            processes.append(process)
            if not parallel:
                process.wait()
            # Forced delay to prevent overloading clusters when registering jobs
            time.sleep(sleep)
        if wait:
            # Collect statuses
            return [process.wait() for process in processes]
        return []

    def _is_valid(self, file: str, p: Input, d) -> bool:
        """
        Verifies that a file is a complete output.

        Args:
            file (str): File path to be parsed.
            p (:class:`hepi.Input`): Input parameters.
            d (:obj:`dict`): Param dictionary (not used by this implementation).

        Returns:
            bool : True if `file` could be parsed and contains a value for
            the order requested by `p`.
        """
        res = self._parse_file(file)
        if res is None:
            # Nothing could be parsed (e.g. the base `_parse_file`), so the
            # file cannot count as a complete output.
            return False
        if res.LO is None and p.order is Order.LO:
            return False
        if res.NLO is None and p.order is Order.NLO:
            return False
        if res.NLO_PLUS_NLL is None and p.order is Order.NLO_PLUS_NLL:
            return False
        if res.aNNLO_PLUS_NNLL is None and p.order is Order.aNNLO_PLUS_NNLL:
            return False
        return True

    def parse(self, outputs: List[str]) -> List[Result]:
        """
        Parses output files and returns List of Results.

        Args:
            outputs (:obj:`list` of `str`): List of the filenames to be parsed.

        Returns:
            :obj:`list` of :class:`Result`: One entry per element yielded by
            ``par(self._parse_file, outputs)``.
        """
        return list(par(self._parse_file, outputs))

    def _parse_file(self, file: str) -> Result:
        """
        Extracts results from an output file.

        The base implementation parses nothing and returns None.

        Args:
            file (str): File path to be parsed.

        Returns:
            :class:`Result` : If a value is not found in the file None is used.
        """
        return None

    def get_path(self) -> str:
        """
        Get the Runner path.

        Returns:
            str: current Runner path.
        """
        return self.path

    def get_output_dir(self) -> str:
        """
        Get the output directory.

        Returns:
            str: :attr:`out_dir`
        """
        return self.out_dir

    def get_pre(self) -> str:
        """
        Gets the command prefix.

        Returns:
            str: :attr:`pre`
        """
        return self.pre

    def set_path(self, p: str):
        """
        Set the path to the Runner folder containing the binary in './bin' or './build/bin'.

        An existing directory is stored with exactly one trailing ``/``;
        anything else is stored unchanged.

        Args:
            p (str): new path.
        """
        if os.path.isdir(p) and not p.endswith("/"):
            p += "/"
        self.path = p

    def set_output_dir(self, outdir: str, create: bool = True):
        """
        Sets the output directory.

        Args:
            outdir (str): new output directory.
            create (bool): create directory if not existing.
        """
        if create:
            os.makedirs(outdir, exist_ok=True)
        self.out_dir = outdir

    def set_pre(self, ppre: str):
        """
        Sets the command prefix.

        Args:
            ppre (str): new command prefix.
        """
        self.pre = ppre