IPython tools for submitting to and monitoring jobs on a Torque-based cluster. NOTE: this script is _not_ compatible with plain Python and is only syntactically valid for IPython (the file keeps a .py extension because GitHub would not apply Python syntax highlighting to a .ipy file). An example of the use of these utilities can be found here: https://gist.github.com/wasade/7048424
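The IPython-only pieces are the shell-capture assignments used throughout, such as "job_id = !$job" and "lines = !qstat -u $user"; these are syntax errors in plain Python, which is why the file must be loaded with IPython's %run rather than imported.
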
__author__ = "Daniel McDonald"
__copyright__ = "Copyright 2013"
__credits__ = ["Daniel McDonald"]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = "Daniel McDonald"
__email__ = "[email protected]"

usage = """%run cluster_utils.ipy

### to toss existing saved environment
# drop_env()

if recover():
    recover_env()
else:
    create_env("some_name")

# submission wrapper, note the queue and extra_args kwargs as these are system specific
submit = lambda cmd: submit_qsub(cmd, job_name=prj_name, queue='memroute', extra_args='-l pvmem=8gb')

# example use, submission and monitoring
important_path = 'a/b/c'
important_value = 12
register_items(important_path=important_path, important_value=important_value)

# submit some jobs
jobs = [submit("sleep %d; hostname" % i) for i in range(30, 50)]

# wait_on is blocking...
job_ids_and_oe = wait_on(jobs)

# if your session dies while waiting for jobs, you can recover:
recover_env()
recover_jobs()

# wait_on can pay attention to new jobs spawned (e.g., QIIME's parallel_beta_diversity.py).
# to have wait_on update, specify an additional job prefix to monitor:
job = submit("parallel_beta_diversity.py -i foo -o bar -m unweighted_unifrac -X some_prefix")
res = wait_on(job, additional_prefix="some_prefix")
"""

import os
import sys
from time import sleep

from IPython.core.display import clear_output

REGISTERED_ENV_FILE = '.registered_env'
REGISTERED_ENV_VAR = '_registered_env'

def submit_qsub(cmd, job_name='ipy_ftw', queue=None, extra_args=''):
    """Submit a job and return (job_name, numeric job id)"""
    # working_dir must already exist in this namespace (it is registered by
    # create_env and restored into the namespace by recover_env)
    job_data = {'workdir': working_dir,
                'cmd': cmd,
                'queue': '-q %s' % queue if queue is not None else '',
                'extra_args': extra_args,
                'jobname': job_name}

    # -k oe keeps the job's stdout/stderr files in $HOME, which is where
    # job_run_details expects to find them
    job_template = 'echo "cd %(workdir)s; %(cmd)s" | qsub -k oe -N %(jobname)s %(queue)s %(extra_args)s'
    job = job_template % job_data
    job_id = !$job

    return (job_name, job_id[0].split('.')[0])
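
# Illustrative call (queue name, resource string, and the returned id are
# examples only; working_dir must already be set up via create_env/recover_env):
#   name, torque_id = submit_qsub("echo hello", job_name="demo",
#                                 queue="memroute", extra_args="-l pvmem=8gb")
#   # -> ("demo", "123456")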

def parse_qstat():
    """Process qstat output"""
    user = os.environ['USER']
    lines = !qstat -u $user

    jobs = {}
    for id_, name, state in lines.grep(user).fields(0, 3, 9).fields():
        job_id = id_.split('.')[0]
        jobs[job_id] = {}
        jobs[job_id]['name'] = name
        jobs[job_id]['state'] = state

    return jobs
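
# parse_qstat keys the result on the numeric job id, e.g. (illustrative):
#   {'123456': {'name': 'ipy_ftw', 'state': 'R'}}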

def still_running(monitoring, running_jobs, additional_prefix=None):
    """Check if our jobs to be monitored are running

    additional_prefix can be specified to track derivative worker processes
    (e.g., from a parallel QIIME workflow)
    """
    new_monitoring = set([])
    for name, id_ in monitoring:
        # stop monitoring if not present anymore
        if id_ not in running_jobs:
            continue

        # stop monitoring if complete
        if running_jobs[id_]['state'] == 'C':
            continue

        new_monitoring.add((name, id_))

    if additional_prefix is not None:
        # see if we have any new jobs with a prefix we're interested in
        for id_, md in running_jobs.items():
            if md['name'].startswith(additional_prefix):
                if md['state'] in ['R', 'Q']:  # running or queued
                    new_monitoring.add((md['name'], id_))

    if new_monitoring != monitoring:
        obj = globals().get(REGISTERED_ENV_VAR, None)
        if obj is not None:
            obj._monitoring_jobs = new_monitoring

    return new_monitoring
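
# Previously monitored jobs are kept until they disappear from qstat or reach
# state 'C' (completed); prefix-matched jobs are only picked up while queued
# ('Q') or running ('R'), so jobs in other Torque states (e.g., 'H' for held)
# are not added automatically.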

def job_run_details(name, id_):
    """Run tracejob and parse out the useful bits"""
    # go back 2 days. This may need to be smarter.
    job_details = !tracejob -a -m -l -f job -n 2 $id_

    if not job_details.grep('dest='):
        raise ValueError("Cannot find job %s!" % id_)

    dest = job_details.grep('dest=').fields(-2)[0].strip('(),').split('=')[1]
    tmp = job_details.grep('Exit_status=').fields(3, 4, 5, 6).fields()[0]
    exit_status, walltime, mem, vmem = map(lambda x: x.split('=')[-1], tmp)

    stderr_file = os.path.expandvars("$HOME/%s.e%s" % (name, id_))
    if not os.path.exists(stderr_file):
        raise ValueError("Could not find expected standard error output: %s" % stderr_file)

    stdout_file = os.path.expandvars("$HOME/%s.o%s" % (name, id_))
    if not os.path.exists(stdout_file):
        raise ValueError("Could not find expected standard output: %s" % stdout_file)

    return {'exit_status': exit_status,
            'walltime': walltime,
            'mem': mem,
            'vmem': vmem,
            'stderr_file': stderr_file,
            'stdout_file': stdout_file}
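
# job_run_details returns a flat dict of strings, e.g. (illustrative values):
#   {'exit_status': '0', 'walltime': '00:01:05', 'mem': '4100kb', 'vmem': '320000kb',
#    'stderr_file': '/home/user/demo.e123456', 'stdout_file': '/home/user/demo.o123456'}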

def wait_on(jobs_to_monitor, additional_prefix=None):
    """Block while jobs to monitor are running, and provide a status update"""
    POLL_INTERVAL = 5
    elapsed = 0

    # fragile.
    if isinstance(jobs_to_monitor, tuple) and len(jobs_to_monitor) == 2:
        jobs_to_monitor = [jobs_to_monitor]

    all_jobs = set(jobs_to_monitor)
    n_jobs = len(jobs_to_monitor)

    print "monitoring %d jobs..." % n_jobs
    sys.stdout.flush()

    running_jobs = parse_qstat()
    while jobs_to_monitor:
        sleep(POLL_INTERVAL)
        elapsed += POLL_INTERVAL

        running_jobs = parse_qstat()
        jobs_to_monitor = still_running(jobs_to_monitor, running_jobs, additional_prefix)
        all_jobs.update(set(jobs_to_monitor))

        n_running = len(jobs_to_monitor)
        n_total = len(all_jobs)

        clear_output()
        print "%d / %d jobs still running, approximately %d seconds elapsed" % (n_running, n_total, elapsed)
        sys.stdout.flush()

    n_running = len(jobs_to_monitor)
    n_total = len(all_jobs)

    clear_output()
    print "%d / %d jobs still running, approximately %d seconds elapsed" % (n_running, n_total, elapsed)
    print "All jobs completed!"
    sys.stdout.flush()

    # check if any jobs errored out
    for name, id_ in all_jobs:
        deets = job_run_details(name, id_)

        if deets['exit_status'] != '0':
            stderr_fp = deets['stderr_file']
            last_stderr_line = !tail -n 1 $stderr_fp

            print "ERROR! Job %s did not exit cleanly." % id_
            print "Here is the last line of standard error (%s)" % stderr_fp
            print last_stderr_line[0]

    return all_jobs
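
# wait_on returns the full set of (job_name, job_id) tuples it observed,
# including any derivative jobs picked up via additional_prefix.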

class EnvironmentState(object):
    """Store particulars from an environment, write to disk on change"""
    def __init__(self, basedir, **kwargs):
        import os
        import cPickle

        env_fp = os.path.join(basedir, REGISTERED_ENV_FILE)
        super(EnvironmentState, self).__setattr__('_state_fp', env_fp)

        if os.path.exists(self._state_fp):
            raise ValueError("State on disk appears to exist!")

        super(EnvironmentState, self).__setattr__('pickler', cPickle.dumps)
        super(EnvironmentState, self).__setattr__('_basedir', basedir)

        self.__dict__.update(**kwargs)
        self.save_state()

    def save_state(self):
        """Pickle self out to disk"""
        f = open(self._state_fp, 'w')
        f.write(self.pickler(self))
        f.close()

    def __setattr__(self, key, value):
        """Update self"""
        self.__dict__[key] = value
        self.save_state()

    def update(self, **kwargs):
        """Update self"""
        for k, v in kwargs.items():
            self.__dict__[k] = v
        self.save_state()

    def __repr__(self):
        """Represent thy self"""
        out = ["{"]
        for k, v in self.__dict__.items():
            if isinstance(v, int):
                out.append("'%s':%d," % (str(k), v))
            elif isinstance(v, float):
                out.append("'%s':%f," % (str(k), v))
            else:
                out.append("'%s':'%s'," % (str(k), str(v)))
        out.append("}")
        return ''.join(out)
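
# Attribute assignment on an EnvironmentState re-pickles the whole object to
# <basedir>/.registered_env immediately, e.g. (illustrative names and values):
#   state = EnvironmentState('/tmp/proj', prj_name='proj_abc')
#   state.n_samples = 100  # triggers __setattr__, which calls save_state()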

def register(**kwargs):
    """Store variables in the EnvironmentState instance"""
    if REGISTERED_ENV_VAR not in globals():
        raise ValueError("No registered environment in globals!")

    obj = globals()[REGISTERED_ENV_VAR]
    for k, v in kwargs.items():
        setattr(obj, k, v)


def register_env(basedir=None, **kwargs):
    """Create a registered environment"""
    import os

    if basedir is None:
        basedir = os.getcwd()

    if REGISTERED_ENV_VAR in globals():
        raise KeyError("%s already exists!" % REGISTERED_ENV_VAR)

    globals()[REGISTERED_ENV_VAR] = EnvironmentState(basedir, **kwargs)


def recover(basedir=None):
    """Is there a registered environment to recover from?"""
    import os

    if basedir is None:
        basedir = os.getcwd()

    env_file = os.path.join(basedir, REGISTERED_ENV_FILE)
    return os.path.exists(env_file)

def recover_env(basedir=None):
    """Recover an environment"""
    import os
    import cPickle

    if basedir is None:
        basedir = os.getcwd()

    registered_env = os.path.join(basedir, REGISTERED_ENV_FILE)
    if not os.path.exists(registered_env):
        raise ValueError("Cannot recover environment, %s does not exist!" % REGISTERED_ENV_FILE)

    global_vars = globals()
    obj = cPickle.loads(open(registered_env).read())
    global_vars[REGISTERED_ENV_VAR] = obj

    # push the saved attributes back into the namespace
    for k, v in obj.__dict__.items():
        global_vars[k] = v

def recover_jobs():
    """Attempt to recover monitoring of running jobs"""
    obj = globals().get(REGISTERED_ENV_VAR, None)
    if obj is None:
        return

    jobs = obj.__dict__.get('_monitoring_jobs', None)
    if jobs is None:
        return

    return wait_on(jobs)


def drop_env():
    """Drop an existing environment"""
    import os

    if os.path.exists(REGISTERED_ENV_FILE):
        os.remove(REGISTERED_ENV_FILE)

    if REGISTERED_ENV_VAR in globals():
        globals().pop(REGISTERED_ENV_VAR)

def register_items(**kwargs):
    """Store items in the registered environment

    Registered items are pushed into the global namespace by recover_env.
    """
    if REGISTERED_ENV_VAR not in globals():
        raise KeyError("%s is not in globals!" % REGISTERED_ENV_VAR)

    obj = globals()[REGISTERED_ENV_VAR]
    obj.update(**kwargs)

def create_env(prj_name, basedir=None):
    """Create a simple project environment and working directory

    prj_name : a name for the project. It is advised that this be < 14
               characters if submitting on a cluster. A random tag is
               appended as well.
    basedir  : set a base working directory, cwd if None
    """
    # create a working directory
    from random import choice
    import os

    if basedir is None:
        basedir = os.getcwd()

    alpha = 'abcdefghijklmnopqrstuvwxyz'
    alpha += alpha.upper()
    alpha += '0123456789'

    retries = 0
    not_created = True
    while retries < 5 and not_created:
        # append a short random tag so the directory name is unique
        name = '_'.join([prj_name, ''.join([choice(alpha) for i in range(3)])])
        working_dir = os.path.join(basedir, name)

        if os.path.exists(working_dir):
            retries += 1
        else:
            os.mkdir(working_dir)
            not_created = False

    if not_created:
        raise ValueError("Could not create a working directory under %s!" % basedir)

    register_env(basedir, prj_name=name, working_dir=working_dir)
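
# Example (illustrative): create_env('demo') creates a working directory such
# as ./demo_a1Z/ and registers prj_name and working_dir in the saved
# environment state (.registered_env).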