Last active
November 14, 2018 14:13
-
-
Save sjlongland/3b89ce91d065851b292a to your computer and use it in GitHub Desktop.
Tornado Coroutine interface to AMQP (pika)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import datetime | |
import logging | |
import threading | |
import weakref | |
import tornado | |
import tornado.gen | |
import pika | |
''' | |
Pika-Tornado: a Tornado coroutine-based abstraction layer for Pika. | |
''' | |
# (C) 2015 Stuart Longland <[email protected]> | |
# Released under the terms of the Mozilla Public License v2.0 | |
class AMQPObject(object):
    '''
    Common base class for the AMQP wrapper objects in this module.  Its only
    job is to hand out consistently named loggers.
    '''

    @classmethod
    def _get_log(cls, *name):
        '''
        Return the logger for this class, optionally narrowed by further
        name components.

        The logger name is the dotted concatenation of the defining module,
        the class name and each string given in ``name``, e.g.
        ``<module>.<Class>.<name[0]>.<name[1]>``.
        '''
        parts = [cls.__module__, cls.__name__]
        parts.extend(name)
        return logging.getLogger('.'.join(parts))
class AMQPConnection(AMQPObject):
    '''
    Connection to AMQP, this object handles unexpected disconnections (and
    re-connects accordingly) and wraps the channel opening process in a
    coroutine-friendly manner.

    Support for multiple IOLoops is provided and thread-safety is attempted.

    Callbacks may be specified in a number of ways:
    - as a reference to the function directly
    - as a tuple:
        - first element gives the IOLoop (or None for the client's IOLoop)
        - second element gives the reference to the function
        - third element if present gives the initial args
        - fourth element if present gives the initial kwargs
    '''

    CONNECTION_CLOSED = pika.connection.Connection.CONNECTION_CLOSED
    CONNECTION_INIT = pika.connection.Connection.CONNECTION_INIT
    CONNECTION_PROTOCOL = pika.connection.Connection.CONNECTION_PROTOCOL
    CONNECTION_START = pika.connection.Connection.CONNECTION_START
    CONNECTION_TUNE = pika.connection.Connection.CONNECTION_TUNE
    CONNECTION_OPEN = pika.connection.Connection.CONNECTION_OPEN
    CONNECTION_CLOSING = pika.connection.Connection.CONNECTION_CLOSING

    def __init__(self, parameters, on_open_callback=None,
            on_open_error_callback=None, on_close_callback=None,
            on_giveup_callback=None, reconnect_delay=5.0, reconnect_max=-1,
            io_loop=None, io_thread=None):
        '''
        Initialise a connection to AMQP.

        :param: pika.connection.Parameters parameters:
            Connection parameters, see Pika documentation.
        :param method on_open_callback:
            Called when the connection is opened.
        :param method on_open_error_callback:
            Called if the connection can't be opened.
        :param method on_close_callback:
            Called when the connection is closed.
        :param method on_giveup_callback:
            Called when the connection attempts have been exhausted
            and this library "gives up".  This might be used to log
            an alarm and shut down the daemon.
        :param float reconnect_delay:
            The amount of time (seconds) to wait before attempting to
            reconnect.  Set to 0 to disable reconnect.
        :param int reconnect_max:
            The maximum number of reconnection attempts.  Any value of 0
            or below (the default is -1) permits unlimited reconnections.
        :param tornado.ioloop.IOLoop io_loop:
            IOLoop instance to use for the AMQP client itself.  By default,
            the current IOLoop is used.
        :param threading.Thread io_thread:
            Thread used for AMQP communications.  Since Pika itself is not
            thread-safe, it is imperative that all AMQP operations take place
            in the same thread as the AMQP IOLoop instance.

            If this is different to the thread the constructor is running in
            for whatever reason, the reference to that thread can be given
            here.
        '''
        if io_loop is None:
            io_loop = tornado.ioloop.IOLoop.current()
        if io_thread is None:
            io_thread = threading.current_thread()

        self._io_loop = io_loop
        self._io_thread = io_thread

        self._reconnect_delay = datetime.timedelta(seconds=reconnect_delay)
        self._reconnect_max = reconnect_max
        self._reconnect_rem = reconnect_max
        # This flag must NOT be called "_reconnect": an instance attribute of
        # that name would shadow the _reconnect() method and make every
        # self._reconnect() call fail with "'bool' object is not callable".
        self._reconnect_enabled = reconnect_delay > 0
        self._giveup = False

        # Wrapped callbacks, kept on this object (rather than on the pika
        # connection) so they survive the connection being replaced.
        self._on_open_cb = []
        self._on_open_error_cb = []
        self._on_close_cb = []
        self._on_giveup_cb = []

        for (registry, callback) in (
                (self._on_open_cb, on_open_callback),
                (self._on_open_error_cb, on_open_error_callback),
                (self._on_close_cb, on_close_callback),
                (self._on_giveup_cb, on_giveup_callback)):
            if callback is not None:
                registry.append(self._get_callback(callback))

        self._amqp_parameters = parameters
        self._amqp_connection = None
        # Weak references to the AMQPChannel objects handed out by channel()
        self._channels = []
        self._replace_connection()

    # ------------------------------------------------------------------------
    # Connection state
    # ------------------------------------------------------------------------

    @property
    def connection_state(self):
        ''' State of the underlying pika connection. '''
        return self._amqp_connection.connection_state

    @property
    def is_closed(self):
        ''' ``is_closed`` of the underlying pika connection. '''
        return self._amqp_connection.is_closed

    @property
    def is_closing(self):
        ''' ``is_closing`` of the underlying pika connection. '''
        return self._amqp_connection.is_closing

    @property
    def is_open(self):
        ''' ``is_open`` of the underlying pika connection. '''
        return self._amqp_connection.is_open

    # ------------------------------------------------------------------------
    # Connection callbacks
    # ------------------------------------------------------------------------

    def _register_callback(self, registry, callback, callback_args,
            callback_kwargs):
        '''
        Wrap ``callback`` with _get_callback and append it to ``registry``.

        Returns an already-completed Future (result None) so callers that
        ``yield`` the registration methods keep working.
        '''
        wrapped = self._get_callback(callback, callback_args, callback_kwargs)
        if wrapped is not None:
            registry.append(wrapped)
        done = tornado.concurrent.Future()
        done.set_result(None)
        return done

    def add_on_open_callback(self, callback, callback_args=None,
            callback_kwargs=None):
        '''
        Register a callback run each time the connection is opened.

        The callback is stored on this object, so it remains registered
        when the underlying pika connection is replaced on reconnect.
        '''
        return self._register_callback(self._on_open_cb,
                callback, callback_args, callback_kwargs)

    def add_on_open_error_callback(self, callback, callback_args=None,
            callback_kwargs=None):
        '''
        Register a callback run each time a connection attempt fails.
        '''
        return self._register_callback(self._on_open_error_cb,
                callback, callback_args, callback_kwargs)

    def add_on_close_callback(self, callback, callback_args=None,
            callback_kwargs=None):
        '''
        Register a callback run each time the connection is closed.
        '''
        return self._register_callback(self._on_close_cb,
                callback, callback_args, callback_kwargs)

    def add_on_giveup_callback(self, callback, callback_args=None,
            callback_kwargs=None):
        '''
        Register a callback run when reconnection attempts are exhausted.
        '''
        return self._register_callback(self._on_giveup_cb,
                callback, callback_args, callback_kwargs)

    def add_backpressure_callback(self, callback):
        '''
        Register a backpressure callback on the *current* pika connection.

        NOTE(review): unlike the other callbacks this one is handed straight
        to pika, so it is not carried over when _replace_connection creates a
        new connection object -- confirm whether that is intended.
        '''
        return self._run_async(
                self._amqp_connection.add_backpressure_callback,
                args=(self._get_callback(callback),))

    # ------------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------------

    def _replace_connection(self):
        '''
        Create a fresh pika TornadoConnection and mark every live channel as
        stale.  All failures are logged, never raised.
        '''
        log = self._get_log('_replace_connection')
        try:
            log.debug('Gathering a list of existing channels')
            # Grab a list of channels
            channels = self._get_channels()

            log.debug( 'We have %s channels to reopen.  Opening '\
                    'connection to AMQP with parameters %s.',
                    len(channels), self._amqp_parameters)

            # Replace the connection
            self._amqp_connection = pika.adapters.TornadoConnection(
                    parameters = self._amqp_parameters,
                    on_open_callback = self._handle_on_open_callback,
                    on_open_error_callback = \
                            self._handle_on_open_error_callback,
                    on_close_callback = self._handle_on_close_callback,
                    custom_ioloop = self._io_loop)
            log.debug('AMQP connection started')

            # Mark these channels as stale; they are re-opened by
            # _handle_on_open_callback once the new connection is up.
            for ch in channels:
                try:
                    log.debug('Re-opening %s', ch)
                    ch._mark_stale()
                except Exception:
                    # Best effort: one bad channel must not stop the rest.
                    log.exception('Failed to mark %s stale', ch)
        except Exception:
            log.exception('Failed to replace connection')

    def connect(self, timeout=None):
        '''
        Connect to AMQP.

        Returns a Future for the result of the pika connection's
        ``connect()`` call.
        '''
        return self._run_async(
                async_func = self._amqp_connection.connect,
                args = (),
                kwargs = {},
                timeout = timeout,
                cb_func = None,
                future = None)

    def close(self, *args, **kwargs):
        '''
        Disconnect from AMQP.  This disables reconnections.
        See pika.BaseConnection.close for details on arguments.

        An optional ``timeout`` keyword is consumed here and not passed on
        to pika.
        '''
        timeout = kwargs.pop('timeout',None)
        self._giveup = False
        self._reconnect_enabled = False
        return self._run_async(
                async_func = self._amqp_connection.close,
                args = args,
                kwargs = kwargs,
                timeout = timeout,
                cb_func = None,
                future = None)

    def set_backpressure_multiplier(self, *args, **kwargs):
        '''
        Forward to the pika connection's ``set_backpressure_multiplier``.

        An optional ``timeout`` keyword is consumed here and not passed on
        to pika (same convention as close()).
        '''
        timeout = kwargs.pop('timeout', None)
        return self._run_async(
                async_func = \
                        self._amqp_connection.set_backpressure_multiplier,
                args = args,
                kwargs = kwargs,
                timeout = timeout,
                cb_func = None,
                future = None)

    # ------------------------------------------------------------------------
    # Channel management
    # ------------------------------------------------------------------------

    def _cleanup_channels(self):
        '''
        Drop weak references whose channel has been garbage collected.
        '''
        # A real list is required: channel() appends to it afterwards.
        self._channels = [ref for ref in self._channels
                if ref() is not None]

    def _get_channels(self):
        '''
        Return strong references to every channel that is still alive.
        '''
        self._cleanup_channels()
        candidates = [ref() for ref in self._channels]
        # A referent may have died between the clean-up and the dereference.
        return [ch for ch in candidates if ch is not None]

    @tornado.gen.coroutine
    def channel(self, channel_number=None, timeout=None):
        '''
        Open a new channel on the AMQP connection.

        Returns (via the coroutine's Future) an AMQPChannel.  Only a weak
        reference to it is retained here.
        '''
        ch = AMQPChannel(self, channel_number)
        yield ch._init_channel(timeout)
        self._cleanup_channels()
        self._channels.append(weakref.ref(ch))
        raise tornado.gen.Return(ch)

    # ------------------------------------------------------------------------
    # Connection handling callbacks
    # ------------------------------------------------------------------------

    @tornado.gen.coroutine
    def _handle_on_open_callback(self, *args, **kwargs):
        '''
        What to do when the connection is opened?

        Resets the reconnection state, re-opens every known channel and then
        schedules the registered open callbacks on our IOLoop.

        This has to be a coroutine: the body yields, so as a plain function
        a call would merely build a generator object and run nothing.
        '''
        log = self._get_log('_handle_on_open_callback')
        try:
            log.info('Connection open')
            self._reconnect_enabled = \
                    self._reconnect_delay.total_seconds() > 0
            self._reconnect_rem = self._reconnect_max
            self._giveup = False

            # Grab a list of channels
            channels = self._get_channels()

            # Re-open those channels
            for ch in channels:
                log.debug('Re-opening %s', ch)
                yield ch._init_channel()

            log.debug('Calling open callbacks')
            for cb in self._on_open_cb:
                self._io_loop.add_callback(cb, *args, **kwargs)
        except Exception:
            log.exception('Failed to handle connection open')

    def _handle_on_close_callback(self, *args, **kwargs):
        '''
        What to do when the connection is closed?

        Runs the registered close callbacks, then triggers _reconnect().
        '''
        log = self._get_log('_handle_on_close_callback')
        try:
            log.info('Connection closed')
            for cb in self._on_close_cb:
                try:
                    cb(*args, **kwargs)
                except Exception:
                    log.exception('Exception in callback %s(*%s, **%s)',
                            cb, args, kwargs)
        except Exception:
            log.exception('Exception in close callback')
        self._reconnect()

    def _handle_on_open_error_callback(self, connection, error,
            *args, **kwargs):
        '''
        What to do if we fail to open the connection?

        Runs the registered open-error callbacks, then triggers _reconnect().
        '''
        log = self._get_log('_handle_on_open_error_callback')
        try:
            log.error('Received %s whilst attempting connection', error)

            # Pass it through to the callback handlers we were given
            for cb in self._on_open_error_cb:
                try:
                    cb(connection, error, *args, **kwargs)
                except Exception:
                    log.exception('Error in callback %s(*%s, **%s)',
                            cb, args, kwargs)
        except Exception:
            log.exception('Failed in open-error callback')
        self._reconnect()

    def _reconnect(self):
        '''
        Perform a re-connection, if enabled.

        When reconnection is enabled a replacement connection is scheduled
        after the configured delay; when the attempts have run out the
        give-up callbacks are invoked instead.
        '''
        log = self._get_log('_reconnect')
        try:
            # Sanity check, ensure we're running in the correct thread for
            # this.  add_timeout is *NOT* thread-safe.
            if not self._is_own_thread:
                # Re-run ourselves from within our own IOLoop.
                self._io_loop.add_callback(self._reconnect)
                return

            # Reconnect?
            if self._reconnect_enabled:
                if self._reconnect_max > 0:
                    # Limited number of attempts: count this one down.
                    self._reconnect_rem -= 1
                    self._reconnect_enabled = self._reconnect_rem > 0
                    self._giveup = not self._reconnect_enabled
                    if self._giveup:
                        log.error('This is our last connection attempt!')

                # Schedule a reconnect.  We do this by completely replacing
                # the AMQP connection object.
                self._io_loop.add_timeout(
                        self._reconnect_delay, self._replace_connection)
                log.error('Re-connecting in %s.',
                        self._reconnect_delay)
            elif self._giveup:
                log.fatal('Giving up!')
                for cb in self._on_giveup_cb:
                    try:
                        cb()
                    except Exception:
                        log.exception('Exception in callback %s', cb)
        except Exception:
            log.exception('Failure in recnnection')

    # ------------------------------------------------------------------------
    # Callback and thread-safety utilities
    # ------------------------------------------------------------------------

    @property
    def _is_own_thread(self):
        '''
        Return True if we're running in our own thread.
        '''
        return self._io_thread is threading.current_thread()

    def _schedule_timeout(self, timeout, future, cb_func=None,
            cb_args=None, cb_kwargs=None):
        '''
        Place a tornado.gen.TimeoutError into the future if the future is not
        completed in time, then call ``cb_func`` (if given).

        ``timeout`` is handed unchanged to IOLoop.add_timeout; the handle
        that returns is passed back so the caller can cancel the timeout.
        '''
        assert self._is_own_thread, 'This isn\'t my thread!'
        if cb_args is None:
            cb_args = ()
        if cb_kwargs is None:
            cb_kwargs = {}

        def _timeout():
            if not future.done():
                # The builtin TimeoutError does not exist on Python 2, so
                # use Tornado's own exception type.
                future.set_exception(tornado.gen.TimeoutError())
                if cb_func is not None:
                    cb_func(*cb_args, **cb_kwargs)

        return self._io_loop.add_timeout(timeout, _timeout)

    @tornado.gen.coroutine
    def _run_async(self, async_func, args=None, kwargs=None, timeout=None,
            cb_func=None, callback_name='callback', future=None):
        '''
        Run a method in our own thread and return its result.

        args and kwargs are passed to async_func, which will be run in the
        AMQP IOLoop.

        timeout specifies a time limit on the operation, and will result in a
        tornado.gen.TimeoutError if the operation does not complete.  It only
        applies when cb_func is given; a direct call is synchronous.

        cb_func, if given, causes a keyword argument named by callback_name
        to be passed to async_func.  When async_func invokes it, cb_func is
        called with the result future followed by whatever async_func
        supplied, and is expected to complete that future.

        future, if given, receives the result instead of it being returned;
        in that case this coroutine resolves to None without waiting.
        '''
        log = self._get_log('_run_async')
        try:
            if args is None:
                args = ()
            if kwargs is None:
                kwargs = {}

            timeout_ref = None
            # With no future supplied the caller wants the result itself.
            return_result = future is None

            if self._is_own_thread:
                # We're in our own thread.  Is there a callback involved?
                if cb_func is None:
                    # There isn't.  This is easy then, just call the function
                    # and return the result.  Since the function is
                    # synchronous, we ignore the timeout.
                    if return_result:
                        result = async_func(*args, **kwargs)
                        raise tornado.gen.Return(result)

                    try:
                        result = async_func(*args, **kwargs)
                        future.set_result(result)
                    except Exception as e:
                        log.exception('Exception running %s',
                                async_func)
                        future.set_exception(e)
                else:
                    # The result (or an exception) will be placed in this
                    # future.
                    if future is None:
                        future = tornado.concurrent.Future()

                    # Create a timeout object if we're given a time limit.
                    if timeout is not None:
                        timeout_ref = self._schedule_timeout(timeout, future)

                    # The function takes a callback argument which provides
                    # the result we're after.  cb_func digests what the
                    # callback was given; its first argument is the future.
                    cb_func = self._get_callback(cb_func)
                    def _callback(*cb_args, **cb_kwargs):
                        cb_log = self._get_log('_run_async._callback')
                        try:
                            cb_func(future, *cb_args, **cb_kwargs)
                        except tornado.gen.TimeoutError:
                            # Already reported through the future.
                            pass
                        except Exception as e:
                            if not future.done():
                                future.set_exception(e)
                            else:
                                cb_log.exception('Exception in callback')

                    # Copy so the caller's dict is left alone.
                    kwargs = kwargs.copy()
                    kwargs[callback_name] = _callback

                    # Perform the action given...
                    try:
                        async_func(*args, **kwargs)
                    except Exception as e:
                        # The operation never started, so the pending
                        # timeout is of no further use.
                        if timeout_ref is not None:
                            self._io_loop.remove_timeout(timeout_ref)
                            timeout_ref = None
                        if not return_result:
                            future.set_exception(e)
                        else:
                            raise
            else:
                assert self._io_thread.is_alive(), 'IO Thread is dead'
                # We're in the wrong thread, so schedule this to take place
                # in our own IOLoop.  First, create a future to store this in.
                if future is None:
                    future = tornado.concurrent.Future()

                # Then call ourselves with the new future.
                self._io_loop.add_callback(
                        self._run_async,
                        async_func=async_func,
                        args=args,
                        kwargs=kwargs,
                        timeout=timeout,
                        cb_func=cb_func,
                        callback_name=callback_name,
                        future=future)

            if return_result:
                # Wait for the result
                result = yield future
                if timeout_ref is not None:
                    self._io_loop.remove_timeout(timeout_ref)
                # Return the result
                raise tornado.gen.Return(result)
            else:
                # Return nothing to prevent the IOLoop from waiting.
                raise tornado.gen.Return(None)
        except tornado.gen.Return:
            # This is how the coroutine returns a value; not an error.
            raise
        except Exception:
            log.exception('Failed to execute')
            raise

    def _get_callback(self, callback_ref, callback_args=None,
            callback_kwargs=None):
        '''
        Return a callback function from the given reference.

        ``callback_ref`` is either a callable or a tuple as described in the
        class documentation; None yields None.  ``callback_args`` and
        ``callback_kwargs`` are bound ahead of the arguments supplied when
        the returned wrapper is eventually called.

        The wrapper runs the target directly when it belongs to our IOLoop,
        otherwise it schedules it on the target's IOLoop.  Exceptions are
        logged and never propagate to the caller of the wrapper.
        '''
        if callback_ref is None:
            return None

        # Private copies: the caller's containers must never be mutated.
        callback_args = tuple(callback_args or ())
        callback_kwargs = dict(callback_kwargs or {})

        if isinstance(callback_ref, tuple):
            (callback_ioloop, callback_func) = callback_ref[0:2]
            if len(callback_ref) > 2:
                callback_args = callback_args + tuple(callback_ref[2] or ())
            if len(callback_ref) > 3:
                callback_kwargs.update(callback_ref[3] or {})
        else:
            callback_ioloop = None
            callback_func = callback_ref

        if callback_ioloop is None:
            callback_ioloop = self._io_loop

        def _callback(*args, **kwargs):
            log = self._get_log('_get_callback','_callback')
            # TODO: why is this happening?
            if (len(args) == 1) and isinstance(args[0], tuple):
                args = args[0]

            try:
                cb_args = callback_args + args
                cb_kwargs = callback_kwargs.copy()
                cb_kwargs.update(kwargs)

                if self._io_loop is callback_ioloop:
                    # We're running this ourselves.  The bound arguments
                    # must be passed here too, exactly as in the external
                    # IOLoop branch below.
                    cb_res = callback_func(*cb_args, **cb_kwargs)
                    if isinstance(cb_res, tornado.concurrent.Future):
                        # Asynchronous callback: log its failure, if any,
                        # once it completes.
                        @tornado.gen.coroutine
                        def _get_result(future):
                            try:
                                yield future
                            except Exception:
                                log.exception('Callback failed')
                        self._io_loop.add_future(cb_res, _get_result)
                else:
                    callback_ioloop.add_callback(
                            callback_func, *cb_args, **cb_kwargs)
            except Exception:
                log.exception('Callback failed: %s', callback_func)

        return _callback
class AMQPChannel(AMQPObject): | |
''' | |
An abstraction for the AMQP Channel. | |
''' | |
def __init__(self, connection, channel_number): | |
''' | |
Initialise a new channel object. | |
''' | |
log = self._get_log('__init__') | |
log.debug('New channel') | |
self._connection = connection | |
self._channel_number= channel_number | |
self._channel = None | |
# Queues and exchanges to re-establish | |
self._exchanges = [] | |
self._queues = [] | |
_amqp_connection = property(lambda s : \ | |
s._connection._amqp_connection) | |
def _mark_stale(self): | |
self._get_log('_mark_stale').debug( | |
'Channel is now stale') | |
self._channel = None | |
@tornado.gen.coroutine | |
def _init_channel(self, timeout=None): | |
log = self._get_log('channel') | |
def _callback(future, channel): | |
log.debug('Opened new channel %s', channel) | |
if not future.done(): | |
future.set_result(channel) | |
else: | |
log.error('Future already done, cannot return channel') | |
channel.close() | |
log.debug('Opening new channel (number=%s)', self._channel_number) | |
try: | |
channel = yield self._connection._run_async( | |
async_func = self._amqp_connection.channel, | |
args = None, | |
kwargs = {'channel_number': self._channel_number}, | |
timeout = timeout, | |
cb_func = _callback, | |
callback_name = 'on_open_callback', | |
future = None) | |
except: | |
log.exception('Failed to open channel') | |
raise | |
self._channel = channel | |
# If we had any exchanges, declare those now | |
for ex in self._get_exchanges(): | |
try: | |
log.debug('Declaring exchange %s', ex.exchange) | |
yield ex.declare(timeout) | |
except: | |
log.exception('Failed to declare exchange %s', ex.exchange) | |
# If we had any queues, declare those now | |
for q in self._get_queues(): | |
try: | |
log.debug('Declaring queue %s', q.routing_key) | |
yield q.declare(timeout) | |
except: | |
log.exception('Failed to declare exchange %s', q.routing_key) | |
# ------------------------------------------------------------------------ | |
# Channel status | |
# ------------------------------------------------------------------------ | |
is_closed = property(lambda s : s._channel.is_closed) | |
is_closing = property(lambda s : s._channel.is_closing) | |
is_open = property(lambda s : s._channel.is_open) | |
# ------------------------------------------------------------------------ | |
# Channel callbacks | |
# ------------------------------------------------------------------------ | |
def add_callback(self, callback, replies, one_shot=True): | |
return self._connection._run_async( | |
async_func=self._channel.add_callback, | |
args=(self._connection._get_callback(callback), | |
replies, one_shot), | |
kwargs={}, timeout=None, callback=None, future=None) | |
def add_on_cancel_callback(self, callback): | |
return self._connection._run_async( | |
self._channel.add_on_cancel_callback, | |
args=(self._connection._get_callback(callback),)) | |
def add_on_close_callback(self, callback): | |
return self._connection._run_async( | |
self._channel.add_on_close_callback, | |
args=(self._connection._get_callback(callback),)) | |
def add_on_flow_callback(self, callback): | |
return self._connection._run_async( | |
self._channel.add_on_flow_callback, | |
args=(self._connection._get_callback(callback),)) | |
def add_on_return_callback(self, callback): | |
return self._connection._run_async( | |
self._channel.add_on_return_callback, | |
args=(self._connection._get_callback(callback),)) | |
# ------------------------------------------------------------------------ | |
# Channel operations | |
# ------------------------------------------------------------------------ | |
def basic_ack(self, delivery_tag=0, multiple=False): | |
return self._connection._run_async( | |
self._channel.basic_ack, args=(delivery_tag, multiple)) | |
def basic_cancel(self, callback=None, consumer_tag='', nowait=False): | |
return self._connection._run_async( | |
self._channel.basic_cancel, | |
args=(self._connection._get_callback(callback), | |
consumer_tag, nowait)) | |
def basic_consume(self, consumer_callback, queue='', no_ack=False, | |
exclusive=False, consumer_tag=None, arguments=None): | |
return self._connection._run_async( | |
self._channel.basic_consume, | |
args=(self._connection._get_callback(consumer_callback), | |
queue, no_ack, exclusive, consumer_tag, arguments)) | |
def basic_get(self, callback, queue='', no_ack=False): | |
return self._connection._run_async( | |
self._channel.basic_get, | |
args=(self._connection._get_callback(callback), | |
queue, no_ack)) | |
def basic_nack(self, delivery_tag=None, multiple=False, requeue=True): | |
return self._connection._run_async( | |
self._channel.basic_nack, | |
args=(delivery_tag, multiple, requeue)) | |
def basic_publish(self, exchange, routing_key, body, properties=None, | |
mandatory=False, immediate=False): | |
return self._connection._run_async( | |
self._channel.basic_publish, | |
args=(exchange, routing_key, body, properties, | |
mandatory, immediate)) | |
def basic_qos(self, callback=None, prefetch_size=0, prefetch_count=0, | |
all_channels=False): | |
return self._connection._run_async( | |
self._channel.basic_qos, | |
args=(self._connection._get_callback(callback), | |
prefetch_size, prefetch_count, all_channels)) | |
def basic_reject(self, delivery_tag=None, requeue=True): | |
return self._connection._run_async( | |
self._channel.basic_reject, | |
args=(delivery_tag, requeue)) | |
def basic_recover(self, callback=None, prefetch_size=0, prefetch_count=0, | |
all_channels=False): | |
return self._connection._run_async( | |
self._channel.basic_qos, | |
args=(self._connection._get_callback(callback), | |
prefetch_size, prefetch_count, all_channels)) | |
def close(self, *args, **kwargs): | |
return self._connection._run_async( | |
self._channel.close, | |
args=args, kwargs=kwargs) | |
def confirm_delivery(self, callback=None, nowait=False): | |
return self._connection._run_async( | |
self._channel.confirm_delivery, | |
args=(self._connection._get_callback(callback), nowait)) | |
def open(self): | |
return self._connection._run_async(self._channel.open) | |
# ------------------------------------------------------------------------ | |
# Exchange operations | |
# ------------------------------------------------------------------------ | |
def _cleanup_exchanges(self): | |
self._exchanges = filter(lambda er : er() is not None, self._exchanges) | |
def _get_exchanges(self): | |
return [er() for er in self._exchanges] | |
@tornado.gen.coroutine | |
def exchange(self, exchange=None, exchange_type='direct', | |
passive=False, durable=False, auto_delete=False, internal=False, | |
nowait=False, arguments=None, timeout=None): | |
e = AMQPExchange(self, exchange, exchange_type, passive, durable, | |
auto_delete, internal, nowait, arguments) | |
yield e.declare(timeout) | |
self._cleanup_exchanges() | |
self._exchanges.append(weakref.ref(e)) | |
raise tornado.gen.Return(e) | |
def exchange_bind(self, destination=None, source=None, | |
routing_key='', nowait=False, arguments=None, | |
timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.exchange_bind, | |
args=(destination, source, routing_key, nowait, arguments), | |
cb_func=_get_response, | |
timeout=timeout) | |
def exchange_declare(self, exchange=None, exchange_type='direct', | |
passive=False, durable=False, auto_delete=False, internal=False, | |
nowait=False, arguments=None, timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.exchange_declare, | |
kwargs={ | |
'exchange': exchange, | |
'exchange_type': exchange_type, | |
'passive': passive, | |
'durable': durable, | |
'auto_delete': auto_delete, | |
'internal': internal, | |
'nowait': nowait, | |
'arguments': arguments, | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def exchange_delete(self, exchange=None, if_unused=False, nowait=False, | |
timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.exchange_delete, | |
kwargs={ | |
'exchange': exchange, | |
'if_unused': if_unused, | |
'nowait': nowait, | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def exchange_unbind(self, destination=None, source=None, | |
routing_key='', nowait=False, arguments=None, | |
timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.exchange_unbind, | |
kwargs={ | |
'destination': destination, | |
'source': source, | |
'routing_key': routing_key, | |
'nowait': nowait, | |
'arguments': arguments | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
# ------------------------------------------------------------------------ | |
# Flow operations | |
# ------------------------------------------------------------------------ | |
def flow(self, callback, active): | |
return self._connection._run_async( | |
self._channel.flow, | |
args=(self._connection._get_callback(callback), active)) | |
# ------------------------------------------------------------------------ | |
# Queue operations | |
# ------------------------------------------------------------------------ | |
def _cleanup_queues(self): | |
self._queues = filter(lambda qr : qr() is not None, self._queues) | |
def _get_queues(self): | |
return [qr() for qr in self._queues] | |
@tornado.gen.coroutine | |
def queue(self, queue=None, passive=False, durable=False, | |
exclusive=False, auto_delete=False, nowait=False, | |
arguments=None, timeout=None): | |
q = AMQPQueue(self, queue, passive, durable, exclusive, auto_delete, | |
nowait, arguments) | |
yield q.declare(timeout) | |
self._cleanup_queues() | |
self._queues.append(weakref.ref(q)) | |
raise tornado.gen.Return(q) | |
def queue_bind(self, queue, exchange, routing_key=None, | |
nowait=False, arguments=None, timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.queue_bind, | |
kwargs={ | |
'queue': queue, | |
'exchange': exchange, | |
'routing_key': routing_key, | |
'nowait': nowait, | |
'arguments': arguments, | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def queue_declare(self, queue='', passive=False, durable=False, | |
exclusive=False, auto_delete=False, nowait=False, | |
arguments=None, timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.queue_declare, | |
kwargs={ | |
'queue': queue, | |
'passive': passive, | |
'durable': durable, | |
'exclusive': exclusive, | |
'auto_delete': auto_delete, | |
'nowait': nowait, | |
'arguments': arguments, | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def queue_delete(self, queue='', if_unused=False, | |
if_empty=False, nowait=False, timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.queue_delete, | |
kwargs={ | |
'queue': queue, | |
'if_unused': if_unused, | |
'if_empty': if_empty, | |
'nowait': nowait | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def queue_purge(self, queue, nowait=False, timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.queue_purge, | |
kwargs={ | |
'queue': queue, | |
'nowait': nowait | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def queue_unbind(self, queue='', exchange=None,
        routing_key=None, arguments=None, timeout=None):
    '''
    Invoke queue_unbind on the wrapped channel through the connection's
    _run_async helper.

    The callback given to _run_async stores the response on the future it
    is handed.  The return value is whatever _run_async returns
    (presumably a future resolving to that response -- confirm against
    _run_async).
    '''
    def _resolve(future, response):
        # Pass the channel's response on to whoever waits on the future.
        future.set_result(response)

    run_async = self._connection._run_async
    return run_async(
            self._channel.queue_unbind,
            kwargs=dict(
                queue=queue,
                exchange=exchange,
                routing_key=routing_key,
                arguments=arguments),
            cb_func=_resolve,
            timeout=timeout)
# ------------------------------------------------------------------------ | |
# Transaction operations | |
# ------------------------------------------------------------------------ | |
def tx_commit(self, timeout=None):
    '''
    Invoke tx_commit on the wrapped channel through the connection's
    _run_async helper.

    The callback given to _run_async stores the response on the future it
    is handed; the value returned by _run_async is passed back unchanged.
    '''
    def _resolve(future, response):
        # Pass the channel's response on to whoever waits on the future.
        future.set_result(response)

    run_async = self._connection._run_async
    return run_async(
            self._channel.tx_commit, cb_func=_resolve, timeout=timeout)
def tx_rollback(self, timeout=None):
    '''
    Invoke tx_rollback on the wrapped channel through the connection's
    _run_async helper.

    The callback given to _run_async stores the response on the future it
    is handed; the value returned by _run_async is passed back unchanged.
    '''
    def _resolve(future, response):
        # Pass the channel's response on to whoever waits on the future.
        future.set_result(response)

    run_async = self._connection._run_async
    return run_async(
            self._channel.tx_rollback, cb_func=_resolve, timeout=timeout)
def tx_select(self, timeout=None):
    '''
    Invoke tx_select on the wrapped channel through the connection's
    _run_async helper.

    The callback given to _run_async stores the response on the future it
    is handed; the value returned by _run_async is passed back unchanged.
    '''
    def _resolve(future, response):
        # Pass the channel's response on to whoever waits on the future.
        future.set_result(response)

    run_async = self._connection._run_async
    return run_async(
            self._channel.tx_select, cb_func=_resolve, timeout=timeout)
class AMQPMessageDestination(AMQPObject):
    '''
    Common base class for queues and exchanges, i.e. the places a message
    can be sent to.  It keeps hold of the owning channel together with a
    default exchange and routing key, and implements message transmission
    on top of them.
    '''

    def __init__(self, channel, exchange, routing_key):
        self._declared = False
        self._routing_key = routing_key
        self._exchange = exchange
        self._channel = channel

    @property
    def _connection(self):
        return self._channel._connection

    @property
    def _amqp_connection(self):
        return self._channel._amqp_connection

    @property
    def exchange(self):
        return self._exchange

    @property
    def routing_key(self):
        return self._routing_key

    @tornado.gen.coroutine
    def basic_publish(self, body, properties=None, mandatory=False,
            immediate=False, exchange=None, routing_key=None):
        '''
        Publish body through the owning channel.

        exchange and routing_key fall back to this destination's own
        values when left as None.  The coroutine resolves to whatever the
        channel's basic_publish yields.
        '''
        target_exchange = self.exchange if exchange is None else exchange
        target_key = self.routing_key if routing_key is None else routing_key
        outcome = yield self._channel.basic_publish(
                target_exchange, target_key, body, properties,
                mandatory, immediate)
        raise tornado.gen.Return(outcome)
class AMQPQueue(AMQPMessageDestination):
    '''
    A representation of a message queue.

    Bindings made through bind() and consumers started through consume()
    are remembered so that a later call to declare() can re-instate them.
    '''

    def __init__(self, channel, queue=None, passive=False, durable=False,
            exclusive=False, auto_delete=False, nowait=False, arguments=None):
        # The destination's exchange is '' and its routing key starts out as
        # None; declare() fills the routing key in with the queue name
        # reported in the declare response.
        super(AMQPQueue, self).__init__(channel, '', None)
        self._queue = queue
        self._passive = passive
        self._durable = durable
        self._exclusive = exclusive
        self._auto_delete = auto_delete
        self._nowait = nowait
        self._arguments = arguments

        # Our bindings to instate
        # Format:
        # {
        #   exchange_name: {
        #       routing_key: {
        #           args: kwargs for queue_bind,
        #           bound: boolean indicating bind state,
        #       }
        #   },
        # }
        self._bindings = {}

        # Weak references to the consumers started on this queue
        self._consumers = []

    # ------------------------------------------------------------------------
    # Queue operations
    # ------------------------------------------------------------------------

    @tornado.gen.coroutine
    def declare(self, timeout=None):
        '''
        Declare the queue, then re-apply the recorded bindings and restart
        the consumers that are still alive.

        A passive declaration is made on a temporary channel which is closed
        again afterwards (presumably because a failed passive declare closes
        the channel it was issued on -- confirm).  A failed declaration is
        logged rather than raised.
        '''
        log = self._get_log('declare')

        # Are we declaring passively?
        if self._passive:
            # We are, declare using a temporary channel
            log.debug('Using temporary channel')
            ch = yield self._channel._connection.channel()
        else:
            # We're not, use our own channel
            log.debug('Using own channel')
            ch = self._channel

        # Declare the queue in the channel
        try:
            log.debug('Declaring queue')
            queue_res = yield ch.queue_declare(
                    queue=self._queue or '',
                    passive=self._passive,
                    durable=self._durable,
                    exclusive=self._exclusive,
                    auto_delete=self._auto_delete,
                    nowait=self._nowait,
                    arguments=self._arguments,
                    timeout=timeout)
            self._routing_key = queue_res.method.queue
            log.debug('Queue is %s', self._routing_key)
        except Exception:
            # Best effort: report the failure and carry on so that the
            # temporary channel below still gets closed.
            log.exception('Failed to declare queue (queue=%s)', self._queue)

        # If we opened a temporary channel, close it
        if self._passive and ch.is_open:
            yield ch.close()

        # If we were bound to things, re-bind.  Snapshots are taken because
        # bind() writes back into self._bindings; list(d.items()) works on
        # both Python 2 and Python 3, unlike iteritems().
        for exchange, bindings in list(self._bindings.items()):
            for routing_key, state in list(bindings.items()):
                yield self.bind(
                        exchange=exchange,
                        routing_key=routing_key,
                        nowait=state['args']['nowait'],
                        arguments=state['args']['arguments'],
                        timeout=timeout)

        # If we have any consumers, re-start those
        for c in self._get_consumers():
            yield c.consume(timeout)

    @tornado.gen.coroutine
    def bind(self, exchange, routing_key=None,
            nowait=False, arguments=None, timeout=None):
        '''
        Bind this queue to an exchange (given by name or as an AMQPExchange)
        and record the binding.

        Resolves to True if the bind call succeeded and False otherwise; a
        failure is logged at debug level and recorded as unbound instead of
        being raised.
        '''
        log = self._get_log('bind')
        if isinstance(exchange, AMQPExchange):
            exchange = exchange.exchange

        state = {
            'args': {
                'nowait': nowait,
                'arguments': arguments,
            },
            'bound': False,
        }
        try:
            log.debug('Binding to %s:%s', exchange, routing_key)
            yield self._channel.queue_bind(
                    queue=self.routing_key,
                    exchange=exchange or '',
                    routing_key=routing_key,
                    nowait=nowait,
                    arguments=arguments,
                    timeout=timeout)
            # We should be bound now.
            state['bound'] = True
        except Exception:
            # Keep the reason for the failure visible in the debug log.
            log.debug('Failed to bind to %s:%s', exchange, routing_key,
                    exc_info=True)

        if exchange not in self._bindings:
            self._bindings[exchange] = {}
        self._bindings[exchange][routing_key] = state
        raise tornado.gen.Return(state['bound'])

    @tornado.gen.coroutine
    def delete(self, if_unused=False, if_empty=False, nowait=False,
            timeout=None):
        '''
        Delete the queue.  On success the routing key is reset to None; on
        failure the error is logged and re-raised.
        '''
        log = self._get_log('delete')
        try:
            log.debug('Deleting queue %s', self.routing_key)
            yield self._channel.queue_delete(
                    queue=self.routing_key,
                    if_unused=if_unused,
                    if_empty=if_empty,
                    nowait=nowait,
                    timeout=timeout)
            # We're deleted
            self._routing_key = None
        except Exception:
            log.exception('Failed to delete queue %s', self.routing_key)
            raise

    @tornado.gen.coroutine
    def purge(self, queue=None, nowait=False, timeout=None):
        '''
        Purge this queue.

        The queue argument is ignored: the queue purged is always this
        object's own (self.routing_key).  It is kept, now optional, so that
        existing callers passing it keep working.
        '''
        yield self._channel.queue_purge(
                queue=self.routing_key,
                nowait=nowait,
                timeout=timeout)

    @tornado.gen.coroutine
    def unbind(self, exchange=None, routing_key=None,
            arguments=None, timeout=None):
        '''
        Unbind this queue from an exchange (given by name or as an
        AMQPExchange) and forget the recorded binding, if there was one.
        Errors from the unbind call itself propagate to the caller.
        '''
        if isinstance(exchange, AMQPExchange):
            exchange = exchange.exchange

        yield self._channel.queue_unbind(
                queue=self.routing_key,
                exchange=exchange,
                routing_key=routing_key,
                arguments=arguments,
                timeout=timeout)

        # If we got here, then we're unbound
        try:
            del self._bindings[exchange][routing_key]
            if not self._bindings[exchange]:
                del self._bindings[exchange]
        except KeyError:
            # The binding was never recorded; nothing to forget.
            pass

    def _cleanup_consumers(self):
        '''
        Drop weak references whose consumer has been garbage collected.
        '''
        # A real list is required (filter() is lazy on Python 3) because
        # consume() appends to it afterwards.
        self._consumers = [cr for cr in self._consumers if cr() is not None]

    def _get_consumers(self):
        '''
        Return the consumers that are still alive.
        '''
        # Dereference each weak reference once and skip the dead ones, so
        # callers never see None.
        candidates = (cr() for cr in self._consumers)
        return [c for c in candidates if c is not None]

    @tornado.gen.coroutine
    def consume(self, consumer_callback, no_ack=False, exclusive=False,
            consumer_tag=None, arguments=None, channel=None, timeout=None):
        '''
        Start a consumer on this queue and resolve to the AMQPConsumer.

        The consumer runs on channel if given, otherwise on this queue's
        own channel.  Only a weak reference to the consumer is retained
        here, so the caller must hold on to the returned object.
        '''
        c = AMQPConsumer(channel or self._channel, consumer_callback, self,
                no_ack, exclusive, consumer_tag, arguments)
        yield c.consume(timeout)
        self._cleanup_consumers()
        self._consumers.append(weakref.ref(c))
        raise tornado.gen.Return(c)
class AMQPExchange(AMQPMessageDestination):
    '''
    A representation of a message exchange.

    Exchange-to-exchange bindings made through bind() are remembered so
    that a later call to declare() can re-instate them.
    '''

    def __init__(self, channel, exchange=None, exchange_type='direct',
            passive=False, durable=False, auto_delete=False, internal=False,
            nowait=False, arguments=None):
        super(AMQPExchange, self).__init__(channel, exchange, None)
        self._exchange_type = exchange_type
        self._passive = passive
        self._durable = durable
        self._auto_delete = auto_delete
        self._internal = internal
        self._nowait = nowait
        self._arguments = arguments
        self._declared = False

        # Our bindings to instate
        # Format:
        # {
        #   exchange_name: {
        #       routing_key: {
        #           args: kwargs for exchange_bind,
        #           bound: boolean indicating bind state,
        #       }
        #   },
        # }
        self._bindings = {}

    # ------------------------------------------------------------------------
    # Exchange operations
    # ------------------------------------------------------------------------

    @tornado.gen.coroutine
    def declare(self, timeout=None):
        '''
        Declare the exchange, then re-apply the recorded bindings.

        A passive declaration is made on a temporary channel which is closed
        again afterwards (presumably because a failed passive declare closes
        the channel it was issued on -- confirm).  A failed declaration is
        logged rather than raised.
        '''
        log = self._get_log('declare')

        # Are we declaring passively?
        if self._passive:
            # We are, declare using a temporary channel
            log.debug('Using temporary channel')
            ch = yield self._channel._connection.channel()
        else:
            # We're not, use our own channel
            log.debug('Using own channel')
            ch = self._channel

        # Declare the exchange in the channel
        try:
            log.debug('Declaring exchange')
            yield ch.exchange_declare(
                    exchange=self._exchange,
                    exchange_type=self._exchange_type,
                    passive=self._passive,
                    durable=self._durable,
                    auto_delete=self._auto_delete,
                    internal=self._internal,
                    nowait=self._nowait,
                    arguments=self._arguments,
                    timeout=timeout)
            self._declared = True
            log.debug('Exchange declared')
        except Exception:
            # Best effort: report the failure and carry on so that the
            # temporary channel below still gets closed.
            log.exception('Failed to declare exchange %s', self._exchange)

        # If we opened a temporary channel, close it
        if self._passive and ch.is_open:
            yield ch.close()

        # If we were bound to things, re-bind.  Snapshots are taken because
        # bind() writes back into self._bindings; list(d.items()) works on
        # both Python 2 and Python 3, unlike iteritems().
        for exchange, bindings in list(self._bindings.items()):
            for routing_key, state in list(bindings.items()):
                yield self.bind(
                        exchange=exchange,
                        routing_key=routing_key,
                        nowait=state['args']['nowait'],
                        arguments=state['args']['arguments'],
                        timeout=timeout)

    @tornado.gen.coroutine
    def bind(self, exchange, routing_key=None,
            nowait=False, arguments=None, timeout=None):
        '''
        Bind this exchange (the destination) to a source exchange, given by
        name or as an AMQPExchange, and record the binding.

        Resolves to True if the bind call succeeded and False otherwise; a
        failure is logged at debug level and recorded as unbound instead of
        being raised.
        '''
        log = self._get_log('bind')
        if isinstance(exchange, AMQPExchange):
            exchange = exchange.exchange

        state = {
            'args': {
                'nowait': nowait,
                'arguments': arguments,
            },
            'bound': False,
        }
        try:
            log.debug('Binding to %s:%s', exchange, routing_key)
            # This must be exchange_bind: queue_bind does not accept the
            # destination/source keywords, so the call could never succeed.
            # NOTE(review): assumes the channel wrapper provides
            # exchange_bind with the same keyword names that unbind() passes
            # to exchange_unbind -- confirm.
            yield self._channel.exchange_bind(
                    destination=self.exchange,
                    source=exchange,
                    routing_key=routing_key or '',
                    nowait=nowait,
                    arguments=arguments,
                    timeout=timeout)
            # We should be bound now.
            state['bound'] = True
        except Exception:
            # Keep the reason for the failure visible in the debug log.
            log.debug('Failed to bind to %s:%s', exchange, routing_key,
                    exc_info=True)

        if exchange not in self._bindings:
            self._bindings[exchange] = {}
        self._bindings[exchange][routing_key] = state
        raise tornado.gen.Return(state['bound'])

    @tornado.gen.coroutine
    def delete(self, if_unused=False, nowait=False, timeout=None):
        '''
        Delete the exchange.  On success the declared flag is cleared; on
        failure the error is logged and re-raised.
        '''
        log = self._get_log('delete')
        try:
            log.debug('Deleting exchange %s', self.exchange)
            yield self._channel.exchange_delete(
                    exchange=self.exchange,
                    if_unused=if_unused,
                    nowait=nowait,
                    timeout=timeout)
            self._declared = False
        except Exception:
            log.exception('Failed to delete exchange %s', self.exchange)
            raise

    @tornado.gen.coroutine
    def unbind(self, exchange=None, routing_key=None,
            arguments=None, timeout=None):
        '''
        Unbind this exchange from a source exchange (given by name or as an
        AMQPExchange) and forget the recorded binding, if there was one.
        Errors from the unbind call itself propagate to the caller.
        '''
        if isinstance(exchange, AMQPExchange):
            exchange = exchange.exchange

        yield self._channel.exchange_unbind(
                destination=self.exchange,
                source=exchange,
                routing_key=routing_key,
                arguments=arguments,
                timeout=timeout)

        # If we got here, then we're unbound
        try:
            del self._bindings[exchange][routing_key]
            if not self._bindings[exchange]:
                del self._bindings[exchange]
        except KeyError:
            # The binding was never recorded; nothing to forget.
            pass
class AMQPConsumer(AMQPObject):
    '''
    An abstraction for a consumer.

    Holds the parameters needed to (re-)start consuming from a queue and
    the consumer tag currently in effect (None while not consuming).
    '''

    def __init__(self, channel, consumer_callback, queue=None, no_ack=False,
            exclusive=False, consumer_tag=None, arguments=None):
        self._channel = channel
        self._consumer_callback = consumer_callback
        self._queue = queue
        self._no_ack = no_ack
        self._exclusive = exclusive
        # Tag requested by the caller; may be None.
        self._consumer_tag_given = consumer_tag
        self._arguments = arguments
        # Tag reported by basic_consume; set by consume().
        self._consumer_tag = None

    consumer_tag = property(lambda s : s._consumer_tag)

    @tornado.gen.coroutine
    def consume(self, timeout=None):
        '''
        Start consuming from the queue and remember the resulting consumer
        tag.  On failure the tag is cleared, the error is logged and then
        re-raised.

        timeout is currently not forwarded to basic_consume.
        '''
        log = self._get_log('_consume')
        try:
            log.debug('Consuming queue %s', self._queue.routing_key)
            result = yield self._channel.basic_consume(
                    consumer_callback=self._consumer_callback,
                    queue=self._queue.routing_key,
                    no_ack=self._no_ack,
                    exclusive=self._exclusive,
                    consumer_tag=self._consumer_tag_given,
                    arguments=self._arguments)
            self._consumer_tag = result
            log.debug('Consumer tag is %s', self._consumer_tag)
        except Exception:
            log.exception('Failed to consume %s', self._queue.routing_key)
            self._consumer_tag = None
            raise

    @tornado.gen.coroutine
    def cancel(self, nowait=False, arguments=None, timeout=None):
        '''
        Cancel the consumer.  On success the remembered tag is cleared; on
        failure the error is logged and re-raised.

        timeout is currently not forwarded to basic_cancel.
        '''
        log = self._get_log('cancel')

        # Cancel using the tag that consume() actually obtained; the tag
        # requested at construction time is frequently None.  Fall back to
        # the requested tag only when no active tag is known.
        tag = self._consumer_tag
        if tag is None:
            tag = self._consumer_tag_given

        try:
            log.debug('Cancelling consumer of queue %s', self._queue.routing_key)
            yield self._channel.basic_cancel(
                    consumer_tag=tag,
                    nowait=nowait,
                    arguments=arguments)
            self._consumer_tag = None
            log.debug('Consumer is cancelled')
        except Exception:
            log.exception('Failed to cancel consuming %s',
                    self._queue.routing_key)
            raise
class TimeoutError(Exception):
    '''
    Signals that an operation failed to complete before its timeout
    expired.
    '''
if __name__ == '__main__':
    # The following is a simple example that tries to
    # create two channels (transmit and receive), declare an exchange,
    # bind a new queue to it, then send a message to the exchange.

    # Import the submodules used below explicitly instead of relying on
    # "import tornado.gen" having loaded them as a side effect.
    import tornado.concurrent
    import tornado.ioloop

    logging.basicConfig(
        level=logging.DEBUG,
        format=('%(asctime)s %(name)s[%(filename)s:%(lineno)4d] '
                '%(process)d/%(threadName)s %(levelname)s %(message)s'))

    io_loop = tornado.ioloop.IOLoop.instance()

    # Resolved by receive() once the test message has come back.
    msg_received = tornado.concurrent.Future()

    @tornado.gen.coroutine
    def receive(channel, method, properties, body):
        # Consumer callback: log the delivery and wake up on_connect().
        logging.info(
                ('\n' + ('-'*80) +
                '\nReceived\n\tchannel=%s,\n\tmethod=%s,'
                '\n\tproperties=%s, body=%s\n' +
                ('-'*80)),
                channel, method, properties, body)
        # Only the first delivery resolves the future; setting a result on
        # a future that is already done is an error on newer Tornado.
        if not msg_received.done():
            msg_received.set_result(True)
        raise tornado.gen.Return(None)

    @tornado.gen.coroutine
    def on_connect(*args, **kwargs):
        # Runs once the connection is open: performs the whole round trip
        # and always stops the IOLoop afterwards.
        try:
            logging.info('Opening a transmit channel')
            tx_ch = yield amqp_conn.channel()
            logging.debug('tx_ch = %s', tx_ch)

            logging.info('Opening a receive channel')
            rx_ch = yield amqp_conn.channel()

            logging.info('Declaring a test exchange on the receive channel')
            ex = yield rx_ch.exchange_declare(exchange='test',
                    exchange_type='fanout')

            logging.info('Declaring a receive queue on the receive channel')
            rx_q = yield rx_ch.queue_declare(exclusive=True)

            logging.info('Binding the queue to the exchange')
            bind = yield rx_ch.queue_bind(queue=rx_q.method.queue,
                    exchange='test', routing_key='#')

            logging.info('Beginning consumation of queue')
            consume = yield rx_ch.basic_consume(
                    consumer_callback=receive,
                    queue=rx_q.method.queue,
                    no_ack=True)

            logging.info('Transmitting a test message')
            tx = yield tx_ch.basic_publish(
                    exchange='test',
                    routing_key='test',
                    body='this is a test')

            logging.info('Waiting to receive something')
            yield msg_received

            logging.info('Disconnecting')
            yield amqp_conn.close()
            logging.info('Stopping')
        except Exception:
            logging.exception('on_connect failed')
        finally:
            # Stop the loop no matter how the example ended so the process
            # does not hang.
            io_loop.stop()
        raise tornado.gen.Return(None)

    amqp_conn = AMQPConnection(
            pika.URLParameters('amqp://guest:guest@localhost:5672/%2f'),
            io_loop=io_loop,
            on_open_callback=on_connect)
    io_loop.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hello! I found this code useful. Could you please tell me whether it is production-ready?