@xmonader
Created June 26, 2019 14:27
from gevent import monkey; monkey.patch_all()
import sys
import time
from dill import loads as dill_loads, dumps as dill_dumps
from redis import Redis
import gevent as g
from gevent import sleep
from hashlib import md5
from functools import partial
import traceback
STOPPED, SCHEDULED, RUNNING, SUCCESS, FAILURE = range(5)
WORKQ = "jobs-work-q"
ACTIVEQ = "jobs-active-q"
ACKQ = "jobs-ack-list"
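
# Overview of the Redis layout as implemented below: producers LPUSH
# dill-serialized Job objects onto WORKQ, workers BRPOPLPUSH them onto ACTIVEQ
# before executing, and results land in a per-job key
# ("job-results:job-<id>") plus a memoization key
# ("jobs-results:cache-<md5 of fn+args>"). ACKQ is declared but not used yet.
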
class Job:
    jid = 0

    def __init__(self, fun, retries=3):
        self._jid = Job.jid
        self._job_id = None
        Job.jid += 1
        self.fun = dill_dumps(fun)
        self.funname = fun.__name__
        self.retries = retries
        self.state = SCHEDULED
        self.args = []
        self.kwargs = {}
        self.jobkey = "job-results:job-{}".format(self.job_id)
        self.result = None
        self.error = None
        self.start_time = None
        self.done_time = None
        self.last_modified_time = None  # should it be a list of prev handling times?
        self.in_process = False
        self.memoized = True

    @property
    def job_id(self):
        if not self._job_id:
            self._job_id = "{}-{}".format(time.time(), self._jid)
        return self._job_id

    @property
    def jobkey_cached(self):
        # Computed lazily so the key reflects args/kwargs, which new_job()
        # only sets after the Job has been constructed.
        return "jobs-results:cache-{}".format(self._hash())

    def getfn(self):
        return dill_loads(self.fun)

    def _hash(self):
        # Memoization key: the serialized function plus its arguments.
        data = str(self.fun) + str(self.args) + str(self.kwargs)
        thehash = md5()
        thehash.update(data.encode())
        return thehash.hexdigest()

    def __call__(self):
        return self

    def __str__(self):
        return str(self.__dict__)

    __repr__ = __str__

def job_dumps(job):
    return dill_dumps(job)


def job_loads(data):
    return dill_loads(data)


def new_job(fun, *args, **kwargs):
    job = Job(fun=fun)
    job.state = STOPPED
    job.args = args
    job.kwargs = kwargs
    return job
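
# Illustrative example (not part of the original gist) of how a Job
# round-trips through the dill-based helpers above before it hits Redis:
#
#   def add(a, b):
#       return a + b
#
#   job = new_job(add, 2, 3)
#   payload = job_dumps(job)              # bytes pushed onto the Redis list
#   restored = job_loads(payload)
#   restored.getfn()(*restored.args)      # -> 5
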

def on_error(job):
    print("error: ", job)


def on_success(job):
    print("success: ", job)

class TaskQueue:
    def __init__(self):
        self.r = Redis()

    def schedule_fun(self, fun, *args, **kwargs):
        return self.schedule(new_job(fun, *args, **kwargs))

    def schedule(self, job):
        job.state = SCHEDULED
        print(self.r.lpush(WORKQ, job_dumps(job)))

    def _move_job_from_workq_to_activeq(self):
        # Blocks until a job is available, then atomically moves it to ACTIVEQ.
        self.r.brpoplpush(WORKQ, ACTIVEQ)

    def get_worker_job(self):
        # Loop because another worker may grab the job between the move and
        # the LPOP, in which case LPOP returns None.
        while True:
            if self.r.llen(ACTIVEQ) == 0:
                self._move_job_from_workq_to_activeq()
            activejob = self.r.lpop(ACTIVEQ)
            if activejob is not None:
                return job_loads(activejob)
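
# A minimal usage sketch of TaskQueue on its own (assumes a Redis server on
# localhost:6379; the names below are illustrative, not part of the gist):
#
#   def greet(name):
#       return "hello " + name
#
#   tq = TaskQueue()
#   tq.schedule_fun(greet, "ahmed")        # serialized and LPUSHed onto WORKQ
#   job = tq.get_worker_job()              # what a worker would pull off ACTIVEQ
#   job.getfn()(*job.args, **job.kwargs)   # -> "hello ahmed"
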

class BaseWorker:
    wid = 0


class GeventWorker(BaseWorker):
    def __init__(self, queue, greenlet=True):
        self.q = queue
        self.wid = BaseWorker.wid + 1
        BaseWorker.wid += 1
        self.greenlet = greenlet

    def work(self):
        jn = 0
        print("Starting worker")
        while True:
            jn += 1
            print("jn # ", jn)
            job = self.q.get_worker_job()
            args, kwargs = job.args, job.kwargs
            fn = job.getfn()
            if job.memoized and self.q.r.exists(job.jobkey_cached):
                # Serve a previously computed result from the cache key.
                cached = self.q.r.get(job.jobkey_cached)
                job.state = SUCCESS
                job.result = dill_loads(cached)
                self.q.r.set(job.jobkey, cached)
                continue
            if self.greenlet and not job.in_process:
                # Run the job cooperatively and hook the result callbacks.
                f = g.spawn(fn, *args, **kwargs)
                f.link_exception(partial(self.on_job_error, job))
                f.link_value(partial(self.on_job_success, job))
            else:
                try:
                    res = fn(*args, **kwargs)
                except Exception:
                    self.on_job_error(job, None)
                else:
                    job.state = SUCCESS
                    job.result = res
                    dumped = dill_dumps(res)
                    self.q.r.set(job.jobkey, dumped)
                    self.q.r.set(job.jobkey_cached, dumped)  # remember cache key.

    def on_job_error(self, job, glet):
        print("JOB ERRORED")
        job.retries -= 1
        job.state = FAILURE
        # link_exception passes the failed greenlet; the in-process path passes
        # None and is still inside the except block, so format_exc() applies.
        if glet is not None and glet.exception is not None:
            job.error = repr(glet.exception)
        else:
            job.error = traceback.format_exc()
        job.last_modified_time = time.time()
        if job.retries > 0:
            print("Scheduling again for retry")
            self.q.schedule(job)
        print(job)

    def on_job_success(self, job, glet):
        print("JOB SUCCESS")
        job.state = SUCCESS
        value = glet.value
        job.result = value
        now = time.time()
        job.last_modified_time = now
        job.done_time = now
        # Results are dill-serialized so non-string return values fit in Redis.
        dumped = dill_dumps(value)
        self.q.r.set(job.jobkey, dumped)
        self.q.r.set(job.jobkey_cached, dumped)

class WorkersMgr:
    def __init__(self, workerclass, queue):
        self.workers = {}
        self._workerclass = workerclass
        self.q = queue

    def new_worker(self):
        w = self._workerclass(self.q)
        self.workers[w.wid] = w
        return w

    def start_new_worker(self):
        w = self.new_worker()
        return g.spawn(w.work)

def produce():
    q = TaskQueue()

    def longfn1():
        sleep(0)
        return "ok"

    bglongfn1 = new_job(longfn1)
    q.schedule(bglongfn1)

    def longfn2(username, lang="en"):
        sleep(0)
        print("hi ", username, lang)
        return username, lang

    # bglongfn2 = new_job(longfn2)
    q.schedule_fun(longfn2, username="ahmed", lang="en")

    def fail1():
        sleep(1)
        raise ValueError("errr")

    # bglongfail1 = new_job(fail1)
    # q.schedule(bglongfail1)
    # q.schedule_fun(fail1)

if __name__ == "__main__":
    q = TaskQueue()
    argv = sys.argv
    if len(argv) < 2:
        print("usage: {} producer | worker [nworkers] | clean".format(argv[0]))
        sys.exit(1)
    if argv[1] == "producer":
        for i in range(100):
            produce()
    elif argv[1] == "worker":
        wm = WorkersMgr(GeventWorker, q)
        nworkers = 4
        if len(argv) > 2:
            nworkers = int(argv[2])
        futures = []
        for w in range(nworkers):
            f = wm.start_new_worker()
            futures.append(f)
        # Wait on the worker greenlets; each loops forever pulling jobs.
        g.joinall(futures, raise_error=False)
    elif argv[1] == "clean":
        q.r.flushall()
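
# How the script is meant to be driven, per the __main__ block above, assuming
# a local Redis instance and the gevent, dill, and redis packages installed
# ("taskqueue.py" is just a placeholder filename for this gist):
#
#   python taskqueue.py producer    # enqueue 100 rounds of the sample jobs
#   python taskqueue.py worker 4    # run 4 cooperative gevent workers
#   python taskqueue.py clean       # flush Redis (removes all queues/results)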