Last active
June 3, 2018 14:50
-
-
Save Attumm/b79021ae8505bd7d087c2eca749fa722 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # start 10 workers | |
| # python worker.py -w 10 | |
| # produce 100 item with | |
| # python worker.py -p 100 | |
| import sys | |
| import time | |
| import syslog | |
| import json | |
| import redis | |
| from multiprocessing import Pool | |
| config = { | |
| "namespace": "removeme", | |
| "key": "tasks", | |
| "redis_config": {}, | |
| "maxsize": 100 | |
| } | |
| class RedisQueue: | |
| def __init__(self, namespace, key, redis_config, maxsize=None): | |
| self.r = redis.Redis(**redis_config) | |
| self.list_key = '{}:{}'.format(namespace, key) | |
| self.maxsize = maxsize | |
| def first_inline_send(self, item): | |
| #TODO rename method | |
| self.r.lpush(self.list_key, item) | |
| def send(self, item): | |
| """Adds item to the end of the Redis List. | |
| Side-effects: | |
| If size is above max size, the operation will keep the size the same. | |
| Note that if does not resize the list to maxsize. | |
| """ | |
| if self.maxsize is not None and self.r.llen(self.list_key) >= self.maxsize: | |
| self.r.lpop(self.list_key) | |
| self.r.rpush(self.list_key, item) | |
| def send_dict(self, item): | |
| """Adds item to the end of the Redis List. | |
| Side-effects: | |
| If size is above max size, the operation will keep the size the same. | |
| Note that if does not resize the list to maxsize. | |
| """ | |
| self.send(json.dumps(item)) | |
| def __iter__(self): | |
| return self | |
| def __next__(self): | |
| return self.r.blpop(self.list_key)[1] | |
| def runner(item, worker_id): | |
| sys.stdout.write('{}: {}{}'.format(worker_id, type(item), item)) | |
| def create_redis_consumer(namespace, key, **redis_config): | |
| r = redis.Redis(**redis_config) | |
| msg_key = '{}:{}'.format(namespace, key) | |
| while True: | |
| #blocks until new item is found. | |
| yield r.blpop(msg_key)[1] | |
| def run_worker(func, config, worker_id): | |
| while True: | |
| try: | |
| r = RedisQueue(**config) | |
| sys.stdout.write(f'worker {worker_id} started\n') | |
| for item in r: | |
| func(item, worker_id) | |
| except (KeyboardInterrupt, SystemExit): | |
| sys.stdout.write(f'worker {worker_id} stopped\n') | |
| r.first_inline_send(item) | |
| break | |
| except Exception as e: | |
| # Throttle restarting | |
| time.sleep(0.1) | |
| sys.stdout.write(f'worker {worker_id} failed reason {e}\n') | |
| def startapp(func, workers=10, config=config): | |
| p = Pool(workers) | |
| args = ((func, config, worker_id) for worker_id in range(1, workers+1)) | |
| try: | |
| p.starmap(run_worker, args) | |
| except (KeyboardInterrupt, SystemExit): | |
| sys.stdout.write('Starting Gracefull exit') | |
| p.close() | |
| finally: | |
| p.join() | |
| sys.stdout.write('Clean shut down') | |
| if __name__ == '__main__': | |
| workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10 | |
| produce = int(sys.argv[sys.argv.index('-p')+1]) if '-p' in sys.argv else None | |
| redis_producer = RedisQueue(**config) | |
| if produce is not None: | |
| sys.stdout.write(f'Producing: {produce}\n') | |
| for i in range(produce): | |
| redis_producer.send(i) | |
| sys.stdout.write(f'Done\n') | |
| sys.exit(0) | |
| sys.stdout.write(f'Starting: {workers}\n') | |
| p = Pool(workers) | |
| args = ((config, worker_id) for worker_id in range(1, workers+1)) | |
| try: | |
| p.starmap(run_worker, args) | |
| except (KeyboardInterrupt, SystemExit): | |
| sys.stdout.write('Starting Gracefull exit') | |
| p.close() | |
| finally: | |
| p.join() | |
| sys.stdout.write('Clean shut down') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment