Last active
June 4, 2018 11:43
-
-
Save Attumm/259f61a21b808c692756977f4b66e604 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| from workers import RedisQueue | |
| from workers import startapp | |
# Settings for the demo queue; unpacked as keyword arguments into RedisQueue.
config = {
    "namespace": "removeme",  # prefix of the Redis list key
    "key": "tasks",           # list name inside the namespace
    "redis_config": {},       # extra keyword arguments for the Redis client
    "maxsize": 100,           # queue length at which old items get dropped
}
def produce(items):
    """Enqueue the integers ``0 .. items - 1`` on the configured queue.

    A fresh ``RedisQueue`` is built from the module-level ``config`` and each
    number is handed to its ``send`` method in ascending order.
    """
    queue = RedisQueue(**config)
    for number in range(items):
        queue.send(number)
def myfunc(item, worker_id):
    """Demo task handler: announce the item, wait one second, report the arguments."""
    print('got item')
    time.sleep(1)
    # Same mapping locals() yields here: the two parameters, in declaration order.
    print('finished item', {'item': item, 'worker_id': worker_id})
# Script entry point: queue ten demo items, then hand control to the worker pool.
if __name__ == '__main__':
    produce(10)
    startapp(myfunc, workers=10, config=config)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # start 10 workers | |
| # python worker.py -w 10 | |
| # produce 100 items with | |
| # python worker.py -p 100 | |
| import sys | |
| import time | |
| import syslog | |
| import json | |
| import redis | |
| from multiprocessing import Pool | |
# Default queue settings; startapp() falls back to these when no config is given.
config = {
    "namespace": "removeme",  # prefix of the Redis list key
    "key": "tasks",           # list name inside the namespace
    "redis_config": {},       # keyword arguments forwarded to redis.Redis
    "maxsize": 100,           # length at which send() drops the oldest item
}
class RedisQueue:
    """FIFO work queue stored in a single Redis list.

    The list lives under the key ``"<namespace>:<key>"``. Producers append
    with :meth:`send`; consumers iterate over the instance, and every step
    pops the item at the head of the list.
    """

    def __init__(self, namespace, key, redis_config, maxsize=None):
        """Create the Redis client and remember which list backs the queue.

        Args:
            namespace: Prefix shared by every list key this queue writes to.
            key: Name of the queue's own list inside ``namespace``.
            redis_config: Keyword arguments passed to ``redis.Redis``.
            maxsize: Length at which :meth:`send` starts dropping the oldest
                item; ``None`` disables the check.
        """
        self.r = redis.Redis(**redis_config)
        # Stored so send_to() can address sibling lists in the same namespace;
        # previously send_to() referenced an undefined bare name.
        self.namespace = namespace
        self.list_key = '{}:{}'.format(namespace, key)
        self.maxsize = maxsize

    def first_inline_send(self, item):
        """Push ``item`` onto the head of the list so it is popped next."""
        # TODO rename method
        self.r.lpush(self.list_key, item)

    def send_to(self, key, item):
        """Append ``item`` to another list, ``"<namespace>:<key>"``."""
        self.r.rpush('{}:{}'.format(self.namespace, key), item)

    def send(self, item):
        """Add ``item`` to the end of the Redis list.

        Side effects:
            If the list already holds ``maxsize`` or more items, the item at
            the head is popped first, so the length stays the same. Note that
            it does not shrink a list that is already longer than ``maxsize``.

        The length check, pop and push are separate Redis commands.
        """
        if self.maxsize is not None and self.r.llen(self.list_key) >= self.maxsize:
            self.r.lpop(self.list_key)
        self.r.rpush(self.list_key, item)

    def send_dict(self, item):
        """Serialize ``item`` to JSON and :meth:`send` the resulting string."""
        self.send(json.dumps(item))

    def __iter__(self):
        """Return ``self``; the queue is its own iterator."""
        return self

    def __next__(self):
        """Pop the item at the head of the list with ``BLPOP`` and return it."""
        # blpop() returns a (key, value) pair; only the value is of interest.
        return self.r.blpop(self.list_key)[1]
def run_worker(func, on_failure_func, config, worker_id):
    """Consume the Redis queue until interrupted, passing each item to ``func``.

    Args:
        func: Called as ``func(item, worker_id)`` for every item popped.
        on_failure_func: Optional handler called as
            ``on_failure_func(item, exc, queue, worker_id)`` when an exception
            is raised while an item is in flight; ``None`` disables it.
        config: Keyword arguments used to build the ``RedisQueue``.
        worker_id: Identifier included in log lines and passed to callbacks.

    On ``KeyboardInterrupt``/``SystemExit`` the item that was being processed
    (if any) is pushed back to the head of the queue and the function returns.
    After any other exception the queue is rebuilt and consumption resumes
    following a short pause.
    """
    while True:
        queue = None
        # ``item`` is non-None only between popping an item and ``func``
        # finishing with it. The handlers below must not touch an item that
        # was already processed, nor a name that was never bound (failure or
        # interrupt before the first item arrived).
        item = None
        try:
            queue = RedisQueue(**config)
            sys.stdout.write(f'worker {worker_id} started\n')
            for item in queue:
                func(item, worker_id)
                item = None  # finished; nothing is in flight any more
        except (KeyboardInterrupt, SystemExit):
            sys.stdout.write(f'worker {worker_id} stopped\n')
            if item is not None:
                # Give the unfinished item back so another worker picks it up.
                queue.first_inline_send(item)
            break
        except Exception as e:
            sys.stdout.write(f'worker {worker_id} failed reason {e}\n')
            if on_failure_func is not None and item is not None:
                sys.stdout.write(f'worker {worker_id} running failure handler {e}\n')
                on_failure_func(item, e, queue, worker_id)
        time.sleep(0.1)  # Throttle restarting
def startapp(func, workers=10, config=config, on_failure_func=None):
    """Run ``workers`` copies of ``run_worker`` in a process pool and wait for them.

    Args:
        func: Item handler forwarded to every ``run_worker``.
        workers: Number of pool processes; worker ids run from 1 to ``workers``.
        config: ``RedisQueue`` keyword arguments forwarded to every worker.
        on_failure_func: Optional failure handler forwarded to every worker.
    """
    pool = Pool(workers)
    args = ((func, on_failure_func, config, worker_id) for worker_id in range(1, workers+1))
    try:
        pool.starmap(run_worker, args)
    except (KeyboardInterrupt, SystemExit):
        sys.stdout.write('Starting Gracefull exit')
    finally:
        # Pool.join() raises ValueError unless close() or terminate() was
        # called first, so close on every exit path, not only on interrupt.
        pool.close()
        pool.join()
        sys.stdout.write('Clean shut down')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment