Created
April 15, 2016 21:54
-
-
Save MikeDacre/e672969aff980ee950b9dfa8b2552d40 to your computer and use it in GitHub Desktop.
Python background job runner
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def job_runner(jobqueue, outputs, cores=1, jobno=1):
    """Run queued jobs in a worker pool and publish their state.

    Loops forever. On every pass it takes at most one new job from
    ``jobqueue``, starts each job whose dependencies have all finished,
    and collects the result of each started job that has completed. After
    every state change the complete job table is placed on ``outputs``,
    replacing whatever snapshot was there before.

    Args:
        jobqueue: multiprocessing Queue of ``(function, args, depends)``
            tuples. ``args``, when truthy, is handed to ``function`` as a
            single positional argument. ``depends`` is an iterable of job
            numbers that must be done before this job starts (or a falsy
            value for no dependencies).
        outputs: multiprocessing Queue that receives the job table, a dict
            ``{jobno: {'func', 'args', 'kwargs', 'depends', 'state',
            'out'}}`` where ``state`` is ``None``, ``'started'`` or
            ``'done'`` and ``out`` is the job's return value.
        cores: number of worker processes in the pool.
        jobno: number given to the first job received; falsy means 1.

    Raises:
        ClusterError: if ``jobqueue`` or ``outputs`` is not a
            multiprocessing Queue.

    Note:
        This function does not return; it is meant to run as a background
        process. An exception raised inside a job propagates out of the
        loop when its result is collected.
    """
    def output(out):
        """Replace the contents of ``outputs`` with the snapshot ``out``."""
        # Drain stale snapshots so the queue only ever holds the latest
        # state. The new snapshot is always re-sent: removing the previous
        # one and then skipping the put would leave the queue empty.
        while not outputs.empty():
            outputs.get()
        outputs.put(out)

    # Make sure we have Queue objects
    if not isinstance(jobqueue, mp.queues.Queue) \
            or not isinstance(outputs, mp.queues.Queue):
        raise ClusterError('jobqueue and outputs must be multiprocessing ' +
                           'Queue objects')

    # The counter is incremented before each assignment, so start one below
    # the requested first job number.
    jobno = int(jobno) if jobno else 1
    jobno = jobno - 1 if jobno != 0 else 0

    jobs = {}        # job number -> job record (see docstring)
    runners = {}     # job number -> AsyncResult of the running job
    started = set()  # job numbers submitted to the pool
    done = set()     # job numbers whose result has been collected
    pool = mp.Pool(cores)

    while True:
        # Register at most one new job per pass.
        if not jobqueue.empty():
            jobno += 1
            function, args, depends = jobqueue.get_nowait()
            jobs[jobno] = {'func': function, 'args': args, 'kwargs': None,
                           'depends': depends, 'state': None, 'out': None}
            output(jobs)

        # Use a separate loop variable: reusing ``jobno`` here would
        # overwrite the counter used to number the next incoming job.
        for job_id, job_info in jobs.items():
            if job_info['state'] == 'done':
                continue

            # A job is ready once every dependency has finished.
            ready = all(dep in done for dep in (job_info['depends'] or ()))

            if ready and job_id not in started:
                # ``args`` is passed as ONE positional argument, not unpacked.
                positional = (job_info['args'],) if job_info['args'] else ()
                keywords = job_info['kwargs'] or {}
                runners[job_id] = pool.apply_async(job_info['func'],
                                                   positional, keywords)
                job_info['state'] = 'started'
                started.add(job_id)
                output(jobs)
                sleep(0.5)  # Wait half a second to allow the job to start

            # Collect the result of a started job that has finished.
            if job_info['state'] == 'started' and job_id not in done \
                    and runners[job_id].ready():
                job_info['out'] = runners[job_id].get()
                job_info['state'] = 'done'
                done.add(job_id)
                output(jobs)

        # Pause between passes so the loop does not spin.
        sleep(0.5)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment