Created
May 7, 2012 12:49
-
-
Save jfburkhart/2627589 to your computer and use it in GitHub Desktop.
a multiprocess script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
jfb, 2010.10.20 | |
""" | |
from copy import copy | |
import shutil | |
import datetime as dt | |
import os, time | |
import os.path as osp | |
import glob | |
from subprocess import PIPE, call, Popen | |
import multiprocessing | |
import logging | |
def now():
    """Return the current local time as a naive ``datetime.datetime``."""
    current = dt.datetime.now()
    return current
# Warning shown whenever this module is imported.  Written as a print()
# call on a single string so it parses (and prints identically) on both
# Python 2 and Python 3; the bare print statement is a SyntaxError on 3.
print("""
WARNING:
This module is under heavy development, and is not well tested.
It creates, modifies, and deletes file system files. USE AT YOUR
OWN RISK. ~jfb
""")
class Runner:
    """ The 'Runner' class is a queue and option parser for programs
    that can be run from the command line.

    It is a parent class for subclasses which are specific to the
    programs that will be called. See the doc strings of the child
    classes for more details.

    Each queue entry created by ``add_run`` is a dict of the form
    ``{'options': {option_name: option_string, ...}}``.

    NOTE::
        Alone this class will not work, child classes are required
        and should have gen_input and run methods at a minimum.
        ``_call`` also expects the child class to define ``BASE_DIR``.
    """
    # Counts every Runner (and subclass) instance ever created; it is used
    # to build a unique default_name per instance.
    num_runners = 0

    class OptionError(Exception):
        """ Raised when an option name is not listed in valid_options. """

        def __init__(self, option=''):
            Exception.__init__(
                self, "Not Valid input option: {0}".format(option))
            self.option = option

    def __init__(self, **kwargs):
        """ Set up an empty run queue.

        Keyword arguments:
            options -- initial default options dict (validated against
                       valid_options, which is empty at this point).
            user_dir -- user directory, defaults to '.'.
        """
        Runner.num_runners += 1
        # e.g. 'Run1', 'Tes2': first three letters of the class name plus
        # the global instance counter.
        self.default_name = self.__class__.__name__[:3] + \
            str(Runner.num_runners)
        self.valid_options = []
        self.default_options = {}
        if 'options' in kwargs:
            self._op_check(kwargs['options'])
            self.options = kwargs['options']
        else:
            self.options = self.default_options
        # Previously only assigned when user_dir was *not* passed.
        self.user_dir = kwargs.get('user_dir', '.')
        self.queue = {}
        self.qcnt = 0
        self.alljobs = []

    def _op_check(self, options):
        """ Raise OptionError for the first key of *options* that is not
        listed in self.valid_options. """
        for key in options:
            if key not in self.valid_options:
                raise self.OptionError(key)

    def set_option(self, option, value='', run_id=None):
        """ Set a single option; the value is stored as a string.

        option -- option name; must be listed in valid_options.
        value -- anything str() accepts (str, int, float, ...), or a
                 non-string iterable whose elements are converted with
                 str() and joined with single spaces.
        run_id -- if given, modify the options of that queued run instead
                  of the defaults used by future add_run calls.

        Raises OptionError for an unknown option, ValueError for a dict
        value, and KeyError when run_id is not in the queue.
        """
        assert isinstance(option, str), "option must be a string"
        if option not in self.valid_options:
            raise self.OptionError(option)
        if isinstance(value, dict):
            raise ValueError('value type must be one of: '
                             'str, int, float, or a list of those.')
        # Check str first: on Python 3 str has __iter__, and joining it
        # would turn 'abc' into 'a b c'.
        if isinstance(value, str):
            pass
        elif hasattr(value, '__iter__'):
            value = ' '.join([str(v) for v in value])
        else:
            value = str(value)
        self._get_options_from_queue(run_id)[option] = value

    def _get_options_from_queue(self, run_id):
        """ Return the options dict to modify.

        If run_id is None, returns the default options; otherwise returns
        the 'options' dict of the queued run. Raises KeyError if run_id is
        not in the queue.
        """
        if run_id is None:
            return self.options
        if run_id not in self.queue:
            raise KeyError('run_id: {0} not in queue'.format(run_id))
        return self.queue[run_id]['options']

    def overide_options(self, options, run_id=None):
        """ Completely replace an options dictionary.

        run_id is a string identifier that is a 'key' to the run
        queue; if provided, the options of that queued run are replaced,
        otherwise the defaults are. Raises OptionError for invalid keys
        and KeyError when run_id is not in the queue.
        """
        assert isinstance(options, dict), "override options requires a dict"
        self._op_check(options)
        if run_id is None:
            self.options = options
        else:
            # Validates that run_id exists (KeyError otherwise) instead of
            # silently overwriting the defaults on a typo.
            self._get_options_from_queue(run_id)
            self.queue[run_id]['options'] = options

    # Correctly spelled alias; the original name is kept for existing callers.
    override_options = overide_options

    def update_options(self, options, run_id=None):
        """ Update an options dictionary in place from a dict.

        run_id is a string identifier that is a 'key' to the run
        queue; if provided, that queued run is modified, otherwise the
        defaults are. Raises OptionError for invalid keys and KeyError
        when run_id is not in the queue.
        """
        assert isinstance(options, dict), "update options requires a dict"
        self._op_check(options)
        self._get_options_from_queue(run_id).update(options)

    def add_run(self, run_id=None, options=None):
        """ Add a run to the queue.

        Without *options*, a shallow copy of the current defaults is used.
        Without *run_id*, a name of the form '<default_name>_<qcnt>' is
        generated::

            add_run('run_id', options=MySettings)
        """
        if options is None:
            options = copy(self.options)
        if run_id is None:
            run_id = self.default_name + '_' + str(self.qcnt)
        self.queue[run_id] = {'options': options}
        self.qcnt += 1

    def clear_queue(self):
        """ Interactively empty the queue after a [y]/n confirmation. """
        try:
            ask = raw_input  # Python 2
        except NameError:
            ask = input  # Python 3
        proceed = ask('WARNING: Delete all _run information in the queue?[y]')
        if proceed.strip().lower() in ['', 'y', 'yes']:
            self.queue = {}
            self.qcnt = 0

    def print_queue(self):
        """ Print every queue entry's fields, then the current defaults. """
        for run_id in self.queue:
            entry = self.queue[run_id]
            print('### File: {0} ###'.format(run_id))
            for field in entry:
                print('{0} : {1}'.format(field, entry[field]))
            print('')
        print('Current defaults: {0}'.format(self.options))

    def _call(self, cmd, run_id, *args, **kwargs):
        """ Start *cmd* with subprocess.Popen without waiting for it.

        stdout goes to BASE_DIR/run_<run_id>.nh and stderr is appended to
        logfile.log in the current working directory.
        """
        ofile = osp.join(self.BASE_DIR, 'run_{0}.nh'.format(run_id))
        # The child receives duplicates of these descriptors, so the
        # parent's copies can (and should) be closed right after Popen.
        with open(ofile, 'w') as out, open('logfile.log', 'a') as err:
            pid = Popen(cmd, stdout=out, stderr=err).pid
        print('Run ID: {0}, pid: {1}'.format(run_id, pid))

    def _worker(self, run_id, verbose=0):
        """ Process target: execute a single queued run. """
        self.run(run_id, verbose)

    def run_queue(self, runs=None, verbose=0):
        """ Run queued jobs, one multiprocessing.Process per run_id.

        runs -- optional iterable of run_ids (if you don't want all);
                defaults to every run in the queue.
        verbose -- passed through to the child class's run method.

        Jobs are started in batches of multiprocessing.cpu_count(), with a
        0.25 s pause between starts, and each batch (including the last,
        partial one) is joined before this method continues.
        NOTE(review): if run() only spawns a subprocess without waiting
        (as _call does), joining the worker does not wait for the external
        program itself.
        """
        assert self.queue, "Must generate at least one input file"
        pending = list(runs) if runs else list(self.queue.keys())
        missing = [run_id for run_id in pending if run_id not in self.queue]
        if missing:
            raise KeyError('run_id(s) not in queue: {0}'.format(missing))
        batch_size = multiprocessing.cpu_count()
        self.alljobs = []
        while pending:
            jobs = []
            for _ in range(min(batch_size, len(pending))):
                run_id = pending.pop()
                job = multiprocessing.Process(target=self._worker,
                                              args=(run_id, verbose))
                jobs.append(job)
                self.alljobs.append(job)
                job.start()
                time.sleep(0.25)
            for job in jobs:
                job.join()

    def wait_for_queue(self):
        """ Wait for all jobs started by run_queue before proceeding. """
        for job in getattr(self, 'alljobs', []):
            job.join()
class Test_nohup(Runner):
    """
    Test Runner for nohup.

    Each run executes ``nohup a.out`` from inside BASE_DIR/run_dir. The
    child's stdout goes to run_<run_id>.nh and its stderr is appended to
    logfile.log, both in that directory.

    Note::
        The input and output files are intended to be named according to
        the run_id passed to add_run: run_id.inp and run_id.out
        (gen_input is currently a stub).
    """

    def __init__(self,
                 base_dir=None,
                 run_dir='test',
                 verbose=False,
                 ):
        # Runner.__init__ already creates an empty queue and qcnt = 0.
        Runner.__init__(self)
        # Are we defining a custom base directory?
        if not base_dir:
            self.BASE_DIR = os.path.abspath(osp.curdir)
            print("WARNING, No BASE_DIR defined. Using Defaults.")
        else:
            self.BASE_DIR = base_dir
        self.run_dir = run_dir
        self.verbose = verbose

    def run(self, run_id, verbose=False):
        """ Launch ``nohup a.out`` for run_id from BASE_DIR/run_dir.

        The working directory is restored to BASE_DIR afterwards, even if
        launching the process raises.
        """
        # NOTE(review): Popen resolves 'nohup' via PATH, and nohup resolves
        # 'a.out' via PATH too, not the cwd; use './a.out' if the binary
        # lives in run_dir.
        cmd = ['nohup', 'a.out']
        os.chdir(osp.join(self.BASE_DIR, self.run_dir))
        try:
            self._call(cmd, run_id)
        finally:
            os.chdir(self.BASE_DIR)

    def gen_input(self, run_id=None):
        """ Placeholder for creating the run's input files; currently does
        nothing.
        """
        pass

    def _call(self, cmd, run_id, *args, **kwargs):
        """ Start *cmd* with subprocess.Popen without waiting for it.

        Overrides Runner._call: stdout goes to run_<run_id>.nh in the
        current working directory (run_dir when called from run), and
        stderr is appended to logfile.log there.
        """
        # The child gets its own copies of the descriptors, so closing the
        # parent's handles right after Popen avoids leaking them.
        with open('run_{0}.nh'.format(run_id), 'w') as out, \
                open('logfile.log', 'a') as err:
            pid = Popen(cmd, stdout=out, stderr=err).pid
        print('Run ID: {0}, pid: {1}'.format(run_id, pid))
class Test_pool(Runner):
    """
    Test Runner that dispatches queued runs through a multiprocessing.Pool.

    Each run executes ``a.out`` from inside BASE_DIR/run_dir. The child's
    stdout goes to run_<run_id>.nh and its stderr is appended to
    logfile.log, both in that directory.

    Note::
        The input and output files are intended to be named according to
        the run_id passed to add_run: run_id.inp and run_id.out
        (gen_input is currently a stub).
    """

    def __init__(self,
                 base_dir=None,
                 run_dir='test',
                 verbose=False,
                 ):
        # Runner.__init__ already creates an empty queue and qcnt = 0.
        Runner.__init__(self)
        # Are we defining a custom base directory?
        if not base_dir:
            self.BASE_DIR = os.path.abspath(osp.curdir)
            print("WARNING, No BASE_DIR defined. Using Defaults.")
        else:
            self.BASE_DIR = base_dir
        self.run_dir = run_dir
        self.verbose = verbose

    def run(self, run_id, verbose=False):
        """ Launch ``a.out`` for run_id from BASE_DIR/run_dir.

        The working directory is restored to BASE_DIR afterwards, even if
        launching the process raises.
        """
        # NOTE(review): Popen resolves 'a.out' via PATH, not the cwd; use
        # './a.out' if the binary lives in run_dir.
        cmd = ['a.out']
        os.chdir(osp.join(self.BASE_DIR, self.run_dir))
        try:
            self._call(cmd, run_id)
        finally:
            os.chdir(self.BASE_DIR)

    def gen_input(self, run_id=None):
        """ Placeholder for creating the run's input files; currently does
        nothing.
        """
        pass

    def _call(self, cmd, run_id, *args, **kwargs):
        """ Start *cmd* with subprocess.Popen without waiting for it.

        Overrides Runner._call: stdout goes to run_<run_id>.nh in the
        current working directory (run_dir when called from run), and
        stderr is appended to logfile.log there.
        """
        # The child gets its own copies of the descriptors, so closing the
        # parent's handles right after Popen avoids leaking them.
        with open('run_{0}.nh'.format(run_id), 'w') as out, \
                open('logfile.log', 'a') as err:
            pid = Popen(cmd, stdout=out, stderr=err).pid
        print('Run ID: {0}, pid: {1}'.format(run_id, pid))

    def run_queue(self, runs=None, verbose=0, processes=4):
        """ Run queued jobs through a single multiprocessing.Pool.

        runs -- optional iterable of run_ids (if you don't want all);
                defaults to every run in the queue.
        verbose -- passed through to run().
        processes -- pool size; defaults to 4, the previously hard-coded
                     value.

        One run(run_id, verbose) task is submitted per run_id. The call
        blocks until the pool has finished every task, then re-raises the
        first exception raised by any run() call.
        """
        assert self.queue, "Must generate at least one input file"
        run_ids = list(runs) if runs else list(self.queue.keys())
        missing = [run_id for run_id in run_ids if run_id not in self.queue]
        if missing:
            raise KeyError('run_id(s) not in queue: {0}'.format(missing))
        # Pool runs are joined here, so wait_for_queue has nothing to join.
        self.alljobs = []
        # NOTE(review): apply_async pickles the bound method self.run, which
        # works on Python 3 but not on Python 2.
        pool = multiprocessing.Pool(processes=processes)
        try:
            # One task per run_id; previously a list of tuples was passed as
            # positional args to a single run() call.
            results = [pool.apply_async(self.run, (run_id, verbose))
                       for run_id in run_ids]
        finally:
            # No more tasks; wait for the workers to finish and exit.
            pool.close()
            pool.join()
        for result in results:
            # Surface worker exceptions in the parent instead of losing them.
            result.get()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment