Created
January 15, 2012 04:52
-
-
Save gleicon/1614390 to your computer and use it in GitHub Desktop.
gevent+kombu actor/mailbox using decorators
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import with_statement | |
from kombu import BrokerConnection | |
from collections import defaultdict | |
import gevent | |
from gevent import monkey | |
monkey.patch_all() | |
class WorkerHub(): | |
""" | |
WorkerHub controls the local mailboxes that the @worker decorator assigns. | |
Also contains a method to send messages to given workers | |
Depends on kombu + gevent, uses SimpleQueue. default transport is redis | |
""" | |
def __init__(self, transport_url = "redis://127.0.0.1:6379"): | |
self.workers = defaultdict(set) | |
self._transport_url = transport_url | |
self._connected = False | |
def add_worker(self, workername, callback): | |
self.workers[workername].add(callback) | |
def _listener(): | |
while self._connected == False: | |
print "waiting %s" % self._connected | |
gevent.sleep(1) | |
with BrokerConnection(self._transport_url) as conn: | |
with conn.SimpleQueue(workername) as queue: | |
while True: | |
message = queue.get(block=True, timeout=10) | |
if message: | |
self._execute_callbacks(workername, message.payload) | |
message.ack() | |
if self._connected == False: break | |
gevent.spawn(_listener) | |
def _execute_callbacks(self, workername, message): | |
for w in self.workers[workername]: | |
w(message) | |
def send_message(self, workername, message): | |
with BrokerConnection(self._transport_url) as conn: | |
with conn.SimpleQueue(workername) as queue: | |
queue.put(message) | |
def connect(self, transport_url=None): | |
self._connected = True | |
print "connected: %s" % self._connected | |
if transport_url: self._transport_url = transport_url | |
def disconnect(self): | |
self._connected = False | |
wh = WorkerHub() | |
def start_workers(transport_url = None): | |
""" | |
start all workers. provide a different transport url following kombu URI | |
scheme. | |
if no transport is given, defaults to local redis. | |
start_workers() | |
""" | |
if transport_url is None: | |
transport_url = "redis://127.0.0.1:6379" | |
wh.connect(transport_url) | |
def stop_workers(): | |
wh.disconnect() | |
def worker(workername): | |
""" | |
gevent/kombu based worker | |
prepend this decorator to a function that might receive the message. | |
broadcasts the message to all callbacks appended. | |
ex: | |
@worker("/queue") | |
def my_processor(message): | |
print message | |
""" | |
def _decorator(f): | |
wh.add_worker(workername, f) | |
return f | |
return _decorator | |
def send_message(workername, message): | |
""" | |
send a message to a given worker, using the same key as the worker | |
decorator. | |
send_message("/queue", "my message") | |
""" | |
wh.send_message(workername, message) | |
# test | |
@worker('/workers/delicious') | |
def a_worker(message): | |
print "received: %s" % message | |
#send_message('/workers/jazz', "no waaay saaaap") | |
@worker('/workers/jazz') | |
def another_worker(message): | |
print "received2: %s" % message | |
send_message('/workers/delicious', "saaaap") | |
@worker('/workers/delicious') | |
def third_worker(message): | |
print "the other worker received: %s" % message | |
if __name__ == "__main__": | |
start_workers() | |
gevent.sleep(10) | |
print "acabou" | |
stop_workers() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import with_statement | |
from kombu.common import maybe_declare | |
from kombu.pools import producers | |
from kombu import BrokerConnection, Exchange, Queue | |
from kombu.pools import connections | |
transport_url = "redis://127.0.0.1:6379" | |
connection = BrokerConnection(transport_url) | |
_queue = "/workers/delicious" | |
_queue2 = "/workers/jazz" | |
if __name__ == "__main__": | |
with connections[connection].acquire(block=True) as conn: | |
with conn.SimpleQueue(_queue) as queue: | |
queue.put("mensagem") | |
with connections[connection].acquire(block=True) as conn: | |
with conn.SimpleQueue(_queue2) as queue: | |
queue.put("mensagem2") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment