Created
September 12, 2020 13:44
-
-
Save gaufung/57a5d16e4e908ecb7632e4e7f8a98a73 to your computer and use it in GitHub Desktop.
RabbitMQ asynchronous receive
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@gen.coroutine
def receive(self, exchange, routing_key, queue_name, handler, no_ack=False, prefetch_count=0):
    """
    Start consuming messages from a queue on a channel obtained from ``_create_channel``.

    Before consuming, the exchange and the queue are declared and the queue is bound to the
    exchange with ``routing_key``. Each delivery is handed to ``_on_message`` together with
    ``exchange`` and ``handler``.

    :param exchange: exchange name
    :param routing_key: routing key used for the binding (e.g. dog.*, *.big)
    :param queue_name: queue name; the queue is declared with ``auto_delete=True``
    :param handler: message handler, invoked with the message body
    :type handler: def fn(body)
    :param no_ack: forwarded as ``no_ack`` to ``basic_consume``
    :param prefetch_count: forwarded as ``prefetch_count`` to ``basic_qos``
    :return: None
    """
    names = (exchange, routing_key, queue_name,)
    self.logger.info("[receive] exchange: %s; routing key: %s; queue name: %s" % names)

    # NOTE(review): the consuming channel is opened on ``_publish_connection`` -- confirm that
    # sharing the publisher's connection is intended here.
    channel = yield self._create_channel(self._publish_connection)

    # Topology must exist before the consumer is registered.
    yield self._exchange_declare(channel, exchange=exchange)
    yield self._queue_declare(channel, queue=queue_name, auto_delete=True)
    yield self._queue_bind(channel, exchange=exchange, queue=queue_name, routing_key=routing_key)

    self.logger.info("[start consuming] exchange: %s; routing key: %s; queue name: %s" % names)
    channel.basic_qos(prefetch_count=prefetch_count)

    # Pre-bind the exchange and handler so the delivery callback receives them as keywords.
    on_delivery = functools.partial(self._on_message, exchange=exchange, handler=handler)
    channel.basic_consume(on_delivery, queue=queue_name, no_ack=no_ack)
def _on_message(self, unused_channel, basic_deliver, properties, body, exchange, handler=None):
    """
    Delivery callback: log the body and schedule ``_process_message`` on the IO loop.

    All arguments are forwarded unchanged, in the same order, to ``_process_message`` via
    ``self._io_loop.spawn_callback``.

    :param unused_channel: channel the message was delivered on
    :param basic_deliver: delivery information for the message
    :param properties: message properties
    :param body: message body
    :param exchange: exchange name bound in by ``receive``
    :param handler: message handler bound in by ``receive``
    :return: None
    """
    self.logger.info("consuming message: %s" % body)
    forwarded = (unused_channel, basic_deliver, properties, body, exchange, handler)
    self._io_loop.spawn_callback(self._process_message, *forwarded)
@gen.coroutine
def _process_message(self, unused_channel, basic_deliver, properties, body, exchange, handler=None):
    """
    Run ``handler`` on a delivered message, optionally reply, and acknowledge the delivery.

    The handler result is published back through ``self.publish`` (routing key
    ``properties.reply_to``, same ``correlation_id``) when ``properties`` carries both a
    ``reply_to`` and a ``correlation_id``. The delivery is acknowledged exactly once, whether
    or not processing succeeded; processing errors are logged, not raised.

    :param unused_channel: channel the message was delivered on; used to send the ack
    :param basic_deliver: delivery information; ``delivery_tag`` identifies the message to ack
    :param properties: message properties (may be None)
    :param body: message body passed to ``handler``
    :param exchange: exchange used when publishing the reply
    :param handler: callable invoked as ``handler(body)``; its result is yielded
    :return: None
    """
    import traceback

    try:
        result = yield handler(body)
        self.logger.info("%s has been processed successfully and result is %s" % (body, result,))
        wants_reply = (properties is not None
                       and properties.reply_to is not None
                       and properties.correlation_id is not None)
        if wants_reply:
            self.logger.info("sending result back to %s" % properties.reply_to)
            # NOTE(review): ``publish`` is not yielded; if it is a coroutine, a failure inside
            # it will not reach the ``except`` below -- confirm this is intended.
            self.publish(exchange=exchange,
                         routing_key=properties.reply_to,
                         properties=BasicProperties(correlation_id=properties.correlation_id),
                         body=str(result))
    except Exception:
        # Log before acknowledging so a failing ack can never hide the original error.
        self.logger.error(traceback.format_exc())

    # Acknowledge once, on both the success and the failure path. Previously a failing ack in
    # the success path was followed by a second ack of the same delivery tag in the handler.
    # NOTE(review): the ack is sent even when the consumer was registered with no_ack=True --
    # confirm the broker tolerates that.
    try:
        unused_channel.basic_ack(basic_deliver.delivery_tag)
    except Exception:
        self.logger.error(traceback.format_exc())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment