"""Runs Resummino"""
import shutil
from typing import List
import subprocess
from string import Template
import warnings
from hepi.input import Order
from hepi.run import RunParam
import numpy as np
import pkgutil
from hepi.util import namehash
from .. import Input, Result, LD2DL, get_output_dir, get_input_dir, get_pre
import re
import os.path
from pathlib import Path
from .result import ResumminoResult, is_valid, parse_single
import enlighten
import time
import difflib
from smpl.parallel import par
import hashlib
import os
import stat
from smpl import debug
[docs]resummino_path:str = "~/resummino/"
"""resummino folder containing the binary in './build/bin'."""
[docs]def set_path(p: str):
"""
Set the path to the resummino folder containing the binary in './build/bin'.
Args:
str: new Resummino path.
"""
global resummino_path
resummino_path = p+ ("/" if p[-1]!="/" else "")
[docs]def get_path() -> str:
"""
Get Resummino path.
Returns:
str: current Resummino path
"""
global resummino_path
return resummino_path
[docs]class ResumminoRunParam(RunParam):
"""
Parameters for running Resummino.
Attributes:
skip (bool): Skip already performed and stored runs.
flags (str): Additional resummino flags. E.g. '--nlo'.
in_path (str): File path of the input file.
out_path (str): File path of the output file.
"""
def __init__(self, flags: str, in_path: str, out_path: str, skip=False):
super().__init__(skip)
#self.skip = skip
self.flags = flags
self.in_path = in_path
self.out_path = out_path
[docs]def run(params: List[Input], noskip=False, bar=False, no_parse=False,para=True,skip=True,parse=True,run=True) -> dict:
"""
Run the passed list of parameters.
Args:
params (:obj:`list` of :class:`hepi.Input`): All parameters that should be executed/queued.
noskip (bool): False means stored runs will be skipped. Else the are overwritten.
bar (bool): Display a progressbar.
no_parse (bool): Skip parsing the results.
This is the prefered cluster mode, as this function only queues the job.
para (bool): Run jobs in parallel.
run (bool): Actually start/queue resummino.
Returns:
:obj:`dict` : combined dictionary of results and parameters. Each member therein is a list.
The dictionary is empty if `no_parse` is set.
"""
if noskip == skip:
noskip = True
if no_parse== parse:
no_parse= True
print("Running: " + str(len(params)) +" jobs" )
rps = _queue(params, noskip)
if run:
_run(rps, bar, no_parse,para)
if not no_parse:
outs = LD2DL(rps)["out_path"]
results = _parse(outs)
rdl = LD2DL(results)
pdl = LD2DL(params)
return {**rdl, **pdl}
return {}
[docs]def _parse(outputs: List[str]) -> List[ResumminoResult]:
"""
Parses Resummino output files and returns List of Results.
Args:
outputs (:obj:`list` of `str`): List of the filenames to be parsed.
Returns:
:obj:`list` of :class:`hepi.resummino.result.ResumminoResult`
"""
rsl = []
for r in par(parse_single, outputs):
rsl.append(r)
return rsl
[docs]def _queue(params: List[Input], noskip=False) -> List[ResumminoRunParam]:
"""
Queues and generates Resummino run files.
Extends params by input and output files.
Args:
params (:obj:`list` of :class:`hepi.Input`): input parameters
noskip (bool): False means stored runs will be skipped. Else the are overwritten.
Returns:
:obj:`list` of :class:`hepi.RunParams`: Run paramters for usage with :meth:`_run`.
"""
global resummino_path
Path("output").mkdir(parents=True, exist_ok=True)
Path("input").mkdir(parents=True, exist_ok=True)
ret = []
for p in params:
d = p.__dict__
d["code"] = "RS"
# TODO insert defautl if missing in d!
name = namehash("_".join("".join(str(_[0]) + "_" + str(_[1]))
for _ in d.items()).replace("/", "-"))
debug.msg(name)
skip = False
if not noskip and os.path.isfile(get_output_dir() + name + ".out") and is_valid(get_output_dir() + name + ".out",p,d):
print("skip", end='')
skip = True
flags = ""
if not skip:
data = pkgutil.get_data(__name__, "plot_template.in").decode(
'utf-8')
if p.order == Order.LO:
flags = flags + "--lo"
elif p.order == Order.NLO:
flags = flags + "--nlo"
elif p.order == Order.NLO_PLUS_NLL:
flags = flags + "--nll"
elif p.order == Order.aNNLO_PLUS_NNLL:
flags = flags + "--nnll"
else:
raise ValueError("Order not supported by resummino. Must be one of LO/NLO/NLO+NLL/aNNLO+NNLL.")
src = Template(data)
result = src.substitute(d)
open(get_output_dir() + name + ".in", "w").write(result)
open(get_output_dir() + name + ".sh", "w").write("#!/bin/sh\n"+
resummino_path + 'build/bin/resummino {} {} >> {}'.format(
get_output_dir()+name + ".in",flags,get_output_dir()+name + ".out"
))
st = os.stat(get_output_dir() + name + ".sh")
os.chmod(get_output_dir() + name + ".sh", st.st_mode | stat.S_IEXEC)
open(get_output_dir() + name + ".out", "w").write(result + "\n\n")
sname = d['slha']
with open(get_output_dir() + sname, 'r') as f:
#src = Template(f.read())
#result = src.substitute(d)
#open(get_input_dir() + sname + ".in", "w").write(result)
open(get_output_dir() + name + ".out",
"a").write(f.read() + "\n\n")
ret.append(ResumminoRunParam(flags, get_output_dir()+name + ".in", get_output_dir()+name + ".out", skip))
return ret
[docs]def _run(rps: List[ResumminoRunParam], bar=True, no_parse=False,para=True):
"""
Runs Resummino per :class:`RunParams`.
Args:
rps (:obj:`list` of :class:`RunParams`): Extended run parameters.
bar (bool): Enable info bar.
no_parse (bool): Do not wait for parallel runs to finish.
para (bool): Run jobs in parallel.
Returns:
:obj:`list` of int: return codes from jobs if `no_parse` is False.
"""
# TODO clean up on exit emergency
global resummino_path
# TODO RS build path checks?!?!
template = get_pre() + " " + "{}"
# Run commands in parallel
processes = []
processesrpo = {}
processesbar = {}
manager = enlighten.get_manager()
status_format = '{program}{fill} Stage: {stage}{fill} Status {status}:{fill}{lastline}'
main_format = '{program}'
mp = ""
if bar:
mp = rps[0].out_path
main_bar = manager.status_bar(status_format=main_format,
program=mp)
mp = rps[1].out_path
for rp in rps:
if not rp.skip:
command = template.format(rp.in_path.replace(".in",".sh"))
process = subprocess.Popen(command, shell=True)
processes.append(process)
processesrpo[process] = rp.out_path
if not para:
process.wait()
if no_parse:
time.sleep(5)
if bar:
nnn = ""
nn = ""
ch = False
for i, s in enumerate(difflib.ndiff(mp+"_", rp.out_path+"_")):
if s[-1] == "_":
if ch:
nnn = nnn + " ... " + nn
ch = False
nn = ""
if s[0] == '+':
ch = True
if (s[0] == '+' or s[0] == ' ') and s[-1] != "_":
nn = nn + s[-1]
processesbar[process] = manager.status_bar(status_format=status_format,
program=nnn,
stage='INIT',
status='OKAY',
lastline="0")
mp = rps[0].out_path
if bar:
c = True
while c:
if len(processes) > 0:
time.sleep(10)
c = False
for p in processes:
if p.poll() is None:
c = True
pat = re.compile(r'^\* (.*) \*')
n = ""
cl = 0
with open(processesrpo[p], mode="r") as f:
for l in f:
tmp = pat.search(l)
if(tmp is not None):
n = tmp.group(1)
cl = 0
cl = cl+1
processesbar[p].update(
stage=n, status=cl, lastline=l[int(len(l)/2)::])
else:
processesbar[p].update(
stage="DONE", status='DONE', lastline="")
for p in processes:
processesbar[p].close()
main_bar.close()
if not no_parse:
# Collect statuses
output = [p.wait() for p in processes]
return output
return []