Last active
September 12, 2020 13:47
-
-
Save gaufung/ba64df0fb210f5e8f41e15b279589ea7 to your computer and use it in GitHub Desktop.
An asynchronous RabbitMQ client using Tornado
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@gen.coroutine
def rpc(self, exchange, routing_key, body, timeout=None):
    """
    Perform an rpc call and return the reply asynchronously.

    On the first call for a given exchange a callback queue is set up through `_initialize_rpc_callback` and its
    name is parked in a one-slot queue stored in `self._rpc_exchange_dict[exchange]`. Every call then takes the
    name out of that slot and puts it straight back, so callers that arrive while the setup is still in progress
    wait for it instead of setting up a second callback queue. The request itself is sent through `_call`.

    :param exchange: exchange name
    :param routing_key: routing key (e.g. dog.Yellow, cat.big)
    :param body: message
    :param timeout: forwarded to `_call`; None means no timeout
    :return: the reply body (delivered with `gen.Return`)
    :raises: whatever `_call`'s future fails with (`RabbitMQError('rpc timeout')` on timeout), or the error raised
        while setting up the callback queue
    """
    self.logger.info("rpc call. exchange: %s; routing_key: %s; body: %s", exchange, routing_key, body)
    if exchange not in self._rpc_exchange_dict:
        # Register the slot before the first yield so concurrent callers see it and wait on get() below.
        self._rpc_exchange_dict[exchange] = Queue(maxsize=1)
        try:
            callback_queue = yield self._initialize_rpc_callback(exchange)
        except Exception:
            # Without this the empty slot would stay registered and every later call for this exchange would
            # block forever on get(). NOTE(review): callers already waiting on this slot are still not woken.
            self._rpc_exchange_dict.pop(exchange, None)
            raise
        yield self._rpc_exchange_dict[exchange].put(callback_queue)
    slot = self._rpc_exchange_dict[exchange]
    # Take the queue name and immediately hand it back so the next caller can read it too.
    callback_queue = yield slot.get()
    yield slot.put(callback_queue)
    self.logger.info("starting calling. %s", body)
    result = yield self._call(exchange, callback_queue, routing_key, body, timeout)
    raise gen.Return(result)
@gen.coroutine
def _initialize_rpc_callback(self, exchange):
    """
    Set up the callback (reply) queue for `exchange` and start consuming it.

    Creates a channel on `self._receive_connection`, declares the exchange, declares an auto-delete queue, binds
    that queue to the exchange using the queue name as routing key, and registers `_rpc_callback_process` as the
    consumer.

    :param exchange: exchange name
    :return: the value returned by `_queue_declare`, used as the callback queue name (delivered with `gen.Return`)
    """
    self.logger.info("initialize rpc callback queue")
    rpc_channel = yield self._create_channel(self._receive_connection)
    yield self._exchange_declare(rpc_channel, exchange)
    callback_queue = yield self._queue_declare(rpc_channel, auto_delete=True)
    self.logger.info("callback queue: %s", callback_queue)
    yield self._queue_bind(rpc_channel, exchange=exchange, queue=callback_queue, routing_key=callback_queue)
    # `_rpc_callback_process` never acknowledges deliveries, so consume in no-ack mode; otherwise every reply
    # would remain unacknowledged on the broker for the lifetime of the channel.
    # NOTE(review): `no_ack` is the pika < 1.0 keyword, matching the callback-first call form used here.
    rpc_channel.basic_consume(self._rpc_callback_process, queue=callback_queue, no_ack=True)
    raise gen.Return(callback_queue)
def _rpc_callback_process(self, unused_channel, basic_deliver, properties, body):
    """
    Consumer callback for the rpc callback queue: resolve the pending call that matches the reply.

    :param unused_channel: channel the message arrived on (not used)
    :param basic_deliver: delivery information (not used)
    :param properties: message properties; `correlation_id` selects the pending future
    :param body: reply body, set as the future's result
    """
    pending = self._rpc_corr_id_dict.get(properties.correlation_id)
    # Ignore replies nobody is waiting for, and replies whose future is already resolved (e.g. a duplicate
    # delivery): setting a result on a finished future would raise or overwrite the first reply.
    if pending is not None and not pending.done():
        pending.set_result(body)
def _call(self, exchange, callback_queue, routing_key, body, timeout=None):
    """
    Publish one rpc request and return a future for its reply.

    A new correlation id is generated and the future is stored under it in `self._rpc_corr_id_dict`, where
    `_rpc_callback_process` looks it up when the reply arrives. The request is published with `correlation_id`
    and `reply_to` set. Once the future finishes (reply or timeout) its entry is removed from the dict and any
    pending timeout is cancelled.

    :param exchange: exchange name
    :param callback_queue: queue name sent as `reply_to`
    :param routing_key: routing key
    :param body: message
    :param timeout: seconds to wait for the reply, counted from now; None disables the timeout
    :return: future that yields the reply body, or fails with `RabbitMQError('rpc timeout')`
    """
    future = Future()
    corr_id = str(uuid.uuid1())
    self._rpc_corr_id_dict[corr_id] = future
    timeout_handle = None

    def on_timeout():
        # The reply may have arrived first; never fail a future that already holds a result.
        if future.done():
            return
        self.logger.error("timeout")
        future.set_exception(RabbitMQError('rpc timeout'))

    def on_done(_):
        # Runs for both outcomes, so finished calls do not accumulate in the dict.
        self._rpc_corr_id_dict.pop(corr_id, None)
        if timeout_handle is not None:
            self._io_loop.remove_timeout(timeout_handle)

    future.add_done_callback(on_done)
    try:
        self.publish(exchange, routing_key, body,
                     properties=BasicProperties(correlation_id=corr_id,
                                                reply_to=callback_queue))
    except Exception:
        # A synchronous publish failure means no reply can ever resolve the future; drop its entry.
        self._rpc_corr_id_dict.pop(corr_id, None)
        raise
    if timeout is not None:
        # add_timeout treats a bare number as an absolute deadline on the IOLoop clock, so offset from now.
        deadline = self._io_loop.time() + float(timeout)
        timeout_handle = self._io_loop.add_timeout(deadline, on_timeout)
    return future
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment