Last active
March 7, 2017 00:02
-
-
Save MaxMorais/d111f3c4c74e053bdddfa9466548f7e6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#-*- coding: utf-8 -*- | |
"""RoundRobin is a Python library that allows using Redis as a message queue."""
from functools import wraps | |
import multiprocessing | |
from redis import Redis | |
try: | |
import cPickle as pickle | |
except ImportError: | |
import pickle | |
def key_for_name(name):
    """Return the Redis key under which the queue called *name* is stored.

    The queue name is lower-cased and prefixed with ``roundrobin:``.
    """
    normalized = name.lower()
    return "roundrobin:{}".format(normalized)
class RoundRobinAgent(object):
    """Base class for all roundrobin subclasses.

    Stores the queue name, the message serializer and the Redis client that
    the subclasses operate on.

    :param name: name of the queue; mapped to a Redis key by
        :func:`key_for_name`.
    :param serializer: object exposing ``dumps`` and ``loads`` (``pickle`` by
        default), or ``None`` to exchange raw values.
    :param kwargs: keyword arguments forwarded unchanged to :class:`Redis`.
    """

    def __init__(self, name, serializer=pickle, **kwargs):
        self.name = name
        self.serializer = serializer
        self._redis = Redis(**kwargs)

    def __len__(self):
        """Return the length Redis reports (``LLEN``) for this queue's key."""
        return self._redis.llen(self.key)

    @property
    def key(self):
        """Return the key name used to read this queue in Redis."""
        # Computed on every access. The former ``hasattr(self, '__key')``
        # guard could never succeed: ``self.__key`` is name-mangled to
        # ``_RoundRobinAgent__key`` while the string passed to ``hasattr``
        # is not, so the "cache" was recomputed each time anyway.
        return key_for_name(self.name)

    def clear(self):
        """Clear the queue of all messages, deleting the Redis key."""
        self._redis.delete(self.key)

    def worker(self, *args, **kwargs):
        """Decorator for using a function as a queue worker. Example:

        >>> @consumer.worker(timeout=1)
        ... def printer(msg):
        ...     print(msg)
        >>> printer()
        my message
        another message

        You can also use it without passing any keyword arguments:

        >>> @consumer.worker
        ... def printer(msg):
        ...     print(msg)
        >>> printer()
        my message
        another message

        Calling the decorated function iterates over
        ``self.consume(**kwargs)`` and invokes the original function once per
        message; positional arguments given to the decorated function are
        passed through, followed by the message. ``consume`` is not defined
        on this base class, so the decorator is only usable on subclasses
        that provide it.

        :param args: empty, or the function to decorate when the decorator
            is applied without parentheses.
        :param kwargs: keyword arguments forwarded to ``self.consume``.
        """
        def decorator(worker):
            @wraps(worker)
            def wrapper(*call_args):
                for msg in self.consume(**kwargs):
                    worker(*(call_args + (msg,)))
            return wrapper

        if args:
            # Bare ``@agent.worker`` usage: the function arrives positionally.
            return decorator(*args)
        return decorator
class Consumer(RoundRobinAgent):
    """Simple FIFO Queue consumer

    >>> from roundrobin import Consumer
    >>> queue = Consumer("myqueue", host="localhost", port=6379, db=0)

    :param name: name of the queue.
    :param serializer: the class or module used to deserialize messages; it
        must provide functions or methods named ``dumps`` and ``loads``.
        `pickle <http://docs.python.org/library/pickle.html>`_ is the default,
        use ``None`` to read messages as plain values (suitable for strings,
        integers, etc.)
    :param kwargs: additional kwargs to pass to :class:`Redis`, most commonly
        :attr:`host`, :attr:`port`, :attr:`db`
    """

    def consume(self, **kwargs):
        """Return a generator that yields messages as they become available.

        Blocks while the queue is empty (unless ``block=False`` is passed).
        Example:

        >>> for msg in queue.consume(timeout=1):
        ...     print(msg)
        my message
        another message

        The generator finishes as soon as :meth:`get` returns ``None``: the
        timeout elapsed, the queue was empty in non-blocking mode, or a
        message deserialized to ``None``.

        :param kwargs: any arguments that :meth:`get` can accept
            (:attr:`block` will default to ``True`` if not given)
        """
        kwargs.setdefault('block', True)
        try:
            while True:
                msg = self.get(**kwargs)
                if msg is None:
                    break
                yield msg
        except KeyboardInterrupt:
            # Ctrl-C while waiting on Redis: emit a newline so the shell
            # prompt starts on a fresh line, then end the generator quietly.
            print("")
            return

    def get(self, block=False, timeout=None):
        """Return a message from the queue. Example:

        >>> consumer.get()
        'my message'
        >>> consumer.get()
        'another message'

        :param block: whether or not to wait until a msg is available in the
            queue before returning; ``False`` by default.
        :param timeout: when using :attr:`block`, if no msg is available
            for :attr:`timeout` in seconds, give up and return ``None``.
            ``None`` (the default) is sent to Redis as ``0``, i.e. wait
            indefinitely.
        :returns: the deserialized message, or ``None`` when nothing was
            popped.
        """
        if block:
            if timeout is None:
                timeout = 0
            reply = self._redis.blpop(self.key, timeout=timeout)
            # BLPOP answers with a (key, value) pair; keep only the value.
            msg = None if reply is None else reply[1]
        else:
            # Non-blocking read must use LPOP. The previous BLPOP call here
            # blocked forever and handed a (key, value) tuple to ``loads``.
            msg = self._redis.lpop(self.key)
        if msg is not None and self.serializer is not None:
            # NOTE(security): with the default pickle serializer this
            # unpickles whatever is stored under the key; only use it with a
            # Redis instance whose writers are trusted.
            msg = self.serializer.loads(msg)
        return msg
class Producer(RoundRobinAgent):
    """Simple Redis queue producer.

    Unlike a consumer, a producer is not tied to a single queue: the target
    queue is named on every call.

    >>> from roundrobin import Producer
    >>> producer = Producer(host="localhost", port=6379, db=0)
    >>> producer.put("myqueue", "my message")

    :param serializer: the class or module used to serialize messages; it
        must provide functions or methods named ``dumps`` and ``loads``.
        `pickle <http://docs.python.org/library/pickle.html>`_ is the default,
        use ``None`` to store messages as plain values (suitable for strings,
        integers, etc).
    :param kwargs: additional kwargs to pass to :class:`Redis`, most commonly
        :attr:`host`, :attr:`port`, :attr:`db`
    """

    def __init__(self, serializer=pickle, **kwargs):
        # The base class wants a queue name; a producer has none of its own,
        # so an empty string is supplied.
        super(Producer, self).__init__('', serializer, **kwargs)

    def clear(self, queue):
        """Delete the Redis key backing *queue*, dropping all its messages."""
        self._redis.delete(key_for_name(queue))

    def put(self, queue, *msgs):
        """Append one or more messages to the end of *queue*. Example:

        >>> producer.put("myqueue", "my message")
        >>> producer.put("myqueue", "another message")

        Several messages can be sent in a single call, which can be
        significantly faster for a large number of messages:

        >>> producer.put("myqueue", "my message", "another message", "third message")

        :param queue: name of the destination queue.
        :param msgs: messages to enqueue; each is passed through
            ``serializer.dumps`` unless the serializer is ``None``.
        :returns: whatever the Redis client's ``rpush`` call returns.
        """
        if self.serializer is None:
            payloads = msgs
        else:
            dumps = self.serializer.dumps
            payloads = [dumps(item) for item in msgs]
        target = key_for_name(queue)
        return self._redis.rpush(target, *payloads)
def _run_demo_consumer(name, host="localhost", port=13000):
    """Drain the queue *name* in the current process, printing each message.

    Defined at module level (not inside the ``__main__`` guard) so that
    ``multiprocessing`` can also locate it when child processes are started
    by re-importing this module instead of forking.

    :param name: name of the queue to consume.
    :param host: Redis host to connect to.
    :param port: Redis port to connect to.
    """
    consumer = Consumer(name, host=host, port=port)
    # The child builds its own producer for error reports instead of relying
    # on a global created by the parent process.
    reporter = Producer(host=host, port=port)

    @consumer.worker(timeout=20)
    def printer(msg):
        try:
            print(msg)
        except Exception as exc:
            # Demo-level best effort: park the failure on the 'error' queue.
            reporter.put('error', (msg, str(exc)))

    printer()


if __name__ == '__main__':
    # Demo: spread 100 messages over 10 queues, then consume every queue in
    # its own process.
    queues = ["queue{:02d}".format(i) for i in range(10)]
    producer = Producer(host='localhost', port=13000)

    for i in range(100):
        queue = queues[i % len(queues)]
        msg = 'Message {:03d}'.format(i)
        print("{} {}".format(queue, producer.put(queue, msg)))

    # A ``None`` message makes Consumer.consume() stop, so each worker exits
    # once its queue has been drained.
    for name in queues:
        producer.put(name, None)

    processes = [
        multiprocessing.Process(target=_run_demo_consumer, args=(name,))
        for name in queues
    ]
    for process in processes:
        process.start()
    # Wait for every consumer, not only the last one started, before the
    # queues are deleted below.
    for process in processes:
        process.join()

    for queue in queues:
        producer.clear(queue)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment