Skip to content

Instantly share code, notes, and snippets.

@SeijiEmery
Last active January 30, 2017 23:43
Show Gist options
  • Save SeijiEmery/b39a91fd507f76153f2e to your computer and use it in GitHub Desktop.
Save SeijiEmery/b39a91fd507f76153f2e to your computer and use it in GitHub Desktop.
Multithreaded Job Manager
''' High-level multithreaded job system (design sketch).
The actual implementation would be in C++.
'''
class PendingJob:
    ''' Stores a job until it gets executed. Uses reference counting to track dependencies
        (the number of jobs that must be resolved before this one can run).

        jobId -- identifier of the job; stored as `id`
        run   -- callable invoked to execute the job
        args  -- positional arguments passed to `run` when the job executes
                 (optional; defaults to no arguments)
    '''
    def __init__(self, jobId, run, args=()):
        self.id = jobId
        self.run = run
        # The worker executes a job as `job.run(*job.args)`, so the attribute must
        # always exist, even for jobs that take no arguments.
        self.args = tuple(args)
        # Count of unresolved dependencies; the job is runnable once this is zero.
        self.rc = 0
from enum import IntEnum


class JobState(IntEnum):
    ''' Lifecycle state of a job, as recorded in the job graph.

        An IntEnum is used so members still compare equal to the plain integers
        0..3 while printing with a readable name.
    '''
    PENDING = 0     # still has unresolved dependencies (rc > 0)
    AVAILABLE = 1   # all dependencies resolved; queued to run
    RUNNING = 2     # currently being executed by a worker
    COMPLETED = 3   # finished executing
class JobGraphNode:
    ''' Used to produce a visual representation of the running job state.

        jobId        -- identifier of the job this node describes; stored as `id`
        dependencies -- ids of the jobs this job depends on
        jobState     -- current state of the job
        metadata     -- optional dict of extra attributes to attach to the node
    '''
    def __init__(self, jobId, dependencies, jobState, metadata=None):
        self.id = jobId
        self.dependencies = dependencies
        self.jobState = jobState
        # dict.update() returns None, so the metadata has to be merged in a separate
        # call rather than chained. It is applied last, so metadata keys take
        # precedence over the core fields above.
        self.__dict__.update(metadata or {})
class JobMgr:
    ''' Tracks jobs and their dependencies, and hands out jobs that are ready to run.

        NOTE: none of the containers below are synchronized in this sketch; a real
        implementation would use locking or atomic primitives.
    '''
    def __init__(self):
        self.availableJobs = []      # Queue of jobs that can be run now
        self.completedJobs = set()   # Ids of completed jobs (anything that's dependent on these can be run immediately)
        self.dependentJobMap = {}    # Inverse mapping of jobs to dependencies (each job id maps to the set of jobs
                                     # that depend on it). When the job is run, each dependent's rc is decremented,
                                     # and if this hits zero it gets added to the available job list. It is
                                     # *critical* that this and each dependent job's rc stay in sync.

        # Job graph (redundant data used for visualization)
        self.jobGraph = {}

    def uuid(self):
        ''' Return a new unique job id (a random UUID as a 32-character hex string). '''
        from uuid import uuid4
        return uuid4().hex

    def addJob(self, run, rdeps=None):
        ''' Register a job and return its id.

            run   -- callable that executes the job
            rdeps -- optional iterable of ids of jobs that must complete first

            Raises KeyError if a dependency is neither completed nor a known job.
        '''
        # Drop duplicate ids (keeping order): a repeated id would be counted twice in
        # rc but released only once, leaving the job pending forever.
        rdeps = list(dict.fromkeys(rdeps or []))
        unresolved = [rdep for rdep in rdeps if rdep not in self.completedJobs]

        # Validate before mutating anything so a bad id cannot leave the dependency
        # map and the job's rc out of sync.
        unknown = [rdep for rdep in unresolved if rdep not in self.dependentJobMap]
        if unknown:
            raise KeyError('unknown job dependencies: %r' % (unknown,))

        job = PendingJob(self.uuid(), run)
        for rdep in unresolved:
            # Add the dependency (must wait for rc = 0 before it can run)
            self.dependentJobMap[rdep].add(job)
            job.rc += 1

        if job.rc == 0:
            # No unresolved dependencies -- mark the job as available
            self.availableJobs.append(job)
        self.dependentJobMap[job.id] = set()

        state = JobState.AVAILABLE if job.rc == 0 else JobState.PENDING
        self.jobGraph[job.id] = JobGraphNode(job.id, rdeps, state, {
            # Not every callable has a __name__ (e.g. functools.partial objects)
            'name': getattr(run, '__name__', repr(run)),
            'func': run
        })
        return job.id
class JobThread:
    ''' Worker that repeatedly takes an available job from a job manager and runs it.

        This class does not start an OS thread itself; `run` is the worker loop and
        is meant to be driven by whatever owns the thread.

        jobMgr       -- the job manager to pull jobs from
        pollInterval -- seconds to sleep between checks while no job is available
    '''
    def __init__(self, jobMgr, pollInterval=0.001):
        self.jobMgr = jobMgr
        self.pollInterval = pollInterval
        # Checked by both loops below; must exist before run() is called.
        self.running = True

    def stop(self):
        ''' Ask the worker loop to exit once the current job (if any) has finished. '''
        self.running = False

    def fetchAndRunJob(self):
        ''' Wait for an available job, run it, then release the jobs that depend on it.

            Returns without running anything if the worker is stopped while waiting.
        '''
        from time import sleep

        mgr = self.jobMgr
        while not mgr.availableJobs:
            if not self.running:
                return
            sleep(self.pollInterval)

        job = mgr.availableJobs.pop(0)   # take from the front of the queue
        mgr.jobGraph[job.id].jobState = JobState.RUNNING
        # Jobs created without arguments may not carry an `args` attribute at all.
        job.run(*getattr(job, 'args', ()))

        # Mark the job completed *before* releasing dependents, so jobs added from
        # now on treat it as an already-resolved dependency.
        mgr.completedJobs.add(job.id)
        mgr.jobGraph[job.id].jobState = JobState.COMPLETED

        for dependent in mgr.dependentJobMap[job.id]:
            dependent.rc -= 1
            if dependent.rc <= 0:
                mgr.availableJobs.append(dependent)
                mgr.jobGraph[dependent.id].jobState = JobState.AVAILABLE
        del mgr.dependentJobMap[job.id]

    def run(self):
        ''' Worker loop: keep fetching and running jobs until stopped. '''
        while self.running:
            self.fetchAndRunJob()
def example(jobMgr=None):
    ''' Create 3 jobs: foo, bar, and baz. Baz depends on the results of foo and bar.
        A fourth job prints the result of baz once it is done.

        jobMgr -- job manager to submit to (anything with an
                  `addJob(run, rdeps=None)` method that returns a job id).
                  When omitted, the instance is obtained from `App.getInstance(JobMgr)`.

        Returns the dict that the baz job fills in. It only holds results once
        the jobs have actually been run by the manager.
    '''
    fooInput = { 'x': 10, 'y': 12 }
    fooOut = { 'z': None }
    def runFoo():
        # Integer division: the value ends up (via baz) as a list repetition count,
        # which must be an int.
        fooOut['z'] = (fooInput['x'] * 13 + fooInput['y'] // 12) % 27

    barInput = { 'a': 12, 'b': 123, 'c': 20930 }
    barOut = { 'k': None, 'r': None }
    def runBar():
        multiples = [ k for k in range(barInput['a'], barInput['b']) if k % 13 == 0 ]
        barOut['r'] = len(multiples)
        # The values are already ints, so they are used directly (ord() only accepts
        # single characters). The list is repeated r times.
        barOut['k'] = multiples * barOut['r']

    bazOut = { 'bazzz': None, 'zx': None }
    def runBaz():
        bazOut['zx'] = (fooOut['z'] * 14 + 87) % barOut['r']
        bazOut['bazzz'] = barOut['k'] * bazOut['zx']

    if jobMgr is None:
        # NOTE(review): `App` is not defined in this file -- presumably an
        # application-level registry provided elsewhere; confirm.
        jobMgr = App.getInstance(JobMgr)

    fooTask = jobMgr.addJob(runFoo)
    barTask = jobMgr.addJob(runBar)
    bazTask = jobMgr.addJob(runBaz, rdeps=[fooTask, barTask])

    # Create a last job that prints out the result of baz (and thus foo and bar).
    # Assuming that jobMgr has active threads, this should run asynchronously and
    # in the required order.
    def printBaz():
        print(bazOut['bazzz'])
    jobMgr.addJob(printBaz, rdeps=[bazTask])

    return bazOut
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment