Created
June 26, 2019 14:27
-
-
Save xmonader/39181a0b9cb19c7b385ac2480ab917f5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from gevent import monkey; monkey.patch_all() | |
| import time, os, sys, inspect | |
| from pickle import loads as pickle_loads, dumps as pickle_dumps | |
| from dill import loads as dill_loads, dumps as dill_dumps | |
| from redis import Redis | |
| import gevent as g | |
| from gevent import sleep | |
| from hashlib import md5 | |
| from base64 import b64encode, b64decode | |
| from functools import partial | |
| import traceback | |
# Job lifecycle states.
STOPPED, SCHEDULED, RUNNING, SUCCESS, FAILURE = 0, 1, 2, 3, 4

# Redis key names for the queue stages.
WORKQ = "jobs-work-q"      # newly scheduled jobs wait here
ACTIVEQ = "jobs-active-q"  # jobs claimed by a worker
ACKQ = "jobs-ack-list"     # acknowledgement list (unused in this snippet)
class Job:
    """A serializable unit of work: a dill-pickled callable plus call
    arguments, retry budget, and bookkeeping fields.

    FIX: the original computed ``jobkey_cached`` inside ``__init__`` —
    before ``args``/``kwargs`` were assigned (``new_job`` sets them after
    construction) — so every job of the same function collided on one
    cache key and memoization could return results for the wrong
    arguments. ``jobkey_cached`` and ``jobkey`` are now lazy properties,
    evaluated when a worker actually reads them.
    """

    # Class-wide counter used to hand out locally unique job numbers.
    jid = 0

    def __init__(self, fun, retries=3):
        self._jid = Job.jid
        self._job_id = None
        Job.jid += 1
        # Serialize the callable immediately so the Job itself is portable.
        self.fun = dill_dumps(fun)
        self.funname = fun.__name__
        self.retries = retries
        self.state = SCHEDULED
        self.args = []
        self.kwargs = {}
        self.result = None
        self.error = None
        self.start_time = None
        self.done_time = None
        self.last_modified_time = None  # should it be a list of prev handling times?
        self.in_process = False
        self.memoized = True

    @property
    def job_id(self):
        """Lazily generated unique id: wall-clock time + local counter."""
        if not self._job_id:
            self._job_id = "{}-{}".format(time.time(), self._jid)
        return self._job_id

    @property
    def jobkey(self):
        """Redis key where this specific job's result is stored."""
        return "job-results:job-{}".format(self.job_id)

    @property
    def jobkey_cached(self):
        """Redis key for the memoized result; includes args/kwargs in the
        hash, so it is only valid once they have been assigned."""
        return "jobs-results:cache-{}".format(self._hash())

    def getfn(self):
        """Deserialize and return the wrapped callable."""
        return dill_loads(self.fun)

    def _hash(self):
        """MD5 over the pickled function and its call arguments —
        identifies identical invocations for memoization."""
        data = str(self.fun) + str(self.args) + str(self.kwargs)
        thehash = md5()
        thehash.update(data.encode())
        return thehash.hexdigest()

    def __call__(self):
        return self

    def __str__(self):
        return str(self.__dict__)

    __repr__ = __str__
def job_dumps(job):
    """Serialize *job* to bytes for storage in Redis."""
    payload = dill_dumps(job)
    return payload
def job_loads(data):
    """Reconstruct a Job from bytes previously produced by job_dumps."""
    job = dill_loads(data)
    return job
def new_job(fun, *args, **kwargs):
    """Build a STOPPED Job wrapping *fun* with the given call arguments."""
    job = Job(fun=fun)
    job.state, job.args, job.kwargs = STOPPED, args, kwargs
    return job
def on_error(job):
    """Default failure callback: just log the failed job."""
    print("error: ", job)
def on_success(job):
    """Default success callback: just log the completed job."""
    print("success: ", job)
class TaskQueue:
    """Redis-backed FIFO of pickled jobs.

    Jobs are LPUSHed onto WORKQ; workers move them to ACTIVEQ with
    BRPOPLPUSH (blocking) and then LPOP them for processing.

    FIX: the original ``get_worker_job`` did ``llen`` / ``lpop`` as two
    separate, non-atomic calls, so with several workers ``lpop`` could
    return ``None`` (another worker won the race) and ``job_loads(None)``
    crashed.  We now loop until a job is actually obtained.
    """

    def __init__(self):
        self.r = Redis()

    def schedule_fun(self, fun, *args, **kwargs):
        """Convenience: wrap *fun* in a Job and schedule it."""
        return self.schedule(new_job(fun, *args, **kwargs))

    def schedule(self, job):
        """Mark *job* SCHEDULED and push it onto the work queue."""
        job.state = SCHEDULED
        print(self.r.lpush(WORKQ, job_dumps(job)))

    def _move_job_from_workq_to_activeq(self):
        # Blocks until a job is available, then atomically moves it.
        self.r.brpoplpush(WORKQ, ACTIVEQ)

    def get_worker_job(self):
        """Block until a job can be claimed; return the deserialized Job."""
        while True:
            if self.r.llen(ACTIVEQ) == 0:
                self._move_job_from_workq_to_activeq()
            activejob = self.r.lpop(ACTIVEQ)
            # Another worker may have popped it between llen and lpop.
            if activejob is not None:
                return job_loads(activejob)
class BaseWorker:
    """Common base for workers; holds the shared worker-id counter."""

    # Class-level counter used to hand out worker ids.
    wid = 0
class GeventWorker(BaseWorker):
    """Worker that pulls jobs off a TaskQueue and executes them, either
    in a spawned greenlet (default) or inline for in-process jobs.

    FIX: the cached-result check used the global name ``q`` instead of
    ``self.q`` — a NameError (or silently the wrong queue) whenever no
    module-level ``q`` happened to exist.  Now uses ``self.q``.
    """

    def __init__(self, queue, greenlet=True):
        self.q = queue
        self.wid = BaseWorker.wid + 1
        BaseWorker.wid += 1
        # When True, run each job in its own greenlet via gevent.spawn.
        self.greenlet = greenlet

    def work(self):
        """Main loop: claim a job, serve memoized result or execute it."""
        jn = 0
        print("Starting worker")
        while True:
            jn += 1
            print("jn # ", jn)
            job = self.q.get_worker_job()
            args, kwargs = job.args, job.kwargs
            fn = job.getfn()
            # Memoization: serve a cached result if one exists.
            # (FIX: was `q.r.exists(...)` — relied on a global `q`.)
            if job.memoized and self.q.r.exists(job.jobkey_cached):
                job.state = SUCCESS
                # NOTE(review): redis returns bytes here, while a freshly
                # computed result keeps its Python type — confirm callers
                # tolerate the difference.
                job.result = self.q.r.get(job.jobkey_cached)
                self.q.r.set(job.jobkey, job.result)
                continue
            if self.greenlet and not job.in_process:
                f = g.spawn(fn, *args, **kwargs)
                # Callbacks receive (job, greenlet).
                f.link_exception(partial(self.on_job_error, job))
                f.link_value(partial(self.on_job_success, job))
            else:
                try:
                    res = fn(*args, **kwargs)
                except Exception:
                    # No greenlet in the synchronous path.
                    self.on_job_error(job, None)
                else:
                    job.state = SUCCESS
                    job.result = res
                    self.q.r.set(job.jobkey, res)
                    # remember cache key.

    def on_job_error(self, job, g):
        """Failure callback: record the error and reschedule while the
        job still has retries left."""
        print("JOB ERRORED")
        job.retries -= 1
        job.state = FAILURE
        job.error = str(traceback.format_exc())
        job.last_modified_time = time.time()
        if job.retries > 0:
            print("Scheduling again for retry")
            self.q.schedule(job)
        print(job)

    def on_job_success(self, job, g):
        """Success callback: store the greenlet's value under both the
        per-job key and the memoization cache key."""
        print("JOB SUCCESS")
        job.state = SUCCESS
        value = g.value
        job.result = value
        now = time.time()
        job.last_modified_time = now
        job.done_time = now
        self.q.r.set(job.jobkey, value)
        self.q.r.set(job.jobkey_cached, value)
class WorkersMgr:
    """Creates workers bound to one queue and launches each in a greenlet."""

    def __init__(self, workerclass, queue):
        self.workers = {}  # wid -> worker instance
        self._workerclass = workerclass
        self.q = queue

    def new_worker(self):
        """Instantiate a worker on this manager's queue and register it."""
        worker = self._workerclass(self.q)
        self.workers[worker.wid] = worker
        return worker

    def start_new_worker(self):
        """Create a worker and run its work loop in a fresh greenlet."""
        return g.spawn(self.new_worker().work)
def produce():
    """Enqueue a small batch of demo jobs on a fresh TaskQueue."""
    q = TaskQueue()

    def longfn1():
        sleep(0)
        return "ok"

    q.schedule(new_job(longfn1))

    def longfn2(username, lang="en"):
        sleep(0)
        print("hi ", username, lang)
        return username, lang

    q.schedule_fun(longfn2, username="ahmed", lang="en")

    # Defined but deliberately not scheduled — handy for testing retries.
    def fail1():
        sleep(1)
        raise ValueError("errr")
if __name__ == "__main__":
    # CLI: producer | worker [nworkers] | clean
    # FIX: the original indexed argv[1] unconditionally and crashed with
    # IndexError when run without arguments; print usage instead.
    q = TaskQueue()
    argv = sys.argv
    if len(argv) < 2:
        print("usage: {} producer|worker [nworkers]|clean".format(argv[0]))
        sys.exit(1)
    command = argv[1]
    if command == "producer":
        for _ in range(100):
            produce()
    elif command == "worker":
        wm = WorkersMgr(GeventWorker, q)
        # Optional second argument: number of workers (default 4).
        nworkers = int(argv[2]) if len(argv) > 2 else 4
        futures = [wm.start_new_worker() for _ in range(nworkers)]
        g.joinall(futures, raise_error=False)
    elif command == "clean":
        q.r.flushall()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment