Updated cluster_jobs.py for use on some Torque-based clusters
#!/bin/env python
#cluster_jobs.py
"""
Simple helper script to make pbs jobs and submit to the cluster

Will only work if run on bmf or hemisphere

Can run from command line or import into python code.

Author: Micah Hamady
Status: Prototype
"""
from os import makedirs, removedirs
from commands import getoutput
from string import strip
from time import sleep
from random import choice

__author__ = "Micah Hamady"
__copyright__ = "Copyright 2012, PyCogent"
__credits__ = ["Micah Hamady", "Daniel McDonald"]
__license__ = "GPL"
__version__ = "1.5.3-dev"
__maintainer__ = "Daniel McDonald"
__email__ = "[email protected]"
__status__ = "Development"

# module constants

# qsub template
QSUB_TEXT = """# Walltime Limit: hh:nn:ss
#PBS -l walltime=%s
# Node Specification: requested cpus, nodes (with optional ppn) and memory
#PBS -l ncpus=%d -l nodes=%d%s -l pvmem=%s
# Mail: options are (a) aborted, (b) begins execution, (e) ends execution
# use -M <email> for additional recipients
# Queue: Defaults to friendlyq
#PBS -q %s
# suppress email notification
#PBS -m n
# Job Name:
#PBS -N %s
#PBS -k %s
# discard output
%s
######################################################################
# Executable commands go here. Make sure that mpirun -np and -npn match the
# number of processors you requested from PBS! If you are using ppn and want
# npn=2, then you can leave -np and -npn off and PBS will add it for you. If
# you want npn=1, then you have to request all of the processors on all of the
# nodes, and then force it to use less using -npn.
echo ------------------------------------------------------
echo PBS: qsub is running on $PBS_O_HOST
echo PBS: originating queue is $PBS_O_QUEUE
echo PBS: executing queue is $PBS_QUEUE
echo PBS: working directory is $PBS_O_WORKDIR
echo PBS: execution mode is $PBS_ENVIRONMENT
echo PBS: job identifier is $PBS_JOBID
echo PBS: job name is $PBS_JOBNAME
echo PBS: node file is $PBS_NODEFILE
echo PBS: current home directory is $PBS_O_HOME
echo PBS: PATH = $PBS_O_PATH
echo ------------------------------------------------------
cd $PBS_O_WORKDIR
%s
"""
# qsub command
QSUB_CMD = "qsub %s"
#QSUB_CMD = "/usr/pbs/bin/qsub %s"

# command line usage string
USAGE_STR = """Usage: cluster_jobs.py <-m|-ms|-clean|-kill> [<name of commands file> <jobs prefix>]

Options:
    -m          just make jobs. this will create a "jobs" directory in the
                current directory and write out the pbs job files there.
    -ms         make and submit jobs.
    -clean      clean up temp and output files (stdout and stderr) from
                previous run(s).
    -cleansave  clean up temp and output files (stdout and stderr) from
                previous run(s) and move them to a saved directory.
    -kill       kill all queued or running jobs for the current user.

The following are required if -m or -ms is specified (2nd and 3rd arguments):
    <name of commands file>  name of file with one command per line.
    <jobs_prefix>            prefix of jobs, <= 10 chars. e.g. micah_blat
"""
class ClusterJobs:
    """
    Wrapper to the queue management system.

    Creates and/or submits jobs to the cluster.
    """
    # bmf typical settings
    def __init__(self, commands_list, jobs_prefix, jobs_dir="jobs",
                 ppn=1, nnodes=1, ncpus=1, num_jobs=50, wall_time="999:00:00",
                 queue_name="memroute", job_range=None,
                 use_mpirun=False, qstat_prefix="compy", delay_secs=5.0,
                 verbose=True, keep_output="oe", discard_output=False,
                 memory='4gb'):
        # hemisphere typical settings
        #def __init__(self, commands_list, jobs_prefix, jobs_dir="jobs",
        #             ppn=1, nnodes=1, ncpus=1, num_jobs=48,
        #             wall_time="144:00:00", queue_name="friendlyq",
        #             job_range=(16,32), use_mpirun=False,
        #             qstat_prefix="hemispher", delay_secs=1.0, verbose=False,
        #             keep_output="oe", discard_output=False):
        """
        Initialize jobs

        commands_list: list of commands to run on cluster
            **PLEASE NOTE** the path to the executable must be fully
            qualified, e.g. to run a python script, the command must look
            something like:
                /usr/local/bin/python2.4 my_script.py 'script params'
            **ALSO NOTE** if your python (et al.) script takes parameters
            from the command line and you are checking the number of
            parameters, be aware that mpirun will add parameters after the
            parameters you have passed (e.g. if len(argv) != 4: <do
            something>, you will need to modify your code).
        jobs_prefix: job prefix. must be 1-10 characters, no spaces, and the
            first character must be a-Z. this is what you will see in the
            job queue, and it will be the prefix of all of the standard out
            and standard error output. required.
        jobs_dir: path to directory to store generated jobs. default = "jobs"
        ppn: number of processors per node to use. default = 1
        nnodes: number of nodes per job. default = 1
        ncpus: total number of cpus to use. default = 1
        num_jobs: number of jobs to make. default = 50
        wall_time: maximum time the job is allowed to run. friendlyq's limit
            is 72 hours. if debugging a program that might hang (e.g. an mpi
            program that blocks), it is a good idea to set this value very
            low, e.g. 5 minutes. format is HH:MM:SS. default = "999:00:00"
        queue_name: name of queue to submit jobs to. default = "memroute"
        job_range: only create/submit jobs in this range. tuple of
            (start, stop), half open (includes the first index, excludes
            the last). default = None (all jobs)
        use_mpirun: run each command through mpirun. both mpi and non-mpi
            programs will run fine when this is True. default = False
        qstat_prefix: prefix of the qstat host. e.g. execute qstat and you
            will see something like 7737.bmf or 204544.hemispher; the
            qstat_prefix is the text after the number, so on bmf it is
            "bmf" and on hemisphere it is currently "hemispher"
        delay_secs: time delay between submitting jobs - helps to avoid
            pbs crashes
        keep_output: keep standard error, standard out, both, or neither:
            o=std out, e=std err, oe=both, n=neither
        verbose: when True, print progress; otherwise be silent
        discard_output: when True, output is redirected to temp files in
            /tmp. use when running as the apache user (where working dir
            env vars are stripped and -k n is ignored). if not used, there
            is a delay when a job finishes running but its output cannot
            be delivered.
        memory: per-process virtual memory (pvmem) to request, e.g. '4gb'
        """
        if not commands_list:
            raise ValueError, "You must specify a list of commands."
        if not jobs_prefix or len(jobs_prefix) > 10:
            raise ValueError, "Jobs prefix must be 1-10 characters long."
        if not jobs_prefix[0].isalpha():
            raise ValueError, "First letter of prefix must be a-Z."
        if ppn < 1 or ppn > 2:
            print "WARNING! Possible invalid ppn value: %d" % ppn
        if nnodes < 1 or nnodes > 8:
            print "WARNING! Possible invalid number of nodes: %d" % nnodes
        if num_jobs < 1:
            raise ValueError, "You must specify at least 1 job to create."
        if ncpus < 1:
            raise ValueError, "You must specify at least 1 cpu to use."
        if num_jobs > 256:
            print "WARNING! Large number of jobs specified: %d" % num_jobs

        # make sure that absolute path to executable is specified
        #if len(filter(lambda x: x.startswith("/"), commands_list)) != \
        #        len(commands_list):
        #    raise ValueError, "One or more commands not fully qualified."

        # try to make directory for jobs
        try:
            makedirs(jobs_dir)
        except OSError:
            # if directory already exists, ignore
            pass

        if keep_output not in ["e", "o", "oe", "eo", "n"]:
            raise ValueError, "Invalid keep_output option."

        # make sure jobs dir has a trailing slash
        if not jobs_dir.endswith("/"):
            jobs_dir += "/"
        self.JobsDir = jobs_dir

        # store values
        self.CommandList = commands_list
        self.JobsPrefix = jobs_prefix
        self.ProcsPerNode = ppn
        self.NumNodes = nnodes
        self.NumCpus = ncpus
        self.NumJobs = num_jobs
        self.WallTime = wall_time
        self.QueueName = queue_name
        self.JobRange = job_range
        self.UseMPIRun = use_mpirun
        self.JobFilenames = []
        self.QstatPrefix = qstat_prefix
        self.DelaySecs = delay_secs
        self.Verbose = verbose
        self.KeepOutput = keep_output
        self.DiscardOutput = discard_output
        self.Memory = memory

    def makeJobs(self):
        """
        Write out jobs
        """
        out = []
        out.append("Trying to make %d jobs." % self.NumJobs)
        out.append("Writing jobs to: %s" % self.JobsDir)
        out.append("Delay between jobs: %.2f" % self.DelaySecs)

        ct = 0
        job_lists = {}
        # for each command, add to growing list of jobs
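        # (commands are assigned round-robin: with 5 commands and
        # NumJobs=2, job 0 gets commands 0, 2 and 4, job 1 gets 1 and 3)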
        for cmd in map(strip, self.CommandList):
            cur_job = ct % self.NumJobs
            # check if new job
            if cur_job not in job_lists:
                job_lists[cur_job] = []
            # if UseMPIRun, prefix command with mpirun invocation
            mpirun = "mpirun -np %d " % self.NumCpus
            if not self.UseMPIRun:
                mpirun = ""
            # append command to current job
            job_lists[cur_job].append(mpirun + cmd)
            ct += 1

        # write out pbs files, cache job filenames
        for jkey, jlist in job_lists.items():
            job_out = "%s%s%d.pbs" % (self.JobsDir, self.JobsPrefix, jkey)
            self.JobFilenames.append(job_out)
            o_file = open(job_out, "w+")

            ppn_text = ""
            if self.ProcsPerNode > 1:
                ppn_text = ":ppn=%d" % self.ProcsPerNode

            discard_out = ""
            if self.DiscardOutput:
                # route stdout/stderr to randomly named files in /tmp
                picks = list('abcdefghijklmnopqrstuvwxyz0123456789')
                base = ''.join([choice(picks) for i in range(9)]) + ".discard"
                rerr = "e" + base
                rout = "o" + base
                discard_out = "#PBS -o /tmp/%s\n#PBS -e /tmp/%s" % (rout, rerr)

            o_file.write(QSUB_TEXT % (self.WallTime,
                                      self.NumCpus,
                                      self.NumNodes,
                                      ppn_text,
                                      self.Memory,
                                      self.QueueName,
                                      "%s%d" % (self.JobsPrefix, jkey),
                                      self.KeepOutput,
                                      discard_out,
                                      '\n'.join(jlist)))
            o_file.close()
        return '\n'.join(out)

    def submitJobs(self):
        """
        Submit all jobs.
        """
        if not self.JobFilenames:
            raise ValueError, "No job filenames available. Have you run makeJobs yet?"
        out = []
        job_ids = []
        out.append("Submitting %d jobs to %s queue!" % (len(self.JobFilenames),
                                                        self.QueueName))
        start_ix = 0
        stop_ix = len(self.JobFilenames)
        if self.JobRange:
            start_ix, stop_ix = self.JobRange

        for ct, p_file in enumerate(self.JobFilenames):
            if ct < start_ix or ct >= stop_ix:
                if self.Verbose:
                    print "skipping: %d" % ct
                continue
            if self.Verbose:
                print "Submitting: ", p_file
            #raise ValueError, "Submitting: %s" % (QSUB_CMD % p_file)
            out_id = getoutput(QSUB_CMD % p_file)
            out.append(out_id)
            job_ids.append(out_id)
            sleep(self.DelaySecs)
        out.append("done.\nType 'qstat' or 'qstat -a' to check the status of your jobs.")
        return '\n'.join(out), job_ids
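
# A minimal sketch of programmatic use (illustrative, not from the original
# source; assumes a host where `qsub` is on the PATH and the commands are
# fully qualified):
#
#   cmds = ["/bin/echo hello from command %d" % i for i in range(4)]
#   cj = ClusterJobs(cmds, "demojob", num_jobs=2, queue_name="friendlyq")
#   print cj.makeJobs()                # writes jobs/demojob0.pbs, jobs/demojob1.pbs
#   output, job_ids = cj.submitJobs()  # qsubs each file, returns the job ids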
from cogent.util.misc import parse_command_line_parameters
from optparse import make_option

script_info = {}
script_info['brief_description'] = """Submit jobs to a PBS/Torque cluster"""
script_info['script_description'] = """This script consumes a list of command lines and partitions those commands into a specified number of jobs. Within each job, the commands are run serially."""
script_info['script_usage'] = []
script_info['required_options'] = [
    make_option('-i', '--input', help="file containing the input commands",
                type='str'),
    make_option('-p', '--prefix', help="submission prefix", type='str'),
    make_option('-t', '--runtype', help="what to do: make or make_submit "
                "[default:%default]", type='choice',
                choices=['make', 'make_submit'], default='make')]
script_info['optional_options'] = [
    make_option('-j', '--jobs', help="make N jobs", type='int', default=20),
    make_option('-m', '--mem', help="memory per job", type='str',
                default='2gb'),
    make_option('-q', '--queue', help="submit queue", type='str',
                default='memroute'),
    make_option('-w', '--walltime', help='walltime per job', type='str',
                default='120:00:00'),
    make_option('-d', '--delay', help='job submission delay', type='float',
                default=0.1)]
script_info['version'] = __version__
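
# Example invocation (illustrative; "commands.txt" and "myjob" are
# placeholder names):
#   python cluster_jobs.py -i commands.txt -p myjob -t make_submit -j 10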
if __name__ == "__main__":
    option_parser, opts, args = parse_command_line_parameters(**script_info)

    cj = ClusterJobs(open(opts.input), opts.prefix, num_jobs=opts.jobs,
                     queue_name=opts.queue, delay_secs=opts.delay,
                     wall_time=opts.walltime, memory=opts.mem)
    if opts.runtype in ('make', 'make_submit'):
        cj.makeJobs()
    if opts.runtype == 'make_submit':
        cj.submitJobs()