IPython tools for submitting to and monitoring jobs on a Torque-based cluster. NOTE: this script is _not_ compatible with plain Python and is only syntactically valid under IPython (GitHub would not syntax-highlight the file as Python if it had a .ipy extension). An example of the use of these utilities can be found here: https://gist.github.com/wasade/7048424
__author__ = "Daniel McDonald"
__copyright__ = "Copyright 2013"
__credits__ = ["Daniel McDonald"]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = "Daniel McDonald"
__email__ = "[email protected]"
usage = """%run cluster_utils.ipy
### to toss existing saved environment
# drop_env()
if recover():
recover_env()
else:
create_env("some_name")
# submission wrapper, note the queue and extra_args kwargs as these are system specific
submit = lambda cmd: submit_qsub(cmd, job_name=prj_name, queue='memroute', extra_args='-l pvmem=8gb')
# example use, submission and monitoring
important_path = 'a/b/c'
important_value = 12
register_items(important_path=important_path, important_value=important_value)
# submit some jobs
jobs = [submit("sleep %d; hostname" % i) for i in range(30,50)]
# wait_on is blocking...
job_ids_and_oe = wait_on(jobs)
# if your session dies while waiting for jobs, you can recover:
recover_env()
recover_jobs()
# wait_on can pay attention to new jobs spawned (e.g., QIIME's parallel_beta_diversity.py).
# to have wait_on update, specify an additional job prefix to monitor:
job = submit("parallel_beta_diversity.py -i foo -o bar -m unweighted_unifrac -X some_prefix")
res = wait_on(job, additional_prefix="some_prefix")
"""

import os
import sys
from time import sleep

from IPython.core.display import clear_output

REGISTERED_ENV_FILE = '.registered_env'
REGISTERED_ENV_VAR = '_registered_env'

def submit_qsub(cmd, job_name='ipy_ftw', queue=None, extra_args=''):
    """Submit a job, returning a (job_name, job_id) tuple

    Relies on working_dir, which create_env/recover_env place into the
    global namespace.
    """
    job_data = {'workdir': working_dir,
                'cmd': cmd,
                'queue': '-q %s' % queue if queue is not None else '',
                'extra_args': extra_args,
                'jobname': job_name}
    job_template = 'echo "cd %(workdir)s; %(cmd)s" | qsub -k oe -N %(jobname)s %(queue)s %(extra_args)s'
    job = job_template % job_data
    job_id = !$job
    return (job_name, job_id[0].split('.')[0])
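
# A minimal usage sketch for submit_qsub. The queue name 'memroute' and the
# pvmem request are illustrative, not defaults of this script, and the id
# shown assumes qsub reports something like '12345.headnode':
#
#   name, id_ = submit_qsub('hostname', job_name='demo', queue='memroute',
#                           extra_args='-l pvmem=8gb')
#   # name -> 'demo', id_ -> '12345'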

def parse_qstat():
    """Process qstat output into {job_id: {'name': ..., 'state': ...}}"""
    user = os.environ['USER']
    lines = !qstat -u $user
    jobs = {}
    for id_, name, state in lines.grep(user).fields(0, 3, 9).fields():
        job_id = id_.split('.')[0]
        jobs[job_id] = {'name': name, 'state': state}
    return jobs
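
# For reference, parse_qstat returns a dict keyed by the numeric job id; the
# values below are illustrative of a single running job:
#
#   {'12345': {'name': 'demo', 'state': 'R'}}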

def still_running(monitoring, running_jobs, additional_prefix=None):
    """Check whether the jobs being monitored are still running

    additional_prefix can be specified to track derivative worker processes
    (e.g., from a parallel QIIME workflow).
    """
    new_monitoring = set([])
    for name, id_ in monitoring:
        # stop monitoring if the job is no longer present
        if id_ not in running_jobs:
            continue
        # stop monitoring if the job is complete
        if running_jobs[id_]['state'] == 'C':
            continue
        new_monitoring.add((name, id_))

    if additional_prefix is not None:
        # see if we have any new jobs with a prefix we're interested in
        for id_, md in running_jobs.items():
            if md['name'].startswith(additional_prefix):
                if md['state'] in ['R', 'Q']:  # running or queued
                    new_monitoring.add((md['name'], id_))

    # persist the updated set so recover_jobs can resume monitoring later
    if new_monitoring != monitoring:
        obj = globals().get(REGISTERED_ENV_VAR, None)
        if obj is not None:
            obj._monitoring_jobs = new_monitoring

    return new_monitoring
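
# A sketch of still_running's bookkeeping with made-up ids and names: a
# completed job ('C') is dropped, while a queued job whose name matches
# additional_prefix is adopted into the monitored set.
#
#   monitoring = set([('demo', '1')])
#   running = {'1': {'name': 'demo', 'state': 'C'},
#              '2': {'name': 'POTU_demo', 'state': 'Q'}}
#   still_running(monitoring, running, additional_prefix='POTU_')
#   # -> set([('POTU_demo', '2')])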

def job_run_details(name, id_):
    """Run tracejob and parse out the useful bits"""
    # go back 2 days. This may need to be smarter.
    job_details = !tracejob -a -m -l -f job -n 2 $id_
    if not job_details.grep('dest='):
        raise ValueError("Cannot find job %s!" % id_)

    dest = job_details.grep('dest=').fields(-2)[0].strip('(),').split('=')[1]
    tmp = job_details.grep('Exit_status=').fields(3, 4, 5, 6).fields()[0]
    exit_status, walltime, mem, vmem = map(lambda x: x.split('=')[-1], tmp)

    stderr_file = os.path.expandvars("$HOME/%s.e%s" % (name, id_))
    if not os.path.exists(stderr_file):
        raise ValueError("Could not find expected standard error output: %s" % stderr_file)

    stdout_file = os.path.expandvars("$HOME/%s.o%s" % (name, id_))
    if not os.path.exists(stdout_file):
        raise ValueError("Could not find expected standard output: %s" % stdout_file)

    return {'exit_status': exit_status,
            'walltime': walltime,
            'mem': mem,
            'vmem': vmem,
            'stderr_file': stderr_file,
            'stdout_file': stdout_file}
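
# On success, the returned values are the raw strings parsed from tracejob;
# the figures here are illustrative only:
#
#   {'exit_status': '0', 'walltime': '00:01:02', 'mem': '3220kb',
#    'vmem': '31988kb', 'stderr_file': '/home/user/demo.e12345',
#    'stdout_file': '/home/user/demo.o12345'}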

def wait_on(jobs_to_monitor, additional_prefix=None):
    """Block while jobs to monitor are running, and provide a status update"""
    POLL_INTERVAL = 5
    elapsed = 0

    # fragile: accept a bare (name, id) tuple in addition to a list of them
    if isinstance(jobs_to_monitor, tuple) and len(jobs_to_monitor) == 2:
        jobs_to_monitor = [jobs_to_monitor]

    all_jobs = set(jobs_to_monitor)
    n_jobs = len(jobs_to_monitor)
    print "monitoring %d jobs..." % n_jobs
    sys.stdout.flush()

    while jobs_to_monitor:
        sleep(POLL_INTERVAL)
        elapsed += POLL_INTERVAL

        running_jobs = parse_qstat()
        jobs_to_monitor = still_running(jobs_to_monitor, running_jobs,
                                        additional_prefix)
        all_jobs.update(set(jobs_to_monitor))

        n_running = len(jobs_to_monitor)
        n_total = len(all_jobs)
        clear_output()
        print "%d / %d jobs still running, approximately %d seconds elapsed" % (n_running, n_total, elapsed)
        sys.stdout.flush()

    n_running = len(jobs_to_monitor)
    n_total = len(all_jobs)
    clear_output()
    print "%d / %d jobs still running, approximately %d seconds elapsed" % (n_running, n_total, elapsed)
    print "All jobs completed!"
    sys.stdout.flush()

    # check if any jobs errored out
    for name, id_ in all_jobs:
        deets = job_run_details(name, id_)
        if deets['exit_status'] != '0':
            stderr_fp = deets['stderr_file']
            last_stderr_line = !tail -n 1 $stderr_fp
            print "ERROR! Job %s did not exit cleanly." % id_
            print "Here is the last line of standard error (%s):" % stderr_fp
            print last_stderr_line[0]
            print

    return all_jobs
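
# A hedged end-to-end sketch, assuming create_env has already registered
# working_dir (the sleep durations are arbitrary):
#
#   jobs = [submit_qsub('sleep %d; hostname' % i) for i in range(5)]
#   finished = wait_on(jobs)
#   for name, id_ in finished:
#       print job_run_details(name, id_)['exit_status']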

class EnvironmentState(object):
    """Store particulars from an environment, write to disk on change"""

    def __init__(self, basedir, **kwargs):
        import os
        import cPickle

        env_fp = os.path.join(basedir, REGISTERED_ENV_FILE)
        super(EnvironmentState, self).__setattr__('_state_fp', env_fp)

        if os.path.exists(self._state_fp):
            raise ValueError("State on disk appears to exist!")

        super(EnvironmentState, self).__setattr__('pickler', cPickle.dumps)
        super(EnvironmentState, self).__setattr__('_basedir', basedir)

        self.__dict__.update(**kwargs)
        self.save_state()

    def save_state(self):
        """Pickle self out to disk"""
        f = open(self._state_fp, 'w')
        f.write(self.pickler(self))
        f.close()

    def __setattr__(self, key, value):
        """Set a single attribute and persist immediately"""
        self.__dict__[key] = value
        self.save_state()

    def update(self, **kwargs):
        """Set several attributes, then persist once"""
        for k, v in kwargs.items():
            self.__dict__[k] = v
        self.save_state()

    def __repr__(self):
        """Represent thy self"""
        out = ["{"]
        for k, v in self.__dict__.items():
            if isinstance(v, int):
                out.append("'%s':%d," % (str(k), v))
            elif isinstance(v, float):
                out.append("'%s':%f," % (str(k), v))
            else:
                out.append("'%s':'%s'," % (str(k), str(v)))
        out.append("}")
        return ''.join(out)
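
# EnvironmentState is normally managed through register_env/recover_env, but
# it can be exercised directly; '/tmp' is just an example base directory:
#
#   env = EnvironmentState('/tmp', prj_name='demo')
#   env.some_value = 42   # persisted to /tmp/.registered_env immediately
#   env.update(a=1, b=2)  # several attributes, one write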

def register(**kwargs):
    """Store variables in the EnvironmentState instance"""
    if REGISTERED_ENV_VAR not in globals():
        raise ValueError("No registered environment in globals!")
    obj = globals()[REGISTERED_ENV_VAR]
    for k, v in kwargs.items():
        setattr(obj, k, v)

def register_env(basedir=None, **kwargs):
    """Create a registered environment"""
    import os
    if basedir is None:
        basedir = os.getcwd()
    if REGISTERED_ENV_VAR in globals():
        raise KeyError("%s already exists!" % REGISTERED_ENV_VAR)
    globals()[REGISTERED_ENV_VAR] = EnvironmentState(basedir, **kwargs)
    # mirror recover_env: expose the registered values (e.g., prj_name,
    # working_dir) so they are usable immediately after create_env
    globals().update(kwargs)

def recover(basedir=None):
    """Is there a registered environment to recover from?"""
    import os
    if basedir is None:
        basedir = os.getcwd()
    env_file = os.path.join(basedir, REGISTERED_ENV_FILE)
    return os.path.exists(env_file)

def recover_env(basedir=None):
    """Recover an environment from disk into globals()"""
    import os
    from cPickle import loads

    if basedir is None:
        basedir = os.getcwd()
    registered_env = os.path.join(basedir, REGISTERED_ENV_FILE)
    if not os.path.exists(registered_env):
        raise ValueError("Cannot recover environment, %s does not exist!" % REGISTERED_ENV_FILE)

    global_vars = globals()
    obj = loads(open(registered_env).read())
    global_vars[REGISTERED_ENV_VAR] = obj
    for k, v in obj.__dict__.items():
        global_vars[k] = v

def recover_jobs():
    """Attempt to recover monitoring of running jobs"""
    obj = globals().get(REGISTERED_ENV_VAR, None)
    if obj is None:
        return
    jobs = obj.__dict__.get('_monitoring_jobs', None)
    if jobs is None:
        return
    return wait_on(jobs)

def drop_env():
    """Drop an existing environment"""
    import os
    if os.path.exists(REGISTERED_ENV_FILE):
        os.remove(REGISTERED_ENV_FILE)
    if REGISTERED_ENV_VAR in globals():
        globals().pop(REGISTERED_ENV_VAR)

def register_items(**kwargs):
    """Insert into the global namespace and persist in the environment"""
    if REGISTERED_ENV_VAR not in globals():
        raise KeyError("%s is not in globals!" % REGISTERED_ENV_VAR)
    obj = globals()[REGISTERED_ENV_VAR]
    obj.update(**kwargs)
    globals().update(kwargs)
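
# Example of persisting a few analysis parameters (the names and values are
# illustrative); recover_env() will restore them into globals() later:
#
#   register_items(otu_table='a/b/c/table.biom', rarefaction_depth=1000)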

def create_env(prj_name, basedir=None):
    """Create a simple project environment and working directory

    prj_name : a name for the project. It is advised that this be < 14
               characters if submitting on a cluster. A random tag is
               appended as well.
    basedir  : set a base working directory; cwd if None
    """
    # create a working directory
    from random import choice
    import os

    if basedir is None:
        basedir = os.getcwd()

    alpha = 'abcdefghijklmnopqrstuvwxyz'
    alpha += alpha.upper()
    alpha += '0123456789'

    retries = 0
    not_created = True
    while retries < 5 and not_created:
        name = '_'.join([prj_name, ''.join([choice(alpha) for i in range(3)])])
        working_dir = os.path.join(basedir, name)
        if os.path.exists(working_dir):
            retries += 1
        else:
            os.mkdir(working_dir)
            not_created = False

    if not_created:
        raise ValueError("Could not create a working directory for %s!" % prj_name)

    register_env(basedir, prj_name=name, working_dir=working_dir)
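
# For illustration, create_env('my_project') would make a directory such as
# my_project_aB3 (the 3-character tag is random) and register prj_name and
# working_dir for use by submit_qsub.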