Skip to content

Instantly share code, notes, and snippets.

@Attumm
Last active June 3, 2018 14:50
Show Gist options
  • Save Attumm/b79021ae8505bd7d087c2eca749fa722 to your computer and use it in GitHub Desktop.
Save Attumm/b79021ae8505bd7d087c2eca749fa722 to your computer and use it in GitHub Desktop.
# start 10 workers
# python worker.py -w 10
# produce 100 items with
# python worker.py -p 100
import sys
import time
import syslog
import json
import redis
from multiprocessing import Pool
# Default settings; unpacked as keyword arguments into RedisQueue(**config).
config = {
    # namespace and key are joined as "<namespace>:<key>" to name the Redis list.
    "namespace": "removeme",
    "key": "tasks",
    # Keyword arguments forwarded to redis.Redis(); empty means client defaults.
    "redis_config": {},
    # Length at which send() starts evicting the oldest item before appending.
    "maxsize": 100,
}
class RedisQueue:
    """A queue stored in a single Redis list, consumable as an endless iterator.

    Items are appended at the tail with ``send`` and taken from the head by
    iterating over the instance.
    """

    def __init__(self, namespace, key, redis_config, maxsize=None):
        """Create the Redis client and derive the list name.

        Args:
            namespace: Prefix of the Redis list name.
            key: Suffix of the Redis list name; the list is ``namespace:key``.
            redis_config: Keyword arguments passed to ``redis.Redis``.
            maxsize: Length at which ``send`` evicts the head item before
                appending, or ``None`` for no limit.
        """
        self.r = redis.Redis(**redis_config)
        self.list_key = f'{namespace}:{key}'
        self.maxsize = maxsize

    def first_inline_send(self, item):
        """Push ``item`` onto the head of the list so it is the next one popped."""
        # TODO: rename method
        self.r.lpush(self.list_key, item)

    def send(self, item):
        """Append ``item`` to the tail of the Redis list.

        Side-effects:
            When ``maxsize`` is set and the list already holds at least that
            many items, one item is popped from the head first, so the length
            stays the same. Note that it does not shrink a list that is
            already longer than ``maxsize`` down to ``maxsize``.
        """
        limit = self.maxsize
        # The length is only queried when a limit is configured.
        at_capacity = limit is not None and self.r.llen(self.list_key) >= limit
        if at_capacity:
            self.r.lpop(self.list_key)
        self.r.rpush(self.list_key, item)

    def send_dict(self, item):
        """Serialise ``item`` to JSON and append it via ``send``.

        The same ``maxsize`` side-effects as ``send`` apply.
        """
        encoded = json.dumps(item)
        self.send(encoded)

    def __iter__(self):
        """Return the queue itself; it is its own iterator."""
        return self

    def __next__(self):
        """Pop from the head of the list with BLPOP and return the popped value.

        The call is made without a timeout, and ``StopIteration`` is never
        raised here.
        """
        reply = self.r.blpop(self.list_key)
        # The reply's second element is the value; the first is the list name.
        return reply[1]
def runner(item, worker_id):
    """Example task handler: print the worker id, the item's type and the item.

    Args:
        item: The value taken from the queue.
        worker_id: Identifier of the worker that received the item.
    """
    # One complete line per item, so output from several workers does not fuse
    # together on a single line.
    sys.stdout.write('{}: {} {}\n'.format(worker_id, type(item), item))
def create_redis_consumer(namespace, key, **redis_config):
    """Endlessly yield values popped from the head of the ``namespace:key`` list.

    Args:
        namespace: Prefix of the Redis list name.
        key: Suffix of the Redis list name.
        **redis_config: Keyword arguments passed to ``redis.Redis``.

    Yields:
        The second element of each BLPOP reply (the popped value).
    """
    client = redis.Redis(**redis_config)
    list_name = f'{namespace}:{key}'
    while True:
        # BLPOP is issued without a timeout: it waits until an item is available.
        reply = client.blpop(list_name)
        yield reply[1]
def run_worker(func, config, worker_id):
    """Consume items from the Redis queue forever, passing each one to ``func``.

    Args:
        func: Callable invoked as ``func(item, worker_id)`` for every item.
        config: Keyword arguments used to construct the ``RedisQueue``.
        worker_id: Identifier used in log lines and passed on to ``func``.

    On ``KeyboardInterrupt``/``SystemExit`` the item that was being processed,
    if any, is pushed back to the head of the queue and the function returns.
    Any other exception is logged and, after a short pause, the worker builds
    a new queue object and carries on; the item that failed is not re-queued.
    """
    while True:
        queue = None
        # Track whether an item has been received but not yet fully processed,
        # so shutdown never re-queues a finished item or touches an unbound name.
        processing = False
        in_flight = None
        try:
            queue = RedisQueue(**config)
            sys.stdout.write(f'worker {worker_id} started\n')
            for item in queue:
                in_flight = item
                processing = True
                func(item, worker_id)
                processing = False
        except (KeyboardInterrupt, SystemExit):
            sys.stdout.write(f'worker {worker_id} stopped\n')
            if queue is not None and processing:
                # Give the unfinished item back so another worker picks it up.
                queue.first_inline_send(in_flight)
            break
        except Exception as e:
            # Throttle restarting
            time.sleep(0.1)
            sys.stdout.write(f'worker {worker_id} failed reason {e}\n')
def startapp(func, workers=10, config=config):
    """Run ``workers`` worker processes, each executing ``run_worker``.

    Args:
        func: Callable passed to every worker and invoked as
            ``func(item, worker_id)``.
        workers: Number of processes in the pool; worker ids run from 1 to
            ``workers`` inclusive.
        config: Keyword arguments each worker uses to build its ``RedisQueue``.

    Blocks until the pool's ``starmap`` call returns or is interrupted, then
    closes the pool and waits for the worker processes to exit.
    """
    pool = Pool(workers)
    args = ((func, config, worker_id) for worker_id in range(1, workers + 1))
    try:
        pool.starmap(run_worker, args)
    except (KeyboardInterrupt, SystemExit):
        sys.stdout.write('Starting Gracefull exit')
    finally:
        # Pool.join() is only legal after close() or terminate(), so close on
        # every exit path, not just the interrupted one.
        pool.close()
        pool.join()
        sys.stdout.write('Clean shut down')
if __name__ == '__main__':
    # Command line: ``-w N`` starts N workers (default 10); ``-p N`` pushes the
    # integers 0..N-1 onto the queue and exits instead of starting workers.
    workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
    produce = int(sys.argv[sys.argv.index('-p') + 1]) if '-p' in sys.argv else None

    if produce is not None:
        redis_producer = RedisQueue(**config)
        sys.stdout.write(f'Producing: {produce}\n')
        for i in range(produce):
            redis_producer.send(i)
        sys.stdout.write('Done\n')
        sys.exit(0)

    sys.stdout.write(f'Starting: {workers}\n')
    # run_worker needs the task function as its first argument; startapp builds
    # the (func, config, worker_id) tuples and handles pool shutdown.
    startapp(runner, workers=workers, config=config)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment