Skip to content

Instantly share code, notes, and snippets.

@Attumm
Last active June 4, 2018 11:43
Show Gist options
  • Save Attumm/259f61a21b808c692756977f4b66e604 to your computer and use it in GitHub Desktop.
Save Attumm/259f61a21b808c692756977f4b66e604 to your computer and use it in GitHub Desktop.
import time
from workers import RedisQueue
from workers import startapp
# Queue settings used by both the producer and the workers; the keys are the
# keyword arguments that RedisQueue(**config) is called with.
config = dict(
    namespace="removeme",
    key="tasks",
    redis_config={},
    maxsize=100,
)
def produce(items):
    """Send the integers ``0 .. items - 1`` to the queue described by ``config``."""
    queue = RedisQueue(**config)
    for number in range(items):
        queue.send(number)
def myfunc(item, worker_id):
    """Example handler: announce the item, pause one second, print the locals.

    Args:
        item: Value taken from the queue.
        worker_id: Identifier of the worker running this call.
    """
    print('got item')
    # Presumably a stand-in for real work.
    time.sleep(1)
    # locals() is printed as-is, so this function deliberately defines no
    # local names beyond its two parameters.
    print('finished item', locals())
if __name__ == "__main__":
    # Put ten items on the queue first, then start ten workers that consume
    # them with myfunc.
    produce(10)
    startapp(myfunc, workers=10, config=config)
# start 10 workers
# python worker.py -w 10
# produce 100 items with
# python worker.py -p 100
import sys
import time
import syslog
import json
import redis
from multiprocessing import Pool
# Default queue settings; startapp() uses this dict when no config is passed,
# and its keys are the keyword arguments of RedisQueue.__init__.
config = dict(
    namespace="removeme",
    key="tasks",
    redis_config={},
    maxsize=100,
)
class RedisQueue:
    """Queue stored in a single Redis list.

    Items are appended to the tail with ``send`` and taken from the head by
    iterating over the queue. Iteration never finishes on its own: every
    step waits in ``BLPOP`` for the next item.
    """

    def __init__(self, namespace, key, redis_config, maxsize=None):
        """Create the Redis client and derive the list key.

        Args:
            namespace: Prefix for every key this queue writes to.
            key: Name of the list inside the namespace.
            redis_config: Keyword arguments forwarded to ``redis.Redis``.
            maxsize: Length at which ``send`` starts dropping the head item;
                ``None`` disables the limit.
        """
        self.r = redis.Redis(**redis_config)
        # Kept so send_to() can address other lists in the same namespace.
        self.namespace = namespace
        self.list_key = '{}:{}'.format(namespace, key)
        self.maxsize = maxsize

    def first_inline_send(self, item):
        """Push item onto the head of the list so it is consumed next."""
        # TODO rename method
        self.r.lpush(self.list_key, item)

    def send_to(self, key, item):
        """Append item to the tail of another list in this queue's namespace.

        Args:
            key: Name of the target list; the namespace prefix is added here.
            item: Value to append.
        """
        self.r.rpush('{}:{}'.format(self.namespace, key), item)

    def send(self, item):
        """Append item to the tail of the list, honouring ``maxsize``.

        Side-effects:
            If the list already holds ``maxsize`` or more items, the head
            item is dropped first, so this call does not grow the list.
            Note that it does not shrink a longer list down to ``maxsize``.

        The length check, the pop and the push are issued as separate Redis
        commands, i.e. the limit is not enforced atomically.
        """
        is_full = self.maxsize is not None and self.r.llen(self.list_key) >= self.maxsize
        if is_full:
            self.r.lpop(self.list_key)
        self.r.rpush(self.list_key, item)

    def send_dict(self, item):
        """Serialise item with ``json.dumps`` and ``send`` the result."""
        self.send(json.dumps(item))

    def __iter__(self):
        """Return self; the queue is its own iterator."""
        return self

    def __next__(self):
        """Wait for the next item and return its value.

        ``blpop`` returns a ``(key, value)`` pair; only the value is handed
        to the caller, exactly as the client returned it.
        """
        return self.r.blpop(self.list_key)[1]
def run_worker(func, on_failure_func, config, worker_id):
    """Consume queue items forever, calling ``func(item, worker_id)`` on each.

    The queue is (re)created at the top of every loop round, so a failure
    restarts consumption after a short pause.

    Args:
        func: Handler invoked for every item taken from the queue.
        on_failure_func: Optional callback invoked as
            ``on_failure_func(item, error, queue, worker_id)`` when ``func``
            raises. It is only called while an item is actually in flight;
            failures that happen with no item (for example while waiting on
            Redis) are logged and retried without it.
        config: Keyword arguments for ``RedisQueue``.
        worker_id: Identifier used in the log lines and passed to ``func``.

    On ``KeyboardInterrupt`` / ``SystemExit`` the in-flight item, if any, is
    pushed back to the head of the queue and the worker returns.
    """
    # Sentinel for "no item is currently being processed". A plain None
    # would be ambiguous if the client ever returned None.
    no_item = object()
    while True:
        queue = None
        item = no_item
        try:
            queue = RedisQueue(**config)
            sys.stdout.write(f'worker {worker_id} started\n')
            for item in queue:
                func(item, worker_id)
                # Finished: this item must not be requeued or reported if
                # the next wait on Redis is interrupted or fails.
                item = no_item
        except (KeyboardInterrupt, SystemExit):
            sys.stdout.write(f'worker {worker_id} stopped\n')
            if item is not no_item:
                # Give the unfinished item back so another worker picks it up.
                queue.first_inline_send(item)
            break
        except Exception as e:
            sys.stdout.write(f'worker {worker_id} failed reason {e}\n')
            if on_failure_func is not None and item is not no_item:
                sys.stdout.write(f'worker {worker_id} running failure handler {e}\n')
                on_failure_func(item, e, queue, worker_id)
            time.sleep(0.1)  # Throttle restarting
def startapp(func, workers=10, config=config, on_failure_func=None):
    """Run ``workers`` copies of ``run_worker`` in a process pool.

    Args:
        func: Item handler handed to every worker.
        workers: Number of pool processes; worker ids run from 1 to this
            number inclusive.
        config: Keyword arguments for ``RedisQueue``; defaults to the
            module-level ``config``.
        on_failure_func: Optional failure callback handed to every worker.

    The call returns once ``starmap`` has returned or raised and the pool has
    been closed and joined. ``KeyboardInterrupt`` and ``SystemExit`` are
    swallowed after logging; any other exception propagates after the pool
    has been shut down.
    """
    pool = Pool(workers)
    worker_args = [
        (func, on_failure_func, config, worker_id)
        for worker_id in range(1, workers + 1)
    ]
    try:
        pool.starmap(run_worker, worker_args)
    except (KeyboardInterrupt, SystemExit):
        sys.stdout.write('Starting Gracefull exit')
    finally:
        # join() is only valid after close() or terminate(). Closing here
        # covers every exit path, so join() cannot raise ValueError and
        # hide the original error.
        pool.close()
        pool.join()
        sys.stdout.write('Clean shut down')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment