Skip to content

Instantly share code, notes, and snippets.

@gaufung
Last active September 12, 2020 13:47
Show Gist options
  • Save gaufung/ba64df0fb210f5e8f41e15b279589ea7 to your computer and use it in GitHub Desktop.
Save gaufung/ba64df0fb210f5e8f41e15b279589ea7 to your computer and use it in GitHub Desktop.
An asynchronous RabbitMQ client using Tornado
@gen.coroutine
def rpc(self, exchange, routing_key, body, timeout=None):
    """
    Perform an RPC call through ``exchange`` and hand back the reply asynchronously.

    The first call for a given exchange registers a one-slot queue in
    ``self._rpc_exchange_dict``, sets up the reply (callback) queue through
    ``_initialize_rpc_callback`` and stores the returned value in that slot.
    Every call then takes the value out of the slot and immediately puts it
    back, so callers that arrive while the setup is still in progress wait
    until it is available. The request itself is sent by ``_call``.

    NOTE(review): if ``_initialize_rpc_callback`` raises, the empty slot stays
    registered; later calls for the same exchange would presumably wait on it
    forever -- confirm and handle if that matters.

    :param exchange: exchange name
    :param routing_key: routing key (e.g. dog.Yellow, cat.big)
    :param body: message
    :param timeout: forwarded unchanged to ``_call``; ``None`` means no timeout
    :return: the value the future returned by ``_call`` resolves to
    :raises: whatever exception that future carries
    """
    call_message = "rpc call. exchange: %s; routing_key: %s; body: %s" % (exchange, routing_key, body,)
    self.logger.info(call_message)

    if exchange not in self._rpc_exchange_dict:
        # Register the slot *before* yielding so concurrent callers see it and
        # wait on it instead of starting a second initialization.
        slot = Queue(maxsize=1)
        self._rpc_exchange_dict[exchange] = slot
        reply_queue = yield self._initialize_rpc_callback(exchange)
        yield slot.put(reply_queue)

    # Take-and-put-back: reads the stored value while leaving it for the next caller.
    slot = self._rpc_exchange_dict[exchange]
    reply_queue = yield slot.get()
    yield slot.put(reply_queue)

    self.logger.info("starting calling. %s" % body)
    reply = yield self._call(exchange, reply_queue, routing_key, body, timeout)
    raise gen.Return(reply)
@gen.coroutine
def _initialize_rpc_callback(self, exchange):
    """
    Create the reply queue used for RPC calls on ``exchange`` and start consuming it.

    Opens a channel on ``self._receive_connection``, declares the exchange,
    declares an auto-delete queue, binds that queue to the exchange using the
    queue's own identifier as routing key, and registers
    ``_rpc_callback_process`` as the consumer.

    :param exchange: exchange name
    :return: the value returned by ``_queue_declare`` (used here as the queue
        name for binding and consuming)
    """
    self.logger.info("initialize rpc callback queue")
    rpc_channel = yield self._create_channel(self._receive_connection)
    yield self._exchange_declare(rpc_channel, exchange)
    callback_queue = yield self._queue_declare(rpc_channel, auto_delete=True)
    self.logger.info("callback queue: %s" % callback_queue)
    yield self._queue_bind(rpc_channel, exchange=exchange, queue=callback_queue, routing_key=callback_queue)
    # no_ack=True: `_rpc_callback_process` never acknowledges deliveries, so
    # without it every reply would remain unacknowledged on the broker for as
    # long as this channel stays open.
    rpc_channel.basic_consume(self._rpc_callback_process, queue=callback_queue, no_ack=True)
    raise gen.Return(callback_queue)
def _rpc_callback_process(self, unused_channel, basic_deliver, properties, body):
    """
    Consumer callback for the RPC reply queue: resolve the matching pending call.

    Looks up the future stored in ``self._rpc_corr_id_dict`` under the
    delivery's ``correlation_id`` and sets ``body`` as its result. Deliveries
    with an unknown correlation id are ignored, and so are deliveries for a
    future that is already resolved (a duplicate reply, or a reply racing with
    a timeout), because setting a result twice may raise.

    :param unused_channel: channel the delivery arrived on (not used)
    :param basic_deliver: delivery metadata (not used)
    :param properties: message properties; only ``correlation_id`` is read
    :param body: reply payload handed to the waiting caller
    """
    pending = self._rpc_corr_id_dict.get(properties.correlation_id)
    if pending is None or pending.done():
        return
    pending.set_result(body)
def _call(self, exchange, callback_queue, routing_key, body, timeout=None):
    """
    Publish one RPC request and return a future for its reply.

    A fresh correlation id is generated, the future is registered under it in
    ``self._rpc_corr_id_dict`` (where ``_rpc_callback_process`` looks it up),
    and the message is published with ``correlation_id`` and ``reply_to`` set.

    :param exchange: exchange name
    :param callback_queue: reply queue, sent as the ``reply_to`` property
    :param routing_key: routing key of the request
    :param body: message
    :param timeout: seconds to wait for the reply (anything ``float()``
        accepts); ``None`` disables the timeout
    :return: future resolved with the reply body, or failed with
        ``RabbitMQError('rpc timeout')`` when the timeout elapses first
    """
    future = Future()
    corr_id = str(uuid.uuid1())
    self._rpc_corr_id_dict[corr_id] = future
    self.publish(exchange, routing_key, body,
                 properties=BasicProperties(correlation_id=corr_id,
                                            reply_to=callback_queue))
    timeout_handle = None

    def on_timeout():
        # Unregister first so a late reply can no longer find this future.
        self._rpc_corr_id_dict.pop(corr_id, None)
        if future.done():
            # The reply won the race; there is nothing to fail.
            return
        self.logger.error("timeout")
        future.set_exception(RabbitMQError('rpc timeout'))

    def on_done(_finished):
        # Runs on success and on timeout: drop the bookkeeping entry and
        # cancel the timer so neither outlives the call.
        self._rpc_corr_id_dict.pop(corr_id, None)
        if timeout_handle is not None:
            self._io_loop.remove_timeout(timeout_handle)

    if timeout is not None:
        # A numeric deadline passed to add_timeout is an absolute time on the
        # IOLoop.time() scale, so the relative timeout must be offset from now.
        deadline = self._io_loop.time() + float(timeout)
        timeout_handle = self._io_loop.add_timeout(deadline, on_timeout)
    future.add_done_callback(on_done)
    return future
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment