Created
April 12, 2018 15:47
-
-
Save anthonynsimon/2401d2662ca828e5b90abbc4d0fedee8 to your computer and use it in GitHub Desktop.
Dummy file-backed job runner
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import os | |
class JobDoneError(Exception):
    """Signal that a job has no remaining steps to run."""
class Job:
    """A file-backed job; completed steps are committed to disk so it's resumable.

    Progress is stored as one ``<job_id>:<offset>`` record per line in
    ``filestore``. The offset is the index of the next step to run. Several
    jobs may share one store; the last record for a job id wins.

    Args:
        job_id: Identifier used to tag this job's records. It is compared by
            its string form, so non-string ids resume correctly too.
        steps: Sequence of objects exposing a ``run()`` method.
        filestore: Path of the progress file; created if it does not exist.
    """

    def __init__(self, job_id, steps, filestore):
        self._job_id = job_id
        self._steps = steps
        self._offset = 0
        self._filestore = filestore
        self._rebuild_from_disk()

    def run(self):
        """Run the remaining steps in order, then print a completion message."""
        while True:
            try:
                self._run_next_step()
            except JobDoneError:
                print('Job complete!')
                break

    def _run_next_step(self):
        """Run the step at the current offset and record it as completed."""
        current_step = self._dequeue_step()
        current_step.run()
        # Commit only after run() returns, so a failed step is retried on resume.
        self._commit_current_step()

    def _dequeue_step(self):
        """Return the step at the current offset.

        Raises:
            JobDoneError: If every step has already been completed.
        """
        if self._offset >= len(self._steps):
            raise JobDoneError()
        return self._steps[self._offset]

    def _commit_current_step(self):
        """Append the next offset to the store, then advance in memory."""
        next_offset = self._offset + 1
        # Leaving the ``with`` block closes the file, which flushes the record.
        with open(self._filestore, 'a') as fout:
            fout.write('{}:{}\n'.format(self._job_id, next_offset))
        self._offset = next_offset

    def _rebuild_from_disk(self):
        """Restore the offset from the last record stored for this job id."""
        # Append mode creates a missing file but never truncates an existing
        # one, so there is no window between an existence check and creation.
        with open(self._filestore, 'a'):
            pass

        # Records are written with str.format, so compare against the string
        # form of the id; otherwise a non-string id would never match.
        wanted = str(self._job_id)
        with open(self._filestore) as fin:
            for line in fin:
                # Ignore empty lines
                if not line.strip():
                    continue
                # Split on the LAST colon: the offset never contains one, but
                # a job id may.
                job_id, separator, offset = line.rpartition(':')
                if not separator or job_id != wanted:
                    continue
                try:
                    self._offset = int(offset)
                except ValueError:
                    # Skip a torn or corrupt record rather than blocking resume.
                    continue
class Step:
    """A dummy unit of work that prints its name and then pauses.

    Args:
        name: Label printed when the step runs.
        delay: Seconds to sleep after printing. Defaults to 1, the value that
            was previously hard-coded.
    """

    def __init__(self, name, delay=1):
        self._name = name
        self._delay = delay

    def run(self):
        """Print the step name, then sleep for the configured delay."""
        print(self._name)
        time.sleep(self._delay)
# Demo: a three-step job whose progress is persisted in db.txt, so re-running
# the script after an interruption continues from the first uncommitted step.
step1, step2, step3 = (
    Step('trigger query 1'),
    Step('trigger query 2'),
    Step('transfer results to store'),
)
job = Job(
    job_id='my_awesome_job',
    steps=[step1, step2, step3],
    filestore='db.txt',
)
job.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment