Skip to content

Instantly share code, notes, and snippets.

View gaufung's full-sized avatar
Focusing

Fung Kao gaufung

Focusing
View GitHub Profile
@gaufung
gaufung / rabbitmq_asynchronous_publish.py
Created September 12, 2020 13:42
Rabbitmq asynchronous publish
@gen.coroutine
def publish(self, exchange, routing_key, body, properties=None):
"""
publish message. creating a brand new channel once invoke this method. After publishing, it closes the
channel.
:param exchange: exchange name
:type exchange; str or unicode
:param routing_key: routing key (e.g. dog.yellow, cat.big)
:param body: message
:param properties: properties
@gaufung
gaufung / exchange_queue_routing-key.py
Created September 12, 2020 13:40
Exhcnage queue and routing key
def _exchange_declare(self, channel, exchange=None, exchange_type='topic', **kwargs):
self.logger.info("declaring exchange: %s " % exchange)
future = Future()
def callback(unframe):
self.logger.info("declared exchange: %s" % exchange)
future.set_result(unframe)
channel.exchange_declare(callback=callback,
exchange=exchange, exchange_type=exchange_type, **kwargs)
@gaufung
gaufung / rabbitmq_asynchronous_channel.py
Created September 12, 2020 13:36
rabbitmq_asynchronous_channel.py
def _create_channel(self, connection):
self.logger.info("creating channel")
future = Future()
def on_channel_closed(channel, reply_code, reply_txt):
if reply_code not in [self._NORMAL_CLOSE_CODE, self._USER_CLOSE_CODE]:
self.logger.error("channel closed. reply code: %s; reply text: %s. system will exist"
% (reply_code, reply_txt,))
sys.exit(self._EXIST_CODE)
@gaufung
gaufung / asynchronous_connection.py
Created September 12, 2020 13:33
asynchronous_connection.py
@gen.coroutine
def connect(self):
"""
establishing two connections for publishing and receiving respectively.
:return: True if establish successfully.
"""
self._publish_connection = yield self._create_connection(self._parameter)
self._receive_connection = yield self._create_connection(self._parameter)
raise gen.Return(True)
@gaufung
gaufung / RabbitMQ_Syncrhonous_producer.py
Created September 12, 2020 13:24
RabbitMQ_Syncrhonous_producer.py
class SyncRabbitMQProducer(object):
"""
synchronize amqp producer
usage:
with SyncAMQPProducer("127.0.0.1") as p:
p.publish("exchange_name", "dog.black", "message1", "message2")
"""
def __init__(self, rabbitmq_url):
"""
@gaufung
gaufung / test_rabbitmq_rpc.py
Last active July 20, 2018 09:54
How to write unit test of tornado asynchronous code
# -*- encoding:utf-8 -*-
from __future__ import unicode_literals
import unittest
import uuid
import time
from rabbitmq.rabbitmq_rpc import AsyncRabbitMQ
from rabbitmq.rabbitmq_util import make_properties
from tornado.gen import coroutine,Return, sleep
from tornado.testing import AsyncTestCase, gen_test
from tornado.queues import Queue
@gaufung
gaufung / rabbitmq_rpc.py
Last active September 12, 2020 13:47
a asynchronous rabbitmq client using tornado
@gen.coroutine
def rpc(self, exchange, routing_key, body, timeout=None):
"""
rpc call. It create a queue randomly when encounters first call with the same exchange name. Then, it starts
consuming the created queue(waiting result). It publishes message to rabbitmq with properties that has correlation_id
and reply_to. if timeout is set, it starts a coroutine to wait timeout and raises an `Exception("timeout")`.
If server has been sent result, it return it asynchronously.
:param exchange: exchange name
:param routing_key: routing key(e.g. dog.Yellow, cat.big)
:param body: message