Skip to content

Instantly share code, notes, and snippets.

@matutter
Created January 15, 2020 03:20
Show Gist options
  • Save matutter/f3ac93e8fadd833748970c0d2ae2a69f to your computer and use it in GitHub Desktop.
Save matutter/f3ac93e8fadd833748970c0d2ae2a69f to your computer and use it in GitHub Desktop.
Pika and Tornado
# Updated version of the example from these out-of-date docs:
# https://pika.readthedocs.io/en/stable/examples/tornado_consumer.html
from pika.adapters.tornado_connection import TornadoConnection
import pika
import logging
import coloredlogs
# Set up log output through coloredlogs (default settings) at import time,
# before the module logger below is created.
coloredlogs.install()
# Module-level logger used by every method of ExampleConsumer.
LOGGER = logging.getLogger(__name__)
class ExampleConsumer(object):
    """Asynchronous RabbitMQ consumer driven by pika's ``TornadoConnection``.

    The consumer opens a connection, opens a channel, declares an exchange
    and a queue, binds them together and then consumes messages, acking
    each one as it arrives.  Every step is chained to the previous one
    through pika callbacks.  If the connection closes while the consumer
    is not shutting down, a reconnect is scheduled after 5 seconds.
    """

    def __init__(self, amqp_url):
        """Store the broker URL and initialise the consumer state.

        :param str amqp_url: AMQP URL passed to ``pika.URLParameters``.
        """
        self._connection = None
        self._channel = None
        # True once stop() was requested; suppresses reconnection.
        self._closing = False
        self._consumer_tag = None
        self._url = amqp_url
        self.EXCHANGE = 'message'
        self.EXCHANGE_TYPE = 'topic'
        self.QUEUE = 'text'
        self.ROUTING_KEY = 'example.text'

    def connect(self):
        """Create and return a new ``TornadoConnection`` for ``self._url``.

        ``on_connection_open`` is registered as the open callback.
        """
        LOGGER.info('Connecting to %s', self._url)
        return TornadoConnection(pika.URLParameters(self._url),
                                 self.on_connection_open)

    def close_connection(self):
        """Close the current connection."""
        LOGGER.info('Closing connection')
        self._connection.close()

    def add_on_connection_close_callback(self):
        """Register ``on_connection_closed`` as the connection close hook."""
        LOGGER.info('Adding connection close callback')
        self._connection.add_on_close_callback(self.on_connection_closed)

    def on_connection_closed(self, connection, reason):
        """Handle a closed connection.

        Stops the IOLoop when the consumer is shutting down; otherwise
        schedules ``reconnect`` in 5 seconds.

        :param connection: The connection that was closed (unused).
        :param reason: Why the connection was closed; only logged.
        """
        self._channel = None
        if self._closing:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
                           reason)
            self._connection.ioloop.call_later(5, self.reconnect)

    def on_connection_open(self, unused_connection):
        """Continue the setup chain once the connection is open."""
        LOGGER.info('Connection opened')
        self.add_on_connection_close_callback()
        self.open_channel()

    def reconnect(self):
        """Open a fresh connection unless the consumer is shutting down."""
        if not self._closing:
            # Create a new connection
            self._connection = self.connect()

    def add_on_channel_close_callback(self):
        """Register ``on_channel_closed`` as the channel close hook."""
        LOGGER.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reason):
        """Close the connection after the channel has been closed.

        The connection state is checked first so that closing an absent,
        closing or already closed connection is skipped explicitly rather
        than hidden behind a blanket ``except``.

        :param channel: The closed channel; logged with ``%i``.
        :param reason: Why the channel was closed; only logged.
        """
        LOGGER.warning('Channel %i was closed: %s', channel, reason)
        connection = self._connection
        if (connection is None or connection.is_closing
                or connection.is_closed):
            LOGGER.info('Connection is closing or already closed')
            return
        connection.close()

    def on_channel_open(self, channel):
        """Remember the opened channel and declare the exchange on it.

        :param channel: The channel object handed over by pika.
        """
        LOGGER.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self.EXCHANGE)

    def setup_exchange(self, exchange_name):
        """Declare ``exchange_name`` with type ``self.EXCHANGE_TYPE``.

        :param str exchange_name: Name of the exchange to declare.
        """
        LOGGER.info('Declaring exchange %s', exchange_name)
        self._channel.exchange_declare(exchange=exchange_name,
                                       exchange_type=self.EXCHANGE_TYPE,
                                       callback=self.on_exchange_declareok)

    def on_exchange_declareok(self, unused_frame):
        """Declare the queue once the exchange declaration completed."""
        LOGGER.info('Exchange declared')
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        """Declare ``queue_name``; ``on_queue_declareok`` is the callback.

        :param str queue_name: Name of the queue to declare.
        """
        LOGGER.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(queue=queue_name,
                                    callback=self.on_queue_declareok)

    def on_queue_declareok(self, method_frame):
        """Bind the queue to the exchange using ``self.ROUTING_KEY``."""
        LOGGER.info('Binding %s to %s with %s',
                    self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
        self._channel.queue_bind(callback=self.on_bindok, queue=self.QUEUE,
                                 exchange=self.EXCHANGE,
                                 routing_key=self.ROUTING_KEY)

    def add_on_cancel_callback(self):
        """Register ``on_consumer_cancelled`` as the channel cancel hook."""
        LOGGER.info('Adding consumer cancellation callback')
        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)

    def on_consumer_cancelled(self, method_frame):
        """Close the channel after the consumer was cancelled remotely.

        :param method_frame: The cancel frame; only logged.
        """
        LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
                    method_frame)
        if self._channel:
            self._channel.close()

    def acknowledge_message(self, delivery_tag):
        """Ack the message identified by ``delivery_tag``."""
        LOGGER.info('Acknowledging message %s', delivery_tag)
        self._channel.basic_ack(delivery_tag)

    def on_message(self, unused_channel, basic_deliver, properties, body):
        """Log a delivered message and acknowledge it.

        :param unused_channel: The delivering channel (unused).
        :param basic_deliver: Delivery info; ``delivery_tag`` is read.
        :param properties: Message properties; ``app_id`` is read.
        :param body: The message body; only logged.
        """
        LOGGER.info('Received message # %s from %s: %s',
                    basic_deliver.delivery_tag, properties.app_id, body)
        self.acknowledge_message(basic_deliver.delivery_tag)

    def on_cancelok(self, unused_frame):
        """Close the channel once the consumer cancellation is confirmed."""
        LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
        self.close_channel()

    def stop_consuming(self):
        """Send Basic.Cancel for the active consumer, if a channel exists.

        The arguments are passed by keyword: the consumer tag comes first
        in the pika 1.x ``basic_cancel`` signature, matching the 1.x style
        calls used by the rest of this class.
        """
        if self._channel:
            LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
            self._channel.basic_cancel(consumer_tag=self._consumer_tag,
                                       callback=self.on_cancelok)

    def start_consuming(self):
        """Start consuming ``self.QUEUE`` and remember the consumer tag."""
        LOGGER.info('Issuing consumer related RPC commands')
        self.add_on_cancel_callback()
        self._consumer_tag = self._channel.basic_consume(self.QUEUE,
                                                         self.on_message)

    def on_bindok(self, unused_frame):
        """Start consuming once the queue binding completed."""
        LOGGER.info('Queue bound')
        self.start_consuming()

    def close_channel(self):
        """Close the current channel."""
        LOGGER.info('Closing the channel')
        self._channel.close()

    def open_channel(self):
        """Open a new channel; ``on_channel_open`` is the open callback."""
        LOGGER.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def run(self):
        """Connect and run the connection's IOLoop until it is stopped."""
        self._connection = self.connect()
        self._connection.ioloop.start()

    def stop(self):
        """Shut the consumer down.

        With an active channel the consumer is cancelled and the IOLoop is
        started again so the cancel / channel close / connection close
        callbacks can run; ``on_connection_closed`` then stops the loop.
        Without a channel, an open connection is closed the same way.  If
        neither exists, no callback would ever stop the loop, so it is not
        restarted and the call returns immediately.
        """
        LOGGER.info('Stopping')
        self._closing = True
        if self._channel:
            self.stop_consuming()
            self._connection.ioloop.start()
        elif self._connection is not None and self._connection.is_open:
            self.close_connection()
            self._connection.ioloop.start()
        LOGGER.info('Stopped')
def main():
    """Run an ExampleConsumer against the local broker until interrupted."""
    broker_url = 'amqp://guest:guest@localhost:5672/%2F'
    consumer = ExampleConsumer(broker_url)
    try:
        consumer.run()
    except KeyboardInterrupt:
        # Ctrl-C: ask the consumer to shut down.
        consumer.stop()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment