Skip to content

Instantly share code, notes, and snippets.

@gaufung
Created September 12, 2020 13:24
Show Gist options
  • Save gaufung/4b8674de501061105764ad188e861d16 to your computer and use it in GitHub Desktop.
Save gaufung/4b8674de501061105764ad188e861d16 to your computer and use it in GitHub Desktop.
RabbitMQ_Syncrhonous_producer.py
class SyncRabbitMQProducer(object):
"""
synchronize amqp producer
usage:
with SyncAMQPProducer("127.0.0.1") as p:
p.publish("exchange_name", "dog.black", "message1", "message2")
"""
def __init__(self, rabbitmq_url):
"""
:param rabbitmq_url:
"""
self._parameter = ConnectionParameters("127.0.0.1") if rabbitmq_url in ["localhost", "127.0.0.1"] else \
URLParameters(rabbitmq_url)
self._logger = logging.getLogger(__name__)
self._connection = None
self._channel = None
@property
def logger(self):
"""
logger
:return:
"""
return self._logger
def _connect(self):
self.logger.info("initialize connection and channel")
self._connection = BlockingConnection(self._parameter)
self._channel = self._connection.channel()
def _disconnect(self):
self.logger.info("tear down channel and connection")
if self._channel is not None:
self._channel.close()
if self._connection is not None:
self._connection.close()
def __enter__(self):
self._connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._disconnect()
return not isinstance(exc_val, Exception)
def publish(self, exchange, routing_key, *messages, **kwargs):
self.logger.info("[publishing] exchange name: %s; routing key: %s" % (exchange, routing_key,))
if self._channel is None:
raise RabbitMQError("channel has not been initialized")
properties = kwargs.pop("properties") if kwargs.has_key("properties") else None
for message in messages:
self.logger.info("publish message: %s" % message)
self._channel.basic_publish(exchange=exchange,
routing_key=routing_key,
body=str(message),
properties=properties)
def publish_messages(self, exchange, messages, **kwargs):
self.logger.info("[publish_message] exchange name: %s" % exchange)
if self._channel is None:
raise RabbitMQError("channel has not been initialized")
if not isinstance(messages, dict):
raise RabbitMQError("messages is not dict")
properties = kwargs.pop("properties") if kwargs.has_key("properties") else None
for routing_key, message in messages.items():
self.logger.info("routing key:%s, message: %s" % (routing_key, message, ))
self._channel.basic_publish(exchange=exchange,
routing_key=routing_key,
body=str(message),
properties=properties)
def connect(self):
"""
This is method doesn't recommend. using `with` context instead
:return: None
"""
warnings.warn("Call connect() method, using `with` context instead.", category=DeprecationWarning, stacklevel=2)
self._connect()
def disconnect(self):
"""
This is method doesn't recommend. using `with` context instead
:return: None
"""
warnings.warn("Call disconnect() method, using `with` context instead.", category=DeprecationWarning, stacklevel=2)
self._disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment