Created
July 23, 2020 11:42
-
-
Save phistep/a7e96acdb00c7bb149395f52c1c7b0e3 to your computer and use it in GitHub Desktop.
Minimal Parameter Varying Batch Job Submission System
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import argparse | |
import itertools as it | |
import os | |
import subprocess as sp | |
import multiprocessing as mp | |
# Command-line interface: one positional config file plus optional switches
# for parallelism, niceness, push notifications and the tmux monitor view.
parser = argparse.ArgumentParser(description="Run script for all combinations of arguments in parallel")

# Each entry is (flags, keyword settings) exactly as handed to add_argument.
_cli_options = [
    (('config',), {
        'type': str,
        'help': "config file",
    }),
    (('-n', '--processes'), {
        'type': int,
        'default': None,
        'help': "number of parallel processes; defaults to number of cores",
    }),
    (('-N', '--nice'), {
        'type': int,
        'default': 10,
        'help': "niceness of processes",
    }),
    (('-p', '--push-notify'), {
        'action': 'store_true',
        'help': "use pushover to send notification when job finishes; auth is read from ~/.config/pushover, APP_TOKEN\nUSER_TOKEN",
    }),
    (('-m', '--monitor'), {
        'action': 'store_true',
        'help': "show progress of jobs (linecounts and cpu load) in a tmux overview",
    }),
]
for _flags, _settings in _cli_options:
    parser.add_argument(*_flags, **_settings)

# Parsed immediately: the rest of the script reads `args` at module level.
args = parser.parse_args()
""" Config file example
# name of the batch job
name = 'example'
# tuple of strings that make up the command to be executed.
# if it's a standalone program: ('programname',)
# if it's an interpreter of sorts: ('bash', 'script.sh')
command = ('python3', '/home/phistep/University/Master/vaml/vamp/weight_distance.py')
# folder, file template including extension
# file template substitutes in parameter values (use "b={-b}" to render '-b 0.5' as "b=0.5")
output = ('./batch-test', 'c={-c},d={-d}.tsv')
# base parameters for all runs
# dictionary with 'switch': value
base = {
    '-a': 1000,
    # just use switch without value
    '-B': None,
    '-c': 0,
    '-d': 'auto'
}
# dictionary with 'switch': (tuple, of, possible, values)
# all combinations will be explored
varying = {
    '-d': ('d1', 'd2'),
    '-e': ('e1', 'e2', 'e3'),
    # use one switch or the other or none, can only be done for one set of
    # mutually exclusive switches and blanks. These cannot be used in file name
    # templating
    None: ('-F', None),
}
# for when you need to vary parameters together
# this will give [{'-g':1, '-h':10}, {'-g':0.1, '-h':100}, {'-g':0.001, '-h':1000},]
# and will be combined in all combinations with the combinations from `varying` above
varying_combined = [
    {
        '-g': (1, 0.1, 0.001,),
        '-h': (10, 100, 1000,),
    },
]
# these will override any previous base options and will run as independent
# sets
extra = [
    {'-c': 0.5, '-g': '0.5,0.5'},
    {'-c': 0.5, '-g': '0.9,0.1'},
]
"""
# Load the batch definition by executing the config file as Python in this
# module's namespace; it is expected to define `name`, `command`, `output`
# and optionally `base`, `varying`, `varying_combined` and `extra`.
# NOTE(security): this runs arbitrary code from the config file -- only use
# config files you trust.
with open(args.config) as config_file:
    config_source = config_file.read()
# Compiling with the real file name makes tracebacks from a broken config
# point at the config file instead of "<string>".
exec(compile(config_source, args.config, 'exec'))
if args.monitor:
    # Monitor mode: show a tmux session with a line-count watcher, htop and
    # one `less +F` window per existing output file, then exit without
    # running any jobs.
    from sys import exit
    from glob import glob
    from shlex import quote

    session_name = f'job_monitor_{name}'
    output_ext = os.path.splitext(output[1])[-1]
    load_cmd = "htop"
    # The backslashes must reach the inner shell literally so that `find`
    # (not the shell) sees the glob and the terminating `;`.
    progress_cmd = f"watch \"find {output[0]} -name \\*{output_ext} -exec wc -l {{}} \\;\""
    inspect_cmd = "less +F {}"
    try:
        # Re-attach to an already running monitor session if there is one.
        attach = sp.run(['tmux', 'attach-session', '-t', session_name], stderr=sp.DEVNULL)
        if attach.returncode:
            # No session yet: build it. tmux is invoked with argument lists,
            # so session and window names need no shell quoting.
            sp.run(['tmux', 'new-session', '-s', session_name, '-d', progress_cmd], check=True)
            sp.run(['tmux', 'split-window', '-t', session_name, '-h', load_cmd], check=True)
            for file_ in glob(os.path.join(output[0], '*' + output_ext)):
                # tmux hands the last argument to a shell, so the file name
                # itself still has to be quoted.
                sp.run(['tmux', 'new-window', '-t', session_name,
                        '-n', os.path.basename(file_),
                        inspect_cmd.format(quote(file_))], check=True)
            sp.run(['tmux', 'select-window', '-t', '0'], check=True)
            sp.run(['tmux', 'attach-session', '-t', session_name], check=True)
    finally:
        # Always tear the session down once the user detaches or setup fails.
        sp.run(['tmux', 'kill-session', '-t', session_name], stderr=sp.DEVNULL)
    exit()
# Optional config entries: any name the config file did not define falls back
# to an empty container so the code below can use it unconditionally.
globals().setdefault('base', {})
globals().setdefault('varying', {})
globals().setdefault('varying_combined', {})
globals().setdefault('extra', {})
def _lockstep(option_set):
    """Turn {'-g': (1, 2), '-h': (3, 4)} into [{'-g': 1, '-h': 3}, {'-g': 2, '-h': 4}]."""
    switches = list(option_set)
    return [dict(zip(switches, row)) for row in zip(*option_set.values())]


def _merged(dicts):
    """Fold several dicts into one; later dicts win when a key repeats."""
    combined = {}
    for mapping in dicts:
        combined.update(mapping)
    return combined


# One list of lock-stepped parameter dicts per entry of `varying_combined`.
varied_combined = [_lockstep(group) for group in varying_combined]
# Every way of picking one dict from each lock-stepped group.
varied_combind_sets = [_merged(choice) for choice in it.product(*varied_combined)]
# Full cartesian product over the independently varying switches.
varied_param_sets = [dict(zip(varying, choice)) for choice in it.product(*varying.values())]
# Cross the lock-stepped combinations with the independent ones.
varied_varied_sets = [_merged(pair) for pair in it.product(varied_combind_sets, varied_param_sets)]
# Layer each variation, and afterwards each `extra` set, over the base options.
param_sets = [{**base, **overrides} for overrides in it.chain(varied_varied_sets, extra)]
# Build one job description per parameter set:
#   'id'     - 1-based running number, also used as file name prefix
#   'args'   - the command followed by every switch and value; None entries
#              (switch without value, or the switch-only `None` key) are skipped
#   'stdout' - path of the file that receives the job's standard output
# format_map is used instead of format(**param_set) because the switch-only
# `None` key is not a valid keyword argument and would raise TypeError.
jobs = [{
    'id': id_,
    'args': [*command, *[str(item) for kv in param_set.items() for item in kv if item is not None]],
    'stdout': os.path.join(output[0], f'{id_:04d}_' + output[1].format_map(param_set)),
} for id_, param_set in enumerate(param_sets, 1)]
num_jobs = len(jobs)
def run_job(job):
    """Run a single job under `nice` and write its standard output to a file.

    `job` is a dict with the keys 'id', 'args' (full argument list including
    the command) and 'stdout' (output file path). The keys 'prog_name' and
    'short_args' are added to it for the status messages.

    The command is started from an argument list without a shell, so values
    containing spaces or shell metacharacters reach the program unchanged.
    """
    job['prog_name'] = os.path.basename(command[-1])
    job['short_args'] = ' '.join(job['args'][len(command):])
    print("[{name}#{id:04d}/{num_jobs} RUN ]: {prog_name} {short_args}".format(**job, name=name, num_jobs=num_jobs))

    # Create the output folder on first use; exist_ok keeps this safe when
    # several workers get here at the same time.
    out_dir = os.path.dirname(job['stdout'])
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with open(job['stdout'], 'w') as stdout_file:
        sp.run(['nice', '-n', str(args.nice), *job['args']], stdout=stdout_file)
    job_finished(job)
def job_finished(job):
    """Report a finished job on stdout and, if requested, via Pushover.

    `job` must already carry 'id', 'prog_name' and 'short_args'. With
    --push-notify the app and user tokens are read from the first two
    whitespace-separated entries of ~/.config/pushover. A failed notification
    only prints a warning, so it cannot abort the jobs still queued in this
    worker.
    """
    print("[{name}#{id:04d}/{num_jobs} DONE]: {prog_name} {short_args}".format(**job, name=name, num_jobs=num_jobs))
    if not args.push_notify:
        return

    import http.client
    import urllib.parse

    title = "Job {name}#{id:04d}/{num_jobs} done.".format(**job, name=name, num_jobs=num_jobs)
    message = "{prog_name} {short_args}".format(**job)
    try:
        with open(os.path.expanduser('~/.config/pushover')) as auth_file:
            # split() also copes with CRLF line endings and a trailing newline.
            app_token, user_token = auth_file.read().split()[:2]
        # The timeout keeps a hanging network connection from stalling the worker.
        conn = http.client.HTTPSConnection("api.pushover.net:443", timeout=30)
        try:
            conn.request("POST", "/1/messages.json",
                         urllib.parse.urlencode({
                             "token": app_token,
                             "user": user_token,
                             "title": title,
                             "message": message,
                         }), {"Content-type": "application/x-www-form-urlencoded"})
            conn.getresponse().read()
        finally:
            conn.close()
    except (OSError, ValueError, http.client.HTTPException) as err:
        # ValueError: fewer than two tokens in the auth file.
        print("[{name}#{id:04d}/{num_jobs} WARN]: push notification failed: {err}".format(
            **job, name=name, num_jobs=num_jobs, err=err))
# Only the process started by the user may create the pool. With the "spawn"
# and "forkserver" start methods every worker re-imports this file, and
# without the guard each worker would try to start a pool of its own.
if __name__ == '__main__':
    if not args.processes:
        args.processes = mp.cpu_count()
    with mp.Pool(args.processes) as worker:
        # chunksize=1 hands out jobs one at a time, so a free worker always
        # picks up the next job instead of long jobs being pre-assigned in
        # batches.
        worker.map(run_job, jobs, chunksize=1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment