-
-
Save desertkun/e6f5ccbceb3bc39d1bc4f824b1b93dba to your computer and use it in GitHub Desktop.
Tornado Coroutine interface to AMQP (pika)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import datetime | |
import logging | |
import threading | |
import weakref | |
from tornado.gen import coroutine, Return | |
import tornado | |
import tornado.ioloop | |
import tornado.concurrent | |
import pika | |
""" | |
Pika-Tornado: a Tornado coroutine-based abstraction layer for Pika. | |
""" | |
# (C) 2015 Stuart Longland <[email protected]> | |
# Released under the terms of the Mozilla Public License v2.0 | |
class AMQPObject(object):
    """
    Common base class giving every AMQP wrapper class a namespaced logger.
    """

    @classmethod
    def _get_log(cls, *name):
        """
        Return the logger named ``<module>.<class>[.<name>...]``.

        :param str name:
            Optional extra name components appended, dot-separated, after
            the module and class name.
        """
        components = (cls.__module__, cls.__name__) + name
        logger_name = '.'.join(components)
        return logging.getLogger(logger_name)
# noinspection PyProtectedMember,PyPropertyDefinition | |
class AMQPConnection(AMQPObject):
    """
    Connection to AMQP, this object handles unexpected disconnections (and
    re-connects accordingly) and wraps the channel opening process in a
    coroutine-friendly manner.

    Support for multiple IOLoops is provided and thread-safety is attempted.

    Callbacks may be specified in a number of ways:

    - as a reference to the function directly
    - as a tuple:

      - first element gives the IOLoop (or None for the client's IOLoop)
      - second element gives the reference to the function
      - third element if present gives the initial args
      - fourth element if present gives the initial kwargs
    """

    # Connection state constants, re-exported from pika for convenience.
    CONNECTION_CLOSED = pika.connection.Connection.CONNECTION_CLOSED
    CONNECTION_INIT = pika.connection.Connection.CONNECTION_INIT
    CONNECTION_PROTOCOL = pika.connection.Connection.CONNECTION_PROTOCOL
    CONNECTION_START = pika.connection.Connection.CONNECTION_START
    CONNECTION_TUNE = pika.connection.Connection.CONNECTION_TUNE
    CONNECTION_OPEN = pika.connection.Connection.CONNECTION_OPEN
    CONNECTION_CLOSING = pika.connection.Connection.CONNECTION_CLOSING

    def __init__(self, parameters, on_open_callback=None,
                 on_open_error_callback=None, on_close_callback=None,
                 on_giveup_callback=None, reconnect_delay=5.0,
                 reconnect_max=-1, io_loop=None, io_thread=None):
        """
        Initialise a connection to AMQP.

        :param pika.connection.Parameters parameters:
            Connection parameters, see Pika documentation.
        :param method on_open_callback:
            Called when the connection is opened.
        :param method on_open_error_callback:
            Called if the connection can't be opened.
        :param method on_close_callback:
            Called when the connection is closed.
        :param method on_giveup_callback:
            Called when the connection attempts have been exhausted
            and this library "gives up".  This might be used to log
            an alarm and shut down the daemon.
        :param float reconnect_delay:
            The amount of time (seconds) to wait before attempting to
            reconnect.  Set to 0 to disable reconnect.
        :param int reconnect_max:
            The maximum number of reconnection attempts.  Any value less
            than or equal to 0 (the default is -1) permits unlimited
            reconnections.
        :param tornado.ioloop.IOLoop io_loop:
            IOLoop instance to use for the AMQP client itself.  By default,
            the current IOLoop is used.
        :param threading.Thread io_thread:
            Thread used for AMQP communications.  Since Pika itself is not
            thread-safe, it is imperative that all AMQP operations take place
            in the same thread as the AMQP IOLoop instance.
            If this is different to the thread the constructor is running in
            for whatever reason, the reference to that thread can be given
            here.
        """
        if io_loop is None:
            io_loop = tornado.ioloop.IOLoop.current()
        if io_thread is None:
            io_thread = threading.current_thread()

        # These must be set before _get_callback() is used below, since it
        # falls back to self._io_loop for callbacks without their own loop.
        self._io_loop = io_loop
        self._io_thread = io_thread

        self._reconnect_delay = datetime.timedelta(seconds=reconnect_delay)
        self._reconnect_max = reconnect_max
        self._reconnect_rem = reconnect_max
        self._reconnect = reconnect_delay > 0
        self._giveup = False

        self._on_open_cb = []
        self._on_open_error_cb = []
        self._on_close_cb = []
        self._on_giveup_cb = []
        for registry, callback in (
                (self._on_open_cb, on_open_callback),
                (self._on_open_error_cb, on_open_error_callback),
                (self._on_close_cb, on_close_callback),
                (self._on_giveup_cb, on_giveup_callback)):
            if callback is not None:
                registry.append(self._get_callback(callback))

        self._amqp_parameters = parameters
        self._amqp_connection = None
        # Weak references to the AMQPChannel objects opened on us.
        self._channels = []
        self._replace_connection()

    # ------------------------------------------------------------------------
    # Connection state
    # ------------------------------------------------------------------------

    connection_state = property(lambda s: s._amqp_connection.connection_state)
    is_closed = property(lambda s: s._amqp_connection.is_closed)
    is_closing = property(lambda s: s._amqp_connection.is_closing)
    is_open = property(lambda s: s._amqp_connection.is_open)

    # ------------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------------

    def _replace_connection(self):
        """
        Create a fresh pika TornadoConnection and mark every live channel
        stale.  Failures are logged, never raised.
        """
        log = self._get_log('_replace_connection')
        try:
            log.debug('Gathering a list of existing channels')
            channels = self._get_channels()
            log.debug('We have %s channels to reopen.  Opening '
                      'connection to AMQP with parameters %s.',
                      len(channels), self._amqp_parameters)

            # Replace the connection
            self._amqp_connection = pika.adapters.TornadoConnection(
                parameters=self._amqp_parameters,
                on_open_callback=self._handle_on_open_callback,
                on_open_error_callback=self._handle_on_open_error_callback,
                on_close_callback=self._handle_on_close_callback,
                custom_ioloop=self._io_loop)
            log.debug('AMQP connection started')

            # Mark these channels as stale; they get re-opened from
            # _handle_on_open_callback once the new connection is up.
            for ch in channels:
                try:
                    log.debug('Marking %s as stale', ch)
                    ch._mark_stale()
                except Exception:
                    # Best effort: one bad channel must not stop the rest.
                    log.exception('Failed to mark %s as stale', ch)
        except Exception:
            log.exception('Failed to replace connection')

    def connect(self, timeout=None):
        """
        Connect to AMQP.
        """
        return self._run_async(
            self._amqp_connection.connect,
            args=(),
            kwargs={},
            timeout=timeout)

    def close(self, *args, **kwargs):
        """
        Disconnect from AMQP.  This disables reconnections.

        A ``timeout`` keyword argument, if given, is consumed here and
        passed to _run_async; everything else goes to the pika connection's
        close().  See pika.BaseConnection.close for details on arguments.
        """
        timeout = kwargs.pop('timeout', None)
        # With both flags cleared, _do_reconnect neither reconnects nor
        # fires the give-up callbacks after the connection closes.
        self._giveup = False
        self._reconnect = False
        return self._run_async(
            async_func=self._amqp_connection.close,
            args=args,
            kwargs=kwargs,
            timeout=timeout,
            cb_func=None,
            future=None)

    def set_backpressure_multiplier(self, *args, **kwargs):
        """
        Forward to the pika connection's set_backpressure_multiplier().
        """
        return self._run_async(
            async_func=self._amqp_connection.set_backpressure_multiplier,
            args=args,
            kwargs=kwargs,
            cb_func=None,
            future=None)

    # ------------------------------------------------------------------------
    # Connection callbacks
    #
    # NOTE: these register on the *current* pika connection object, which is
    # replaced by _replace_connection() on every reconnect.
    # ------------------------------------------------------------------------

    def add_backpressure_callback(self, callback):
        """
        Register a (wrapped) callback via add_backpressure_callback().
        """
        return self._run_async(
            self._amqp_connection.add_backpressure_callback,
            args=(self._get_callback(callback),))

    def add_on_close_callback(self, callback):
        """
        Register a (wrapped) callback via add_on_close_callback().
        """
        return self._run_async(
            self._amqp_connection.add_on_close_callback,
            args=(self._get_callback(callback),))

    def add_on_open_callback(self, callback):
        """
        Register a (wrapped) callback via add_on_open_callback().
        """
        return self._run_async(
            self._amqp_connection.add_on_open_callback,
            args=(self._get_callback(callback),))

    def add_on_open_error_callback(self, callback):
        """
        Register a (wrapped) callback via add_on_open_error_callback().
        """
        return self._run_async(
            self._amqp_connection.add_on_open_error_callback,
            args=(self._get_callback(callback),))

    # ------------------------------------------------------------------------
    # Channel management
    # ------------------------------------------------------------------------

    def _cleanup_channels(self):
        """
        Drop weak references whose channel has been garbage-collected.
        """
        # A real list (not a lazy filter object) so .append() keeps working.
        self._channels = [ref for ref in self._channels
                          if ref() is not None]

    def _get_channels(self):
        """
        Return the live channel objects, pruning dead references.
        """
        live_refs = []
        channels = []
        # Dereference each weakref exactly once so a channel collected
        # mid-way can never show up as None in the result.
        for ref in self._channels:
            ch = ref()
            if ch is not None:
                live_refs.append(ref)
                channels.append(ch)
        self._channels = live_refs
        return channels

    @coroutine
    def channel(self, channel_number=None, timeout=None):
        """
        Open a new channel on the AMQP connection.

        The connection only keeps a weak reference to the returned
        AMQPChannel, so the caller must hold on to it.
        """
        ch = AMQPChannel(self, channel_number)
        yield ch._init_channel(timeout)
        self._cleanup_channels()
        self._channels.append(weakref.ref(ch))
        raise Return(ch)

    # ------------------------------------------------------------------------
    # Connection handling callbacks
    # ------------------------------------------------------------------------

    def _handle_on_open_callback(self, *args, **kwargs):
        """
        What to do when the connection is opened?

        Resets the reconnection counters, schedules re-opening of every
        live channel and then schedules the user's open callbacks.
        """
        log = self._get_log('_handle_on_open_callback')
        try:
            log.info('Connection open')
            self._reconnect = self._reconnect_delay.total_seconds() > 0
            self._reconnect_rem = self._reconnect_max
            self._giveup = False

            # Re-open the channels we know about
            for ch in self._get_channels():
                log.debug('Re-opening %s', ch)
                self._io_loop.add_callback(ch._init_channel)

            log.debug('Calling open callbacks')
            for cb in self._on_open_cb:
                self._io_loop.add_callback(cb, *args, **kwargs)
        except Exception:
            log.exception('Failed to handle connection open')

    def _handle_on_close_callback(self, *args, **kwargs):
        """
        What to do when the connection is closed?

        Runs the user's close callbacks, then attempts a reconnect.
        """
        log = self._get_log('_handle_on_close_callback')
        try:
            log.info('Connection closed')
            for cb in self._on_close_cb:
                try:
                    cb(*args, **kwargs)
                except Exception:
                    log.exception('Exception in callback %s(*%s, **%s)',
                                  cb, args, kwargs)
        except Exception:
            log.exception('Exception in close callback')
        self._do_reconnect()

    def _handle_on_open_error_callback(self, connection, error,
                                       *args, **kwargs):
        """
        What to do if we fail to open the connection?

        Passes the error to the user's open-error callbacks, then attempts
        a reconnect.
        """
        log = self._get_log('_handle_on_open_error_callback')
        try:
            log.error('Received %s whilst attempting connection', error)
            # Pass it through to the callback handlers we were given
            for cb in self._on_open_error_cb:
                try:
                    cb(connection, error, *args, **kwargs)
                except Exception:
                    log.exception('Error in callback %s(*%s, **%s)',
                                  cb, args, kwargs)
        except Exception:
            log.exception('Failed in open-error callback')
        self._do_reconnect()

    def _do_reconnect(self):
        """
        Perform a re-connection, if enabled.

        With a positive reconnect_max, each call consumes one attempt; the
        call after the last attempt fires the give-up callbacks instead.
        """
        log = self._get_log('_reconnect')
        try:
            # Sanity check, ensure we're running in the correct thread for
            # this.  add_timeout is *NOT* thread-safe, add_callback is.
            if not self._is_own_thread:
                self._io_loop.add_callback(self._do_reconnect)
                return

            if self._reconnect:
                if self._reconnect_max > 0:
                    self._reconnect_rem -= 1
                    self._reconnect = self._reconnect_rem > 0
                    self._giveup = not self._reconnect
                    if self._giveup:
                        log.error('This is our last connection attempt!')

                # Schedule a reconnect.  We do this by completely replacing
                # the AMQP connection object.
                self._io_loop.add_timeout(
                    self._reconnect_delay, self._replace_connection)
                log.error('Re-connecting in %s.', self._reconnect_delay)
            elif self._giveup:
                log.fatal('Giving up!')
                for cb in self._on_giveup_cb:
                    try:
                        cb()
                    except Exception:
                        log.exception('Exception in callback %s', cb)
        except Exception:
            log.exception('Failure in reconnection')

    # ------------------------------------------------------------------------
    # Callback and thread-safety utilities
    # ------------------------------------------------------------------------

    @property
    def _is_own_thread(self):
        """
        Return True if we're running in our own thread.
        """
        return self._io_thread is threading.current_thread()

    def _schedule_timeout(self, timeout, future, cb_func=None,
                          cb_args=None, cb_kwargs=None):
        """
        Place a TimeoutError into the future if the future is not
        completed in time.

        ``timeout`` is handed unchanged to IOLoop.add_timeout.
        NOTE(review): Tornado documents plain numbers there as *absolute*
        deadlines and datetime.timedelta as relative ones -- confirm what
        callers pass.

        ``cb_func(*cb_args, **cb_kwargs)``, if given, runs once the timeout
        fires, whether or not the future had already completed.

        :returns: the handle from IOLoop.add_timeout.
        """
        assert self._is_own_thread, 'This isn\'t my thread!'
        if cb_args is None:
            cb_args = ()
        if cb_kwargs is None:
            cb_kwargs = {}

        def _timeout():
            if not future.done():
                # NOTE(review): TimeoutError is a builtin on Python 3 only;
                # on Python 2 it must be defined elsewhere in this module.
                future.set_exception(TimeoutError())
            if cb_func is not None:
                cb_func(*cb_args, **cb_kwargs)

        return self._io_loop.add_timeout(timeout, _timeout)

    @coroutine
    def _run_async(self, async_func, args=None, kwargs=None, timeout=None,
                   cb_func=None, callback_name='callback', future=None):
        """
        Run a method in our own thread and return its result.

        :param async_func:
            Callable to run in the AMQP IOLoop thread with args and kwargs.
        :param timeout:
            Time limit for callback-style operations (see
            _schedule_timeout).  Ignored when cb_func is None, since the
            call is then synchronous.
        :param cb_func:
            If given, ``async_func`` is passed a keyword argument named
            ``callback_name``.  When the library invokes it, ``cb_func`` is
            called as ``cb_func(future, *args, **kwargs)`` and is expected
            to complete the future.
        :param str callback_name:
            Name of the callback keyword argument of ``async_func``.
        :param future:
            If given, the outcome is stored in this future and the
            coroutine returns None without waiting.  Otherwise the result
            is returned (or the exception raised) directly.
        """
        log = self._get_log('_run_async')
        try:
            if args is None:
                args = ()
            if kwargs is None:
                kwargs = {}
            timeout_ref = None
            return_result = future is None

            if self._is_own_thread:
                if cb_func is None:
                    # No callback involved: the function is synchronous, so
                    # call it directly (the timeout is ignored).
                    if return_result:
                        result = async_func(*args, **kwargs)
                        raise Return(result)
                    try:
                        result = async_func(*args, **kwargs)
                        future.set_result(result)
                    except Exception as e:
                        log.exception('Exception running %s', async_func)
                        future.set_exception(e)
                else:
                    # The result (or an exception) will be placed in this
                    # future.
                    if future is None:
                        future = tornado.concurrent.Future()

                    # Create a timeout object if we're given a time limit.
                    if timeout is not None:
                        timeout_ref = self._schedule_timeout(timeout, future)

                    # The wrapped cb_func digests what the library hands to
                    # its callback; its first argument is the future.
                    cb_func = self._get_callback(cb_func)

                    def _callback(*cb_args, **cb_kwargs):
                        try:
                            cb_func(future, *cb_args, **cb_kwargs)
                        except TimeoutError:
                            # The future already carries the timeout.
                            pass
                        except Exception as e:
                            if not future.done():
                                future.set_exception(e)
                            else:
                                self._get_log('_run_async._callback') \
                                    .exception('Exception in callback')

                    # Copy so the caller's dict is not modified.
                    kwargs = kwargs.copy()
                    kwargs[callback_name] = _callback

                    # Perform the action given...
                    try:
                        async_func(*args, **kwargs)
                    except Exception as e:
                        # The operation never started, so the pending
                        # timeout is pointless: cancel it.
                        if timeout_ref is not None:
                            self._io_loop.remove_timeout(timeout_ref)
                            timeout_ref = None
                        if return_result:
                            raise
                        future.set_exception(e)
            else:
                assert self._io_thread.is_alive(), 'IO Thread is dead'
                # We're in the wrong thread, so schedule this to take place
                # in our own IOLoop.  First, create a future to store the
                # result in, then call ourselves with that future.
                if future is None:
                    future = tornado.concurrent.Future()
                self._io_loop.add_callback(
                    self._run_async,
                    async_func=async_func,
                    args=args,
                    kwargs=kwargs,
                    timeout=timeout,
                    cb_func=cb_func,
                    callback_name=callback_name,
                    future=future)

            if return_result:
                # Wait for the result
                try:
                    result = yield future
                finally:
                    if timeout_ref is not None:
                        self._io_loop.remove_timeout(timeout_ref)
                raise Return(result)
            else:
                # Return nothing to prevent the IOLoop from waiting.
                raise Return(None)
        except Return:
            raise
        except Exception:
            log.exception('Failed to execute')
            raise

    def _get_callback(self, callback_ref, callback_args=None,
                      callback_kwargs=None):
        """
        Return a callback function from the given reference.

        :param callback_ref:
            None (then None is returned), a callable, or a tuple
            ``(ioloop, func[, args[, kwargs]])`` as described in the class
            docstring.
        :param callback_args:
            Positional arguments placed before those in the tuple and before
            those supplied at call time.
        :param callback_kwargs:
            Keyword arguments; overridden by those in the tuple, which are in
            turn overridden by those supplied at call time.
        :returns:
            A wrapper that calls the function directly when it targets the
            client's own IOLoop, or schedules it on the foreign IOLoop
            otherwise.  The wrapper logs exceptions instead of raising.
        """
        if callback_ref is None:
            return None

        # Work on private copies so neither the caller's objects nor later
        # invocations can be affected.
        callback_args = tuple(callback_args or ())
        callback_kwargs = dict(callback_kwargs or {})

        if isinstance(callback_ref, tuple):
            (callback_ioloop, callback_func) = callback_ref[0:2]
            if len(callback_ref) > 2:
                callback_args = callback_args + tuple(callback_ref[2] or ())
            if len(callback_ref) > 3:
                callback_kwargs.update(callback_ref[3] or {})
        else:
            callback_ioloop = None
            callback_func = callback_ref

        if callback_ioloop is None:
            callback_ioloop = self._io_loop

        def _callback(*args, **kwargs):
            log = self._get_log('_get_callback', '_callback')
            # TODO: why is this happening?
            if (len(args) == 1) and isinstance(args[0], tuple):
                args = args[0]

            try:
                cb_args = callback_args + args
                cb_kwargs = callback_kwargs.copy()
                cb_kwargs.update(kwargs)

                if self._io_loop is callback_ioloop:
                    # We're running this ourselves.  Pass the initial
                    # args/kwargs too, exactly as the foreign-IOLoop
                    # branch does.
                    cb_res = callback_func(*cb_args, **cb_kwargs)
                    if isinstance(cb_res, tornado.concurrent.Future):
                        # The callback was a coroutine: surface a failure
                        # in the log once it finishes.
                        def _log_failure(done_future):
                            try:
                                done_future.result()
                            except Exception:
                                log.exception('Callback failed')
                        self._io_loop.add_future(cb_res, _log_failure)
                else:
                    callback_ioloop.add_callback(
                        callback_func, *cb_args, **cb_kwargs)
            except Exception:
                log.exception('Callback failed: %s', callback_func)

        return _callback
# noinspection PyProtectedMember,PyPropertyDefinition | |
class AMQPChannel(AMQPObject): | |
""" | |
An abstraction for the AMQP Channel. | |
""" | |
def __init__(self, connection, channel_number): | |
""" | |
Initialise a new channel object. | |
""" | |
log = self._get_log('__init__') | |
log.debug('New channel') | |
self._connection = connection | |
self._channel_number = channel_number | |
self._channel = None | |
# Queues and exchanges to re-establish | |
self._exchanges = [] | |
self._queues = [] | |
_amqp_connection = property(lambda s: s._connection._amqp_connection) | |
def _mark_stale(self): | |
self._get_log('_mark_stale').debug( | |
'Channel is now stale') | |
self._channel = None | |
# noinspection PyBroadException | |
@coroutine | |
def _init_channel(self, timeout=None): | |
log = self._get_log('channel') | |
def _callback(future, ch): | |
log.debug('Opened new channel %s', ch) | |
if not future.done(): | |
future.set_result(ch) | |
else: | |
log.error('Future already done, cannot return channel') | |
ch.close() | |
log.debug('Opening new channel (number=%s)', self._channel_number) | |
try: | |
channel = yield self._connection._run_async( | |
async_func=self._amqp_connection.channel, | |
args=None, | |
kwargs={'channel_number': self._channel_number}, | |
timeout=timeout, | |
cb_func=_callback, | |
callback_name='on_open_callback', | |
future=None) | |
except: | |
log.exception('Failed to open channel') | |
raise | |
self._channel = channel | |
# If we had any exchanges, declare those now | |
for ex in self._get_exchanges(): | |
try: | |
log.debug('Declaring exchange %s', ex.exchange) | |
yield ex.declare(timeout) | |
except: | |
log.exception('Failed to declare exchange %s', ex.exchange) | |
# If we had any queues, declare those now | |
for q in self._get_queues(): | |
try: | |
log.debug('Declaring queue %s', q.routing_key) | |
yield q.declare(timeout) | |
except: | |
log.exception('Failed to declare exchange %s', q.routing_key) | |
# ------------------------------------------------------------------------ | |
# Channel status | |
# ------------------------------------------------------------------------ | |
is_closed = property(lambda s: s._channel.is_closed) | |
is_closing = property(lambda s: s._channel.is_closing) | |
is_open = property(lambda s: s._channel.is_open) | |
# ------------------------------------------------------------------------ | |
# Channel callbacks | |
# ------------------------------------------------------------------------ | |
def add_callback(self, callback, replies, one_shot=True): | |
return self._connection._run_async( | |
async_func=self._channel.add_callback, | |
args=(self._connection._get_callback(callback), | |
replies, one_shot), | |
kwargs={}, timeout=None, callback=None, future=None) | |
def add_on_cancel_callback(self, callback): | |
return self._connection._run_async( | |
self._channel.add_on_cancel_callback, | |
args=(self._connection._get_callback(callback),)) | |
def add_on_close_callback(self, callback): | |
return self._connection._run_async( | |
self._channel.add_on_close_callback, | |
args=(self._connection._get_callback(callback),)) | |
def add_on_flow_callback(self, callback): | |
return self._connection._run_async( | |
self._channel.add_on_flow_callback, | |
args=(self._connection._get_callback(callback),)) | |
def add_on_return_callback(self, callback): | |
return self._connection._run_async( | |
self._channel.add_on_return_callback, | |
args=(self._connection._get_callback(callback),)) | |
# ------------------------------------------------------------------------ | |
# Channel operations | |
# ------------------------------------------------------------------------ | |
def basic_ack(self, delivery_tag=0, multiple=False): | |
return self._connection._run_async( | |
self._channel.basic_ack, args=(delivery_tag, multiple)) | |
def basic_cancel(self, callback=None, consumer_tag='', nowait=False): | |
return self._connection._run_async( | |
self._channel.basic_cancel, | |
args=(self._connection._get_callback(callback), | |
consumer_tag, nowait)) | |
def basic_consume(self, consumer_callback, queue='', no_ack=False, | |
exclusive=False, consumer_tag=None, arguments=None): | |
return self._connection._run_async( | |
self._channel.basic_consume, | |
args=(self._connection._get_callback(consumer_callback), | |
queue, no_ack, exclusive, consumer_tag, arguments)) | |
def basic_get(self, callback, queue='', no_ack=False): | |
return self._connection._run_async( | |
self._channel.basic_get, | |
args=(self._connection._get_callback(callback), | |
queue, no_ack)) | |
def basic_nack(self, delivery_tag=None, multiple=False, requeue=True): | |
return self._connection._run_async( | |
self._channel.basic_nack, | |
args=(delivery_tag, multiple, requeue)) | |
def basic_publish(self, exchange, routing_key, body, properties=None, | |
mandatory=False, immediate=False): | |
return self._connection._run_async( | |
self._channel.basic_publish, | |
args=(exchange, routing_key, body, properties, | |
mandatory, immediate)) | |
def basic_qos(self, callback=None, prefetch_size=0, prefetch_count=0, | |
all_channels=False): | |
return self._connection._run_async( | |
self._channel.basic_qos, | |
args=(self._connection._get_callback(callback), | |
prefetch_size, prefetch_count, all_channels)) | |
def basic_reject(self, delivery_tag=None, requeue=True): | |
return self._connection._run_async( | |
self._channel.basic_reject, | |
args=(delivery_tag, requeue)) | |
def basic_recover(self, callback=None, prefetch_size=0, prefetch_count=0, | |
all_channels=False): | |
return self._connection._run_async( | |
self._channel.basic_qos, | |
args=(self._connection._get_callback(callback), | |
prefetch_size, prefetch_count, all_channels)) | |
def close(self, *args, **kwargs): | |
return self._connection._run_async( | |
self._channel.close, | |
args=args, kwargs=kwargs) | |
def confirm_delivery(self, callback=None, nowait=False): | |
return self._connection._run_async( | |
self._channel.confirm_delivery, | |
args=(self._connection._get_callback(callback), nowait)) | |
def open(self): | |
return self._connection._run_async(self._channel.open) | |
# ------------------------------------------------------------------------ | |
# Exchange operations | |
# ------------------------------------------------------------------------ | |
def _cleanup_exchanges(self): | |
self._exchanges = filter(lambda er: er() is not None, self._exchanges) | |
def _get_exchanges(self): | |
return [er() for er in self._exchanges] | |
@coroutine | |
def exchange(self, exchange=None, exchange_type='direct', | |
passive=False, durable=False, auto_delete=False, internal=False, | |
nowait=False, arguments=None, timeout=None): | |
""" | |
Creates a new instance of exchange, and declares it. | |
WARNING: Make sure you stored this result (e) in your code, otherwise it might be gc'ted, | |
since it has a weak reference. | |
""" | |
e = AMQPExchange(self, exchange, exchange_type, passive, durable, | |
auto_delete, internal, nowait, arguments) | |
yield e.declare(timeout) | |
self._cleanup_exchanges() | |
self._exchanges.append(weakref.ref(e)) | |
raise Return(e) | |
def exchange_bind(self, destination=None, source=None, | |
routing_key='', nowait=False, arguments=None, | |
timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.exchange_bind, | |
args=(destination, source, routing_key, nowait, arguments), | |
cb_func=_get_response, | |
timeout=timeout) | |
def exchange_declare(self, exchange=None, exchange_type='direct', | |
passive=False, durable=False, auto_delete=False, internal=False, | |
nowait=False, arguments=None, timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.exchange_declare, | |
kwargs={ | |
'exchange': exchange, | |
'exchange_type': exchange_type, | |
'passive': passive, | |
'durable': durable, | |
'auto_delete': auto_delete, | |
'internal': internal, | |
'nowait': nowait, | |
'arguments': arguments, | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def exchange_delete(self, exchange=None, if_unused=False, nowait=False, | |
timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.exchange_delete, | |
kwargs={ | |
'exchange': exchange, | |
'if_unused': if_unused, | |
'nowait': nowait, | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def exchange_unbind(self, destination=None, source=None, | |
routing_key='', nowait=False, arguments=None, | |
timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.exchange_unbind, | |
kwargs={ | |
'destination': destination, | |
'source': source, | |
'routing_key': routing_key, | |
'nowait': nowait, | |
'arguments': arguments | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
# ------------------------------------------------------------------------ | |
# Flow operations | |
# ------------------------------------------------------------------------ | |
def flow(self, callback, active): | |
return self._connection._run_async( | |
self._channel.flow, | |
args=(self._connection._get_callback(callback), active)) | |
# ------------------------------------------------------------------------ | |
# Queue operations | |
# ------------------------------------------------------------------------ | |
def _cleanup_queues(self): | |
self._queues = filter(lambda qr: qr() is not None, self._queues) | |
def _get_queues(self): | |
return [qr() for qr in self._queues] | |
@coroutine | |
def queue(self, queue=None, passive=False, durable=False, | |
exclusive=False, auto_delete=False, nowait=False, | |
arguments=None, timeout=None): | |
""" | |
Creates a new instance of queue, and declares it. | |
WARNING: Make sure you stored this result (q) in your code, otherwise it might be gc'ted, | |
since it has a weak reference. | |
""" | |
q = AMQPQueue(self, queue, passive, durable, exclusive, auto_delete, | |
nowait, arguments) | |
yield q.declare(timeout) | |
self._cleanup_queues() | |
self._queues.append(weakref.ref(q)) | |
raise Return(q) | |
def queue_bind(self, queue, exchange, routing_key=None, | |
nowait=False, arguments=None, timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.queue_bind, | |
kwargs={ | |
'queue': queue, | |
'exchange': exchange, | |
'routing_key': routing_key, | |
'nowait': nowait, | |
'arguments': arguments, | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def queue_declare(self, queue='', passive=False, durable=False, | |
exclusive=False, auto_delete=False, nowait=False, | |
arguments=None, timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.queue_declare, | |
kwargs={ | |
'queue': queue, | |
'passive': passive, | |
'durable': durable, | |
'exclusive': exclusive, | |
'auto_delete': auto_delete, | |
'nowait': nowait, | |
'arguments': arguments, | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def queue_delete(self, queue='', if_unused=False, | |
if_empty=False, nowait=False, timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.queue_delete, | |
kwargs={ | |
'queue': queue, | |
'if_unused': if_unused, | |
'if_empty': if_empty, | |
'nowait': nowait | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def queue_purge(self, queue, nowait=False, timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.queue_purge, | |
kwargs={ | |
'queue': queue, | |
'nowait': nowait | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
def queue_unbind(self, queue='', exchange=None, | |
routing_key=None, arguments=None, timeout=None): | |
def _get_response(future, response): | |
future.set_result(response) | |
return self._connection._run_async( | |
self._channel.queue_unbind, | |
kwargs={ | |
'queue': queue, | |
'exchange': exchange, | |
'routing_key': routing_key, | |
'arguments': arguments | |
}, | |
cb_func=_get_response, | |
timeout=timeout) | |
# ------------------------------------------------------------------------ | |
# Transaction operations | |
# ------------------------------------------------------------------------ | |
def tx_commit(self, timeout=None):
    """
    Issue ``tx_commit`` on the wrapped channel.

    The call is dispatched via the connection's ``_run_async`` helper
    together with ``timeout``; that helper's return value is returned
    unchanged.
    """
    def _on_reply(future, response):
        # Resolve the pending future with the reply we were handed.
        future.set_result(response)

    run_async = self._connection._run_async
    return run_async(self._channel.tx_commit,
                     cb_func=_on_reply,
                     timeout=timeout)
def tx_rollback(self, timeout=None):
    """
    Issue ``tx_rollback`` on the wrapped channel.

    The call is dispatched via the connection's ``_run_async`` helper
    together with ``timeout``; that helper's return value is returned
    unchanged.
    """
    def _on_reply(future, response):
        # Resolve the pending future with the reply we were handed.
        future.set_result(response)

    run_async = self._connection._run_async
    return run_async(self._channel.tx_rollback,
                     cb_func=_on_reply,
                     timeout=timeout)
def tx_select(self, timeout=None):
    """
    Issue ``tx_select`` on the wrapped channel.

    The call is dispatched via the connection's ``_run_async`` helper
    together with ``timeout``; that helper's return value is returned
    unchanged.
    """
    def _on_reply(future, response):
        # Resolve the pending future with the reply we were handed.
        future.set_result(response)

    run_async = self._connection._run_async
    return run_async(self._channel.tx_select,
                     cb_func=_on_reply,
                     timeout=timeout)
# noinspection PyProtectedMember,PyPropertyDefinition | |
class AMQPMessageDestination(AMQPObject):
    """
    Common base for Queues and Exchanges, i.e. the places a message can be
    sent to.  It stores the owning channel plus the default exchange and
    routing key, and implements publishing on top of them.
    """

    def __init__(self, channel, exchange, routing_key):
        self._declared = False
        self._routing_key = routing_key
        self._exchange = exchange
        self._channel = channel

    @property
    def _connection(self):
        return self._channel._connection

    @property
    def _amqp_connection(self):
        return self._channel._amqp_connection

    @property
    def exchange(self):
        return self._exchange

    @property
    def routing_key(self):
        return self._routing_key

    @coroutine
    def basic_publish(self, body, properties=None, mandatory=False,
                      immediate=False, exchange=None, routing_key=None):
        """
        Publish ``body`` via the owning channel.

        ``exchange`` and ``routing_key`` fall back to this destination's
        own values when left as ``None``.  The coroutine's result is
        whatever the channel's ``basic_publish`` yields.
        """
        target_exchange = self.exchange if exchange is None else exchange
        target_key = (self.routing_key if routing_key is None
                      else routing_key)
        outcome = yield self._channel.basic_publish(
            target_exchange, target_key, body, properties, mandatory,
            immediate)
        raise Return(outcome)
# noinspection PyProtectedMember | |
class AMQPQueue(AMQPMessageDestination):
    """
    A representation of a message queue.

    Please use ``yield channel.queue()`` instead of constructing a queue
    directly.  In case of reconnection this queue may be redeclared.
    """

    def __init__(self, channel, queue=None, passive=False, durable=False,
                 exclusive=False, auto_delete=False, nowait=False,
                 arguments=None):
        # The routing key stays None until declare() learns the queue name.
        super(AMQPQueue, self).__init__(channel, '', None)
        self._queue = queue
        self._passive = passive
        self._durable = durable
        self._exclusive = exclusive
        self._auto_delete = auto_delete
        self._nowait = nowait
        self._arguments = arguments

        # Our bindings to instate
        # Format:
        # {
        #     exchange_name: {
        #         routing_key: {
        #             args: kwargs for queue_bind,
        #             bound: boolean indicating bind state,
        #         }
        #     },
        # }
        self._bindings = {}

        # Weak references to the consumers created through consume()
        self._consumers = []

    # ------------------------------------------------------------------------
    # Queue operations
    # ------------------------------------------------------------------------

    # noinspection PyBroadException
    @coroutine
    def declare(self, timeout=None):
        """
        Declare the queue, then re-instate recorded bindings and consumers.

        A passive declaration is issued on a temporary channel, which is
        closed again afterwards if it is still open; otherwise the queue's
        own channel is used.  A failed declaration is logged and not
        re-raised (best effort), after which the recorded bindings are
        re-bound and the live consumers are re-started.

        :param timeout: forwarded to the channel operations.
        """
        log = self._get_log('declare')

        # Are we declaring passively?
        if self._passive:
            # We are, declare using a temporary channel
            log.debug('Using temporary channel')
            ch = yield self._channel._connection.channel()
        else:
            # We're not, use our own channel
            log.debug('Using own channel')
            ch = self._channel

        # Declare the queue in the channel
        try:
            log.debug('Declaring queue')
            queue_res = yield ch.queue_declare(
                queue=self._queue or '',
                passive=self._passive,
                durable=self._durable,
                exclusive=self._exclusive,
                auto_delete=self._auto_delete,
                nowait=self._nowait,
                arguments=self._arguments,
                timeout=timeout)
            self._routing_key = queue_res.method.queue
            log.debug('Queue is %s', self._routing_key)
        except Exception:
            # Best effort: report and carry on.  ``Exception`` (rather than
            # a bare except) lets GeneratorExit/KeyboardInterrupt through.
            log.exception('Failed to declare queue (queue=%s)', self._queue)

        # If we opened a temporary channel, close it
        if self._passive and ch.is_open:
            yield ch.close()

        # If we were bound to things, re-bind.  bind() writes to
        # self._bindings, so iterate over snapshots; items() works on both
        # Python 2 and Python 3.
        for exchange, bindings in list(self._bindings.items()):
            for routing_key, state in list(bindings.items()):
                yield self.bind(
                    exchange=exchange,
                    routing_key=routing_key,
                    nowait=state['args']['nowait'],
                    arguments=state['args']['arguments'],
                    timeout=timeout)

        # If we have any consumers, re-start those
        for c in self._get_consumers():
            yield c.consume(timeout)

    # noinspection PyBroadException
    @coroutine
    def bind(self, exchange, routing_key=None,
             nowait=False, arguments=None, timeout=None):
        """
        Bind this queue to ``exchange`` and record the binding.

        The binding is recorded in ``self._bindings`` whether or not the
        bind succeeded, so declare() can retry it later.

        :param exchange: exchange name or an AMQPExchange instance.
        :returns: (via the coroutine) True if the bind succeeded, False if
            it failed; failures are logged, not raised.
        """
        log = self._get_log('bind')

        if isinstance(exchange, AMQPExchange):
            exchange = exchange.exchange

        state = {
            'args': {
                'nowait': nowait,
                'arguments': arguments,
            },
            'bound': False,
        }

        try:
            log.debug('Binding to %s:%s', exchange, routing_key)
            yield self._channel.queue_bind(
                queue=self.routing_key,
                exchange=exchange or '',
                routing_key=routing_key,
                nowait=nowait,
                arguments=arguments,
                timeout=timeout)
            # We should be bound now.
            state['bound'] = True
        except Exception:
            log.debug('Failed to bind to %s:%s', exchange, routing_key)

        self._bindings.setdefault(exchange, {})[routing_key] = state
        raise Return(state['bound'])

    @coroutine
    def delete(self, if_unused=False, if_empty=False, nowait=False,
               timeout=None):
        """
        Delete the queue; on success the routing key is reset to None.

        Failures are logged and re-raised.
        """
        log = self._get_log('delete')
        try:
            log.debug('Deleting queue %s', self.routing_key)
            yield self._channel.queue_delete(
                queue=self.routing_key,
                if_unused=if_unused,
                if_empty=if_empty,
                nowait=nowait,
                timeout=timeout
            )
            # We're deleted
            self._routing_key = None
        except Exception:
            log.exception('Failed to delete queue %s', self.routing_key)
            raise

    # noinspection PyUnusedLocal
    @coroutine
    def purge(self, queue=None, nowait=False, timeout=None):
        """
        Purge this queue.

        :param queue: ignored; the queue purged is always this object's
            own (``self.routing_key``).  Kept, now optional, so existing
            positional callers keep working.
        """
        yield self._channel.queue_purge(
            queue=self.routing_key,
            nowait=nowait,
            timeout=timeout)

    @coroutine
    def unbind(self, exchange=None, routing_key=None,
               arguments=None, timeout=None):
        """
        Unbind this queue from ``exchange`` and forget the recorded binding.

        :param exchange: exchange name or an AMQPExchange instance.
        """
        if isinstance(exchange, AMQPExchange):
            exchange = exchange.exchange

        yield self._channel.queue_unbind(
            queue=self.routing_key,
            exchange=exchange,
            routing_key=routing_key,
            arguments=arguments,
            timeout=timeout)

        # If we got here, then we're unbound
        try:
            del self._bindings[exchange][routing_key]
            if not self._bindings[exchange]:
                del self._bindings[exchange]
        except KeyError:
            # The binding was never recorded; nothing to forget.
            pass

    def _cleanup_consumers(self):
        """
        Drop weak references whose consumer has been garbage collected.
        """
        # Must stay a real list: consume() appends to it, and filter()
        # returns a lazy iterator on Python 3.
        self._consumers = [cr for cr in self._consumers
                           if cr() is not None]

    def _get_consumers(self):
        """
        Return the consumers that are still alive.
        """
        # Dead weak references resolve to None; skip them so callers never
        # try to use a collected consumer.
        alive = (cr() for cr in self._consumers)
        return [c for c in alive if c is not None]

    @coroutine
    def consume(self, consumer_callback, no_ack=False, exclusive=False,
                consumer_tag=None, arguments=None, channel=None,
                timeout=None):
        """
        Creates a new instance of consumer, and consumes it.

        WARNING: Make sure you stored this result (c) in your code,
        otherwise it might be gc'ted, since it has a weak reference.

        :param channel: channel to consume on; defaults to the queue's own.
        :returns: (via the coroutine) the new AMQPConsumer.
        """
        c = AMQPConsumer(channel or self._channel, consumer_callback, self,
                         no_ack, exclusive, consumer_tag, arguments)
        yield c.consume(timeout)

        self._cleanup_consumers()
        self._consumers.append(weakref.ref(c))
        raise Return(c)
# noinspection PyProtectedMember | |
class AMQPExchange(AMQPMessageDestination):
    """
    A representation of a message exchange.

    Please use ``yield channel.exchange()`` instead of constructing an
    exchange directly.  In case of reconnection this exchange may be
    redeclared.
    """

    def __init__(self, channel, exchange=None, exchange_type='direct',
                 passive=False, durable=False, auto_delete=False,
                 internal=False, nowait=False, arguments=None):
        super(AMQPExchange, self).__init__(channel, exchange, None)
        self._exchange_type = exchange_type
        self._passive = passive
        self._durable = durable
        self._auto_delete = auto_delete
        self._internal = internal
        self._nowait = nowait
        self._arguments = arguments
        self._declared = False

        # Our bindings to instate
        # Format:
        # {
        #     exchange_name: {
        #         routing_key: {
        #             args: kwargs for exchange_bind,
        #             bound: boolean indicating bind state,
        #         }
        #     },
        # }
        self._bindings = {}

    # ------------------------------------------------------------------------
    # Exchange operations
    # ------------------------------------------------------------------------

    # noinspection PyBroadException
    @coroutine
    def declare(self, timeout=None):
        """
        Declare the exchange, then re-instate the recorded bindings.

        A passive declaration is issued on a temporary channel, which is
        closed again afterwards if it is still open; otherwise the
        exchange's own channel is used.  A failed declaration is logged
        and not re-raised (best effort).

        :param timeout: forwarded to the channel operations.
        """
        log = self._get_log('declare')

        # Are we declaring passively?
        if self._passive:
            # We are, declare using a temporary channel
            log.debug('Using temporary channel')
            ch = yield self._channel._connection.channel()
        else:
            # We're not, use our own channel
            log.debug('Using own channel')
            ch = self._channel

        # Declare the exchange in the channel
        try:
            log.debug('Declaring exchange')
            yield ch.exchange_declare(
                exchange=self._exchange,
                exchange_type=self._exchange_type,
                passive=self._passive,
                durable=self._durable,
                auto_delete=self._auto_delete,
                internal=self._internal,
                nowait=self._nowait,
                arguments=self._arguments,
                timeout=timeout)
            self._declared = True
            log.debug('Exchange declared')
        except Exception:
            # Best effort: report and carry on.  ``Exception`` (rather than
            # a bare except) lets GeneratorExit/KeyboardInterrupt through.
            log.exception('Failed to declare exchange %s', self._exchange)

        # If we opened a temporary channel, close it
        if self._passive and ch.is_open:
            yield ch.close()

        # If we were bound to things, re-bind.  bind() writes to
        # self._bindings, so iterate over snapshots; items() works on both
        # Python 2 and Python 3.
        for exchange, bindings in list(self._bindings.items()):
            for routing_key, state in list(bindings.items()):
                yield self.bind(
                    exchange=exchange,
                    routing_key=routing_key,
                    nowait=state['args']['nowait'],
                    arguments=state['args']['arguments'],
                    timeout=timeout)

    # noinspection PyBroadException
    @coroutine
    def bind(self, exchange, routing_key=None,
             nowait=False, arguments=None, timeout=None):
        """
        Bind this exchange (destination) to ``exchange`` (source).

        The binding is recorded in ``self._bindings`` whether or not the
        bind succeeded, so declare() can retry it later.

        :param exchange: source exchange name or an AMQPExchange instance.
        :returns: (via the coroutine) True if the bind succeeded, False if
            it failed; failures are logged, not raised.
        """
        log = self._get_log('bind')

        if isinstance(exchange, AMQPExchange):
            exchange = exchange.exchange

        state = {
            'args': {
                'nowait': nowait,
                'arguments': arguments,
            },
            'bound': False,
        }

        try:
            log.debug('Binding to %s:%s', exchange, routing_key)
            # Exchange-to-exchange bindings use exchange_bind, the
            # counterpart of the exchange_unbind call in unbind();
            # queue_bind does not take destination/source.
            # NOTE(review): assumes the channel wrapper exposes
            # exchange_bind alongside exchange_unbind -- confirm.
            yield self._channel.exchange_bind(
                destination=self.exchange,
                source=exchange,
                routing_key=routing_key or '',
                nowait=nowait,
                arguments=arguments,
                timeout=timeout)
            # We should be bound now.
            state['bound'] = True
        except Exception:
            log.debug('Failed to bind to %s:%s', exchange, routing_key)

        self._bindings.setdefault(exchange, {})[routing_key] = state
        raise Return(state['bound'])

    @coroutine
    def delete(self, if_unused=False, nowait=False, timeout=None):
        """
        Delete the exchange; on success it is marked as not declared.

        Failures are logged and re-raised.
        """
        log = self._get_log('delete')
        try:
            log.debug('Deleting exchange %s', self.exchange)
            yield self._channel.exchange_delete(
                exchange=self.exchange,
                if_unused=if_unused,
                nowait=nowait,
                timeout=timeout
            )
            self._declared = False
        except Exception:
            log.exception('Failed to delete exchange %s', self.exchange)
            raise

    @coroutine
    def unbind(self, exchange=None, routing_key=None,
               arguments=None, timeout=None):
        """
        Unbind this exchange from ``exchange`` and forget the binding.

        :param exchange: source exchange name or an AMQPExchange instance.
        """
        if isinstance(exchange, AMQPExchange):
            exchange = exchange.exchange

        yield self._channel.exchange_unbind(
            destination=self.exchange,
            source=exchange,
            # Same normalisation as bind(), so a binding made without a
            # routing key is removed with the key it was created under.
            routing_key=routing_key or '',
            arguments=arguments,
            timeout=timeout)

        # If we got here, then we're unbound
        try:
            del self._bindings[exchange][routing_key]
            if not self._bindings[exchange]:
                del self._bindings[exchange]
        except KeyError:
            # The binding was never recorded; nothing to forget.
            pass
# noinspection PyProtectedMember,PyPropertyDefinition | |
class AMQPConsumer(AMQPObject):
    """
    An abstraction for a consumer.

    Please use ``yield queue.consume()`` instead of constructing a consumer
    directly.  In case of reconnection this consumer may be reconsumed.
    """

    def __init__(self, channel, consumer_callback, queue=None, no_ack=False,
                 exclusive=False, consumer_tag=None, arguments=None):
        self._channel = channel
        self._consumer_callback = consumer_callback
        self._queue = queue
        self._no_ack = no_ack
        self._exclusive = exclusive
        # Tag requested by the caller (may be None).
        self._consumer_tag_given = consumer_tag
        self._arguments = arguments
        # Tag stored by consume(); None while not consuming.
        self._consumer_tag = None

    consumer_tag = property(lambda s: s._consumer_tag)

    # noinspection PyUnusedLocal
    @coroutine
    def consume(self, timeout=None):
        """
        Start consuming from the queue and remember the resulting tag.

        On failure the stored tag is reset to None and the error is logged
        and re-raised.

        :param timeout: accepted for interface symmetry; currently unused.
        """
        log = self._get_log('_consume')
        try:
            log.debug('Consuming queue %s', self._queue.routing_key)
            result = yield self._channel.basic_consume(
                consumer_callback=self._consumer_callback,
                queue=self._queue.routing_key,
                no_ack=self._no_ack,
                exclusive=self._exclusive,
                consumer_tag=self._consumer_tag_given,
                arguments=self._arguments)
            self._consumer_tag = result
            log.debug('Consumer tag is %s', self._consumer_tag)
        except Exception:
            log.exception('Failed to consume %s', self._queue.routing_key)
            self._consumer_tag = None
            raise

    # noinspection PyUnusedLocal
    @coroutine
    def cancel(self, nowait=False, arguments=None, timeout=None):
        """
        Cancel this consumer; on success the stored tag is reset to None.

        Failures are logged and re-raised.

        :param timeout: accepted for interface symmetry; currently unused.
        """
        log = self._get_log('cancel')
        # Cancel under the tag stored by consume(); when no tag was
        # requested up front, the requested tag is None and would not
        # identify the consumer.  Fall back to the requested tag only if
        # no tag has been stored.
        tag = self._consumer_tag or self._consumer_tag_given
        try:
            log.debug('Cancelling consumer of queue %s',
                      self._queue.routing_key)
            yield self._channel.basic_cancel(
                consumer_tag=tag,
                nowait=nowait,
                arguments=arguments)
            self._consumer_tag = None
            log.debug('Consumer is cancelled')
        except Exception:
            log.exception('Failed to cancel consuming %s',
                          self._queue.routing_key)
            raise
class TimeoutError(Exception):
    """
    Raised when an operation times out.
    """
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment