Last active
July 7, 2022 19:38
-
-
Save mattbennett/30cc95d42346df62a60e to your computer and use it in GitHub Desktop.
Greenthread worker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import eventlet | |
eventlet.monkey_patch() | |
from eventlet.greenpool import GreenPile | |
from kombu.pools import producers | |
from kombu import Exchange, Queue | |
exchange = Exchange('exchange', type='direct') | |
queue = Queue('queue', exchange, routing_key='queue') | |
if __name__ == '__main__': | |
from kombu import Connection | |
connection = Connection('amqp://guest:guest@localhost:5672//') | |
def publish(index): | |
with producers[connection].acquire(block=True) as producer: | |
producer.publish(index, routing_key='queue', serializer='json') | |
pile = GreenPile() | |
for index in xrange(100): | |
pile.spawn(publish, index) | |
list(pile) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import eventlet | |
eventlet.monkey_patch() | |
import random | |
import time | |
import threading | |
from kombu.mixins import ConsumerMixin | |
from kombu.log import get_logger | |
from kombu import Exchange, Queue | |
exchange = Exchange('exchange', type='direct') | |
queue = Queue('queue', exchange, routing_key='queue') | |
logger = get_logger(__name__) | |
class Worker(ConsumerMixin): | |
def __init__(self, connection): | |
self.connection = connection | |
def handle_message(self, body, message): | |
print "CONSUME", body, "in", threading.current_thread().get_name() | |
eventlet.spawn(self.process_task, body, message) | |
def get_consumers(self, Consumer, channel): | |
consumer = Consumer(queues=[queue], | |
accept=['json'], | |
callbacks=[self.handle_message]) | |
consumer.qos(prefetch_count=10) | |
return [consumer] | |
def process_task(self, body, message): | |
time.sleep(random.random()) | |
print "ACK", body, "in", threading.current_thread().get_name() | |
message.ack() | |
if __name__ == '__main__': | |
from kombu import Connection | |
from kombu.utils.debug import setup_logging | |
# setup root logger | |
setup_logging(loglevel='INFO', loggers=['']) | |
with Connection('amqp://guest:guest@localhost:5672//') as conn: | |
try: | |
worker = Worker(conn) | |
eventlet.spawn(worker.run).wait() | |
except KeyboardInterrupt: | |
print('bye bye') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
RabbitMQ recommends use ONE Connection with multiple Channels.
Why not use ChannelPool instead of ConnectionPool in producers ?