Skip to content

Instantly share code, notes, and snippets.

@MikeDacre
Created April 15, 2016 21:54
Show Gist options
Save MikeDacre/e672969aff980ee950b9dfa8b2552d40 to your computer and use it in GitHub Desktop.
Python background job runner
def job_runner(jobqueue, outputs, cores=1, jobno=1):
    """Run queued jobs on a process pool, respecting inter-job dependencies.

    Loops forever. Each pass does the following:
    - pulls every job currently waiting on *jobqueue*;
    - submits to an ``mp.Pool`` any job whose dependencies have all
      finished;
    - collects results from finished jobs;
    - publishes a snapshot of the whole job table to *outputs* after every
      change.

    The loop never returns normally. If it is interrupted by an exception,
    the worker pool is terminated before the exception propagates.

    Args:
        jobqueue: multiprocessing Queue of ``(function, args, depends)``
            tuples. A truthy ``args`` is passed to ``function`` as a single
            positional argument; a falsy one means ``function`` is called
            with no positional arguments. ``depends`` is an iterable of job
            numbers that must be ``'done'`` before this job is started
            (falsy for no dependencies).
        outputs: multiprocessing Queue kept holding the latest snapshot of
            the job table: ``{job number: {'func', 'args', 'kwargs',
            'depends', 'state', 'out'}}``. ``kwargs`` is always ``None``
            here. ``state`` is one of:
            - ``None``: waiting;
            - ``'started'``;
            - ``'done'``: ``out`` is the job's return value;
            - ``'failed'``: ``out`` is the exception the job raised.
            A job that depends on a failed job is never started.
        cores: Number of worker processes in the pool.
        jobno: Number assigned to the first job received; later jobs are
            numbered consecutively. A falsy value or 0 means start at 1.

    Raises:
        ClusterError: If *jobqueue* or *outputs* is not a multiprocessing
            Queue.
    """
    import queue  # multiprocessing queues raise queue.Empty from get_nowait()

    def output(out):
        """Replace any unread snapshot in *outputs* with *out*.

        ``Queue.empty()`` is unreliable for multiprocessing queues, and a
        blocking ``get()`` issued after it can hang if the consumer takes the
        item first. Stale entries are therefore drained with ``get_nowait()``
        until ``queue.Empty``. The new snapshot is always put afterwards, even
        when it equals the drained one, so an update never leaves *outputs*
        empty.
        """
        while True:
            try:
                outputs.get_nowait()
            except queue.Empty:
                break
        outputs.put(out)

    # Make sure we have Queue objects
    if not isinstance(jobqueue, mp.queues.Queue) \
            or not isinstance(outputs, mp.queues.Queue):
        raise ClusterError('jobqueue and outputs must be multiprocessing ' +
                           'Queue objects')

    # ``last_jobno`` is the number given to the most recently received job, so
    # the first job gets ``jobno``. It is deliberately a separate name from the
    # loop variable below, so iterating over ``jobs`` cannot alter numbering.
    jobno = int(jobno) if jobno else 1
    last_jobno = jobno - 1 if jobno != 0 else 0

    jobs = {}        # job number -> job-table entry (published via output())
    runners = {}     # job number -> AsyncResult not yet collected
    started = set()  # numbers of jobs submitted to the pool
    done = set()     # numbers of jobs that finished successfully
    pool = mp.Pool(cores)
    try:
        while True:
            # Take every job currently waiting rather than one per pass.
            received = False
            while True:
                try:
                    function, args, depends = jobqueue.get_nowait()
                except queue.Empty:
                    break
                last_jobno += 1
                jobs[last_jobno] = {'func': function, 'args': args,
                                    'kwargs': None, 'depends': depends,
                                    'state': None, 'out': None}
                received = True
            if received:
                output(jobs)

            for num, info in jobs.items():
                if info['state'] in ('done', 'failed'):
                    continue
                deps = info['depends'] or ()
                if num not in started and all(dep in done for dep in deps):
                    # A truthy ``args`` is passed as ONE positional argument.
                    pos_args = (info['args'],) if info['args'] else ()
                    runners[num] = pool.apply_async(
                        info['func'], pos_args, info['kwargs'] or {})
                    info['state'] = 'started'
                    started.add(num)
                    output(jobs)
                    sleep(0.5)  # Give the pool a moment to pick the job up
                if info['state'] == 'started' and runners[num].ready():
                    try:
                        result = runners.pop(num).get()
                    except Exception as exc:
                        # Record the failure instead of letting one bad job
                        # kill the runner; its dependents are never started.
                        info['out'] = exc
                        info['state'] = 'failed'
                    else:
                        info['out'] = result
                        info['state'] = 'done'
                        done.add(num)
                    output(jobs)
            sleep(0.5)
    finally:
        # Only reached via an exception (e.g. KeyboardInterrupt): do not leave
        # worker processes running behind us.
        pool.terminate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment