Skip to content

Instantly share code, notes, and snippets.

@gaufung
Created September 12, 2020 13:44
Show Gist options
  • Save gaufung/57a5d16e4e908ecb7632e4e7f8a98a73 to your computer and use it in GitHub Desktop.
Save gaufung/57a5d16e4e908ecb7632e4e7f8a98a73 to your computer and use it in GitHub Desktop.
RabbitMQ asynchronous receive
@gen.coroutine
def receive(self, exchange, routing_key, queue_name, handler, no_ack=False, prefetch_count=0):
    """
    Start consuming messages from a queue bound to an exchange.

    A new channel is created, the exchange and the queue are declared, and the queue is bound to the
    exchange with the given routing key. Every delivery is then passed to ``self._on_message`` with
    ``exchange`` and ``handler`` pre-bound as keyword arguments.
    :param exchange: exchange name
    :param routing_key: routing key used for the binding (e.g. dog.*, *.big)
    :param queue_name: queue name; the queue is declared with ``auto_delete=True``
    :param handler: message handler, later invoked with the message body
    :type handler: def fn(body)
    :param no_ack: forwarded unchanged to ``channel.basic_consume``
    :param prefetch_count: forwarded unchanged to ``channel.basic_qos``
    :return: None
    """
    log_args = (exchange, routing_key, queue_name,)
    self.logger.info("[receive] exchange: %s; routing key: %s; queue name: %s" % log_args)

    # NOTE(review): the consuming channel is opened on ``self._publish_connection`` -- confirm
    # that sharing the publish connection for consumers is intended.
    channel = yield self._create_channel(self._publish_connection)

    # Topology must exist before consuming: exchange, then queue, then the binding between them.
    yield self._exchange_declare(channel, exchange=exchange)
    yield self._queue_declare(channel, queue=queue_name, auto_delete=True)
    yield self._queue_bind(channel, exchange=exchange, queue=queue_name, routing_key=routing_key)

    self.logger.info("[start consuming] exchange: %s; routing key: %s; queue name: %s" % log_args)
    channel.basic_qos(prefetch_count=prefetch_count)

    # Bind the per-subscription context so the channel only has to supply the delivery arguments.
    on_delivery = functools.partial(self._on_message, exchange=exchange, handler=handler)
    channel.basic_consume(on_delivery, queue=queue_name, no_ack=no_ack)
def _on_message(self, unused_channel, basic_deliver, properties, body, exchange, handler=None):
self.logger.info("consuming message: %s" % body)
self._io_loop.spawn_callback(self._process_message, unused_channel, basic_deliver, properties, body,
exchange, handler)
@gen.coroutine
def _process_message(self, unused_channel, basic_deliver, properties, body, exchange, handler=None):
    """
    Run ``handler`` on a delivered message, optionally publish the result, then ack the delivery.

    If ``properties`` carries both ``reply_to`` and ``correlation_id``, the stringified handler
    result is published to ``exchange`` using ``reply_to`` as the routing key. Any exception raised
    while handling or replying is logged with its traceback. The delivery is acknowledged exactly
    once, whether processing succeeded or failed.
    :param unused_channel: channel the message was delivered on; used to ack the delivery
    :param basic_deliver: delivery information; its ``delivery_tag`` is acked
    :param properties: message properties, may be None
    :param body: message body passed to ``handler``
    :param exchange: exchange the reply (if any) is published to
    :param handler: message handler; its result is yielded, so it is expected to be yieldable
    :type handler: def fn(body)
    :return: None
    """
    import traceback

    try:
        result = yield handler(body)
        self.logger.info("%s has been processed successfully and result is %s" % (body, result,))
        wants_reply = (properties is not None
                       and properties.reply_to is not None
                       and properties.correlation_id is not None)
        if wants_reply:
            self.logger.info("sending result back to %s" % properties.reply_to)
            # NOTE(review): the result of ``self.publish`` is not yielded; if it is a coroutine,
            # its failures will not reach the ``except`` below -- confirm against ``publish``.
            self.publish(exchange=exchange,
                         routing_key=properties.reply_to,
                         properties=BasicProperties(correlation_id=properties.correlation_id),
                         body=str(result))
    except Exception:
        # Log before touching the channel, so the traceback is not lost if the ack itself fails.
        self.logger.error(traceback.format_exc())

    # Ack outside the try block: a failing ack is no longer retried on the same delivery tag.
    # NOTE(review): ``receive`` accepts ``no_ack`` but this method always acks -- presumably
    # acking a no-ack consumer's delivery is a channel error; verify against the broker docs.
    unused_channel.basic_ack(basic_deliver.delivery_tag)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment