-
-
Save ygmpkk/372bc83f2cb1e37500fb to your computer and use it in GitHub Desktop.
Greenthread worker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import eventlet | |
eventlet.monkey_patch() | |
from eventlet.greenpool import GreenPile | |
from kombu.pools import producers | |
from kombu import Exchange, Queue | |
exchange = Exchange('exchange', type='direct') | |
queue = Queue('queue', exchange, routing_key='queue') | |
if __name__ == '__main__': | |
from kombu import Connection | |
connection = Connection('amqp://guest:guest@localhost:5672//') | |
def publish(index): | |
with producers[connection].acquire(block=True) as producer: | |
producer.publish(index, routing_key='queue', serializer='json') | |
pile = GreenPile() | |
for index in xrange(100): | |
pile.spawn(publish, index) | |
list(pile) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import eventlet | |
eventlet.monkey_patch() | |
import random | |
import time | |
import threading | |
from kombu.mixins import ConsumerMixin | |
from kombu.log import get_logger | |
from kombu import Exchange, Queue | |
exchange = Exchange('exchange', type='direct') | |
queue = Queue('queue', exchange, routing_key='queue') | |
logger = get_logger(__name__) | |
class Worker(ConsumerMixin): | |
def __init__(self, connection): | |
self.connection = connection | |
def handle_message(self, body, message): | |
print "CONSUME", body, "in", threading.current_thread().get_name() | |
eventlet.spawn(self.process_task, body, message) | |
def get_consumers(self, Consumer, channel): | |
consumer = Consumer(queues=[queue], | |
accept=['json'], | |
callbacks=[self.handle_message]) | |
consumer.qos(prefetch_count=10) | |
return [consumer] | |
def process_task(self, body, message): | |
time.sleep(random.random()) | |
print "ACK", body, "in", threading.current_thread().get_name() | |
message.ack() | |
if __name__ == '__main__': | |
from kombu import Connection | |
from kombu.utils.debug import setup_logging | |
# setup root logger | |
setup_logging(loglevel='INFO', loggers=['']) | |
with Connection('amqp://guest:guest@localhost:5672//') as conn: | |
try: | |
worker = Worker(conn) | |
eventlet.spawn(worker.run).wait() | |
except KeyboardInterrupt: | |
print('bye bye') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment