Created
September 12, 2020 13:24
-
-
Save gaufung/4b8674de501061105764ad188e861d16 to your computer and use it in GitHub Desktop.
RabbitMQ_Syncrhonous_producer.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SyncRabbitMQProducer(object):
    """
    Synchronous (blocking) AMQP producer.

    The connection and channel are opened on entering the ``with`` block and
    closed on leaving it. Exceptions raised inside the block are never
    suppressed.

    usage:
        with SyncRabbitMQProducer("127.0.0.1") as p:
            p.publish("exchange_name", "dog.black", "message1", "message2")
    """

    def __init__(self, rabbitmq_url):
        """
        :param rabbitmq_url: ``"localhost"`` / ``"127.0.0.1"`` to build
            ``ConnectionParameters("127.0.0.1")``; any other value is handed
            to ``URLParameters`` unchanged.
        """
        if rabbitmq_url in ("localhost", "127.0.0.1"):
            self._parameter = ConnectionParameters("127.0.0.1")
        else:
            self._parameter = URLParameters(rabbitmq_url)
        self._logger = logging.getLogger(__name__)
        # Both stay None until _connect() runs; publish methods check this.
        self._connection = None
        self._channel = None

    @property
    def logger(self):
        """
        logger
        :return: the logger used by this producer
        """
        return self._logger

    def _connect(self):
        """Open the blocking connection and a channel on it."""
        self.logger.info("initialize connection and channel")
        self._connection = BlockingConnection(self._parameter)
        self._channel = self._connection.channel()

    def _disconnect(self):
        """
        Close the channel, then the connection, and forget both handles.

        The connection is closed even when closing the channel raises, and
        the handles are reset so that a repeated call is a no-op instead of
        closing already-closed objects.
        """
        self.logger.info("tear down channel and connection")
        channel, connection = self._channel, self._connection
        # Reset first: whatever happens below, the handles are unusable.
        self._channel = None
        self._connection = None
        try:
            if channel is not None:
                channel.close()
        finally:
            if connection is not None:
                connection.close()

    def __enter__(self):
        self._connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._disconnect()
        # Never suppress: returning a true value here would swallow
        # KeyboardInterrupt / SystemExit raised inside the ``with`` body.
        return False

    @staticmethod
    def _as_body(message):
        """Return *message* as a publishable body; bytes pass through as-is."""
        # str(b"...") would yield the literal text "b'...'" on Python 3.
        return message if isinstance(message, bytes) else str(message)

    def publish(self, exchange, routing_key, *messages, **kwargs):
        """
        Publish every message to ``exchange`` with the same ``routing_key``.

        :param exchange: exchange name
        :param routing_key: routing key applied to all messages
        :param messages: payloads; non-bytes values are converted with ``str``
        :param kwargs: optional ``properties`` forwarded to ``basic_publish``
        :raises RabbitMQError: when no channel has been opened yet
        """
        self.logger.info("[publishing] exchange name: %s; routing key: %s", exchange, routing_key)
        if self._channel is None:
            raise RabbitMQError("channel has not been initialized")
        properties = kwargs.pop("properties", None)
        for message in messages:
            # Lazy logging args: a tuple message no longer breaks formatting.
            self.logger.info("publish message: %s", message)
            self._channel.basic_publish(exchange=exchange,
                                        routing_key=routing_key,
                                        body=self._as_body(message),
                                        properties=properties)

    def publish_messages(self, exchange, messages, **kwargs):
        """
        Publish a ``{routing_key: message}`` mapping to ``exchange``.

        :param exchange: exchange name
        :param messages: dict mapping routing key to payload
        :param kwargs: optional ``properties`` forwarded to ``basic_publish``
        :raises RabbitMQError: when no channel has been opened yet, or when
            ``messages`` is not a dict
        """
        self.logger.info("[publish_message] exchange name: %s", exchange)
        if self._channel is None:
            raise RabbitMQError("channel has not been initialized")
        if not isinstance(messages, dict):
            raise RabbitMQError("messages is not dict")
        properties = kwargs.pop("properties", None)
        for routing_key, message in messages.items():
            self.logger.info("routing key:%s, message: %s", routing_key, message)
            self._channel.basic_publish(exchange=exchange,
                                        routing_key=routing_key,
                                        body=self._as_body(message),
                                        properties=properties)

    def connect(self):
        """
        This method is not recommended; use the `with` context instead.
        :return: None
        """
        warnings.warn("Call connect() method, using `with` context instead.", category=DeprecationWarning, stacklevel=2)
        self._connect()

    def disconnect(self):
        """
        This method is not recommended; use the `with` context instead.
        :return: None
        """
        warnings.warn("Call disconnect() method, using `with` context instead.", category=DeprecationWarning, stacklevel=2)
        self._disconnect()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment