Skip to content

Instantly share code, notes, and snippets.

@anthonynsimon
Created April 12, 2018 15:47
Show Gist options
  • Save anthonynsimon/2401d2662ca828e5b90abbc4d0fedee8 to your computer and use it in GitHub Desktop.
Save anthonynsimon/2401d2662ca828e5b90abbc4d0fedee8 to your computer and use it in GitHub Desktop.
Dummy file-backed job runner
import time
import os
class JobDoneError(Exception):
    """Signal that a job has no remaining steps to run."""
class Job:
    """A file-backed job; completed steps are committed to disk so it is resumable.

    Progress is kept as an append-only log of ``<job_id>:<offset>`` lines in
    ``filestore``. On construction the log is replayed and the last offset
    recorded for ``job_id`` becomes the index of the next step to run.

    Args:
        job_id: String identifying this job's records in the log. It may
            contain ':' but must not contain a newline, since records are
            stored one per line.
        steps: Sequence of objects exposing a ``run()`` method, executed in
            order.
        filestore: Path of the log file; it is created if it does not exist.
    """

    def __init__(self, job_id, steps, filestore):
        self._job_id = job_id
        self._steps = steps
        self._offset = 0
        self._filestore = filestore
        self._rebuild_from_disk()

    def run(self):
        """Run the remaining steps in order, then print a completion message."""
        while True:
            try:
                self._run_next_step()
            except JobDoneError:
                print('Job complete!')
                break

    def _run_next_step(self):
        """Run the step at the current offset and record it as completed.

        Raises:
            JobDoneError: If there is no step left to run.
        """
        current_step = self._dequeue_step()
        current_step.run()
        # Commit only after the step returned, so a step that raised is
        # retried on the next run.
        self._commit_current_step()

    def _dequeue_step(self):
        """Return the step at the current offset without advancing it.

        Raises:
            JobDoneError: If the offset is at or past the end of the steps.
        """
        if self._offset >= len(self._steps):
            raise JobDoneError()
        return self._steps[self._offset]

    def _commit_current_step(self):
        """Append the next offset to the log, then advance the in-memory offset."""
        next_offset = self._offset + 1
        with open(self._filestore, 'a') as fout:
            fout.write('{}:{}\n'.format(self._job_id, next_offset))
            # flush() only hands the data to the OS; fsync() asks the OS to
            # write it to the storage device so the commit survives a crash.
            fout.flush()
            os.fsync(fout.fileno())
        self._offset = next_offset

    def _rebuild_from_disk(self):
        """Restore the offset from the last log record written for this job id."""
        # Append mode creates the file when it is missing and never truncates
        # an existing log, which avoids a separate exists-then-create check.
        with open(self._filestore, 'a'):
            pass
        with open(self._filestore) as fin:
            for line in fin:
                # Ignore empty lines
                if not line.strip():
                    continue
                # Split on the LAST colon so job ids that themselves contain
                # ':' are read back intact.
                job_id, sep, offset = line.rpartition(':')
                if not sep or job_id != self._job_id:
                    continue
                try:
                    self._offset = int(offset)
                except ValueError:
                    # A torn or corrupt record (e.g. from an interrupted
                    # write): keep the last offset that parsed cleanly.
                    continue
class Step:
    """A named demo step that prints its name and then pauses.

    Args:
        name: Text printed when the step runs.
        delay: Seconds to sleep after printing. Defaults to 1, the pause that
            was previously hard-coded; pass 0 to skip the wait.
    """

    def __init__(self, name, delay=1):
        self._name = name
        self._delay = delay

    def run(self):
        """Print the step name, then sleep for the configured delay."""
        print(self._name)
        time.sleep(self._delay)
# Demo: build three steps, then run them as one resumable job whose progress
# is logged to db.txt in the current working directory.
step1, step2, step3 = (
    Step(label)
    for label in (
        'trigger query 1',
        'trigger query 2',
        'transfer results to store',
    )
)
job = Job(
    job_id='my_awesome_job',
    steps=[step1, step2, step3],
    filestore='db.txt',
)
job.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment