Created
January 15, 2020 03:20
-
-
Save matutter/f3ac93e8fadd833748970c0d2ae2a69f to your computer and use it in GitHub Desktop.
Pika and Tornado
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Updated from these out-of-date docs:
# https://pika.readthedocs.io/en/stable/examples/tornado_consumer.html
from pika.adapters.tornado_connection import TornadoConnection | |
import pika | |
import logging | |
import coloredlogs | |
# Enable coloredlogs before creating the logger used throughout this module.
coloredlogs.install()
LOGGER = logging.getLogger(__name__)
class ExampleConsumer(object):
    """Asynchronous AMQP consumer built on pika's ``TornadoConnection``.

    After the connection opens, the consumer opens a channel, declares the
    exchange and the queue, binds them with the routing key and starts
    consuming. Every delivered message is logged and acknowledged. If the
    connection closes while the consumer is not shutting down, a reconnect
    is scheduled five seconds later.
    """

    def __init__(self, amqp_url):
        """Store the broker URL and initialise the connection state.

        :param str amqp_url: AMQP URL handed to ``pika.URLParameters``.
        """
        self._connection = None
        self._channel = None
        self._closing = False  # set by stop(); suppresses reconnects
        self._consumer_tag = None
        self._url = amqp_url

        self.EXCHANGE = 'message'
        self.EXCHANGE_TYPE = 'topic'
        self.QUEUE = 'text'
        self.ROUTING_KEY = 'example.text'

    def connect(self):
        """Create a new ``TornadoConnection`` for the configured URL.

        :returns: the connection object; ``on_connection_open`` is passed
            as its open callback.
        """
        LOGGER.info('Connecting to %s', self._url)
        return TornadoConnection(pika.URLParameters(self._url),
                                 self.on_connection_open)

    def close_connection(self):
        """Close the connection unless it is missing, closing or closed."""
        connection = self._connection
        if (connection is None or connection.is_closing
                or connection.is_closed):
            # Closing again would raise; there is nothing left to do.
            LOGGER.info('Connection is closing or already closed')
            return
        LOGGER.info('Closing connection')
        connection.close()

    def add_on_connection_close_callback(self):
        """Register ``on_connection_closed`` as the connection close callback."""
        LOGGER.info('Adding connection close callback')
        self._connection.add_on_close_callback(self.on_connection_closed)

    def on_connection_closed(self, connection, reason):
        """Handle a closed connection.

        Stops the IOLoop when shutting down, otherwise schedules
        ``reconnect`` in five seconds.

        :param connection: the closed connection (unused).
        :param reason: why the connection was closed; only logged.
        """
        self._channel = None
        if self._closing:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
                           reason)
            self._connection.ioloop.call_later(5, self.reconnect)

    def on_connection_open(self, unused_connection):
        """Connection open callback: hook up close handling, open a channel."""
        LOGGER.info('Connection opened')
        self.add_on_connection_close_callback()
        self.open_channel()

    def reconnect(self):
        """Create a fresh connection unless the consumer is shutting down."""
        if not self._closing:
            # Create a new connection
            self._connection = self.connect()

    def add_on_channel_close_callback(self):
        """Register ``on_channel_closed`` as the channel close callback."""
        LOGGER.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reason):
        """Channel close callback: log the reason and close the connection.

        :param channel: the closed channel.
        :param reason: why the channel was closed; only logged.
        """
        LOGGER.warning('Channel %i was closed: %s', channel, reason)
        # Best-effort close: close_connection() skips a connection that is
        # already closing/closed instead of swallowing every exception.
        self.close_connection()

    def on_channel_open(self, channel):
        """Channel open callback: keep the channel and declare the exchange.

        :param channel: the newly opened channel.
        """
        LOGGER.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self.EXCHANGE)

    def setup_exchange(self, exchange_name):
        """Declare the exchange; ``on_exchange_declareok`` is the callback.

        :param str exchange_name: name of the exchange to declare.
        """
        LOGGER.info('Declaring exchange %s', exchange_name)
        self._channel.exchange_declare(exchange=exchange_name,
                                       exchange_type=self.EXCHANGE_TYPE,
                                       callback=self.on_exchange_declareok)

    def on_exchange_declareok(self, unused_frame):
        """Exchange declared: continue by declaring the queue."""
        LOGGER.info('Exchange declared')
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        """Declare the queue; ``on_queue_declareok`` is the callback.

        :param str queue_name: name of the queue to declare.
        """
        LOGGER.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(queue=queue_name,
                                    callback=self.on_queue_declareok)

    def on_queue_declareok(self, method_frame):
        """Queue declared: bind it to the exchange with the routing key."""
        LOGGER.info('Binding %s to %s with %s',
                    self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
        self._channel.queue_bind(callback=self.on_bindok, queue=self.QUEUE,
                                 exchange=self.EXCHANGE,
                                 routing_key=self.ROUTING_KEY)

    def add_on_cancel_callback(self):
        """Register ``on_consumer_cancelled`` as the channel cancel callback."""
        LOGGER.info('Adding consumer cancellation callback')
        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)

    def on_consumer_cancelled(self, method_frame):
        """Cancel callback: close the channel if one is still held.

        :param method_frame: the cancel frame; only logged.
        """
        LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
                    method_frame)
        if self._channel:
            self._channel.close()

    def acknowledge_message(self, delivery_tag):
        """Send a ``basic_ack`` for the given delivery tag.

        :param delivery_tag: delivery tag of the message to acknowledge.
        """
        LOGGER.info('Acknowledging message %s', delivery_tag)
        self._channel.basic_ack(delivery_tag)

    def on_message(self, unused_channel, basic_deliver, properties, body):
        """Message callback: log the delivery and acknowledge it.

        :param unused_channel: channel the message arrived on (unused).
        :param basic_deliver: delivery info; its ``delivery_tag`` is used.
        :param properties: message properties; ``app_id`` is logged.
        :param body: the message body; only logged.
        """
        LOGGER.info('Received message # %s from %s: %s',
                    basic_deliver.delivery_tag, properties.app_id, body)
        self.acknowledge_message(basic_deliver.delivery_tag)

    def on_cancelok(self, unused_frame):
        """Cancellation confirmed: close the channel."""
        LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
        self.close_channel()

    def stop_consuming(self):
        """Cancel the consumer if a channel is open; otherwise do nothing."""
        if self._channel:
            LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
            # pika 1.x signature is basic_cancel(consumer_tag, callback);
            # keywords keep the two arguments from being swapped.
            self._channel.basic_cancel(consumer_tag=self._consumer_tag,
                                       callback=self.on_cancelok)

    def start_consuming(self):
        """Start consuming from the queue and remember the consumer tag."""
        LOGGER.info('Issuing consumer related RPC commands')
        self.add_on_cancel_callback()
        self._consumer_tag = self._channel.basic_consume(self.QUEUE,
                                                         self.on_message)

    def on_bindok(self, unused_frame):
        """Queue bound: start consuming."""
        LOGGER.info('Queue bound')
        self.start_consuming()

    def close_channel(self):
        """Close the current channel."""
        LOGGER.info('Closing the channel')
        self._channel.close()

    def open_channel(self):
        """Open a new channel; ``on_channel_open`` is the open callback."""
        LOGGER.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def run(self):
        """Connect and start the connection's IOLoop (blocks)."""
        self._connection = self.connect()
        self._connection.ioloop.start()

    def stop(self):
        """Shut down: cancel the consumer and run the IOLoop until it stops.

        The loop is started again so the cancel / channel-close /
        connection-close callbacks can run; ``on_connection_closed`` stops
        it once ``_closing`` is set.
        """
        LOGGER.info('Stopping')
        self._closing = True
        self.stop_consuming()
        # NOTE(review): if no channel is open, stop_consuming() does nothing
        # and nothing in this class stops the loop started below -- confirm
        # whether that case needs explicit handling.
        self._connection.ioloop.start()
        LOGGER.info('Stopped')
def main():
    """Run an ``ExampleConsumer`` against localhost until interrupted."""
    consumer = ExampleConsumer('amqp://guest:guest@localhost:5672/%2F')
    try:
        consumer.run()
    except KeyboardInterrupt:
        # Ctrl-C: ask the consumer to shut down.
        consumer.stop()
# Run the example only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment