Skip to content

Instantly share code, notes, and snippets.

@regmicmahesh
Created January 22, 2021 07:17
Show Gist options
  • Save regmicmahesh/fe9e5631362efb70accc743604f3fcbf to your computer and use it in GitHub Desktop.
Save regmicmahesh/fe9e5631362efb70accc743604f3fcbf to your computer and use it in GitHub Desktop.
import pika
import logging
class Producer:
    """Thin wrapper around a pika blocking connection and channel.

    On construction it opens a connection, opens a channel, and declares
    the requested exchanges and queues.  Calling the instance returns the
    underlying channel so the caller can publish on it directly.

    Attributes:
        queueNames: Names of the queues declared by this producer.
        exchangeNames: Names of the exchanges declared by this producer.
        connection: The ``pika.BlockingConnection`` in use.
        channel: The channel opened on ``connection``.
    """

    def __init__(self, host='localhost', exchanges=None, queues=None,
                 conn_params=None):
        """Connect to the broker and declare exchanges and queues.

        Args:
            host: Broker host, used only when ``conn_params`` is not given.
            exchanges: Iterable of dicts; each dict is passed as keyword
                arguments to ``channel.exchange_declare`` and its
                ``'exchange'`` key is recorded as the exchange name.
            queues: Iterable of dicts; each dict is passed as keyword
                arguments to ``channel.queue_declare`` and its ``'queue'``
                key is recorded as the queue name.
            conn_params: Optional ready-made connection parameters; when
                truthy they are used instead of ``host``.
        """
        # Copy into fresh lists: avoids shared mutable defaults and lets
        # callers pass any iterable (including one-shot generators).
        exchanges = list(exchanges) if exchanges is not None else []
        queues = list(queues) if queues is not None else []

        self.queueNames = [queue.get('queue') for queue in queues]
        self.exchangeNames = [
            exchange.get('exchange') for exchange in exchanges
        ]

        if conn_params:
            self.connection = pika.BlockingConnection(conn_params)
        else:
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host)
            )
        self.channel = self.connection.channel()

        for exchange in exchanges:
            self.channel.exchange_declare(**exchange)
        for queue in queues:
            self.channel.queue_declare(**queue)

    def bind(self, queue, exchange, routing_key=""):
        """Bind a declared queue to a declared exchange.

        If either name was not declared through this producer, a critical
        message is logged and nothing is bound.

        Args:
            queue: Name of a queue listed in ``queueNames``.
            exchange: Name of an exchange listed in ``exchangeNames``.
            routing_key: Routing key for the binding.
        """
        if queue not in self.queueNames:
            # logging.FATAL is an int level constant, not a callable;
            # logging.critical() emits at that same level.
            logging.critical("Failed to find the queue.")
            return
        if exchange not in self.exchangeNames:
            logging.critical("Failed to find the exchange")
            return
        self.channel.queue_bind(
            queue=queue, exchange=exchange, routing_key=routing_key
        )

    def close(self):
        """Delete the declared queues and exchanges, then close the connection.

        Does nothing if the connection is already closed, so it is safe to
        call more than once.
        """
        if not self.connection.is_open:
            return
        try:
            for queue_name in self.queueNames:
                self.channel.queue_delete(queue_name)
            for exchange_name in self.exchangeNames:
                self.channel.exchange_delete(exchange_name)
        finally:
            # Release the socket even if a delete fails part-way through.
            if self.connection.is_open:
                self.connection.close()

    def __call__(self):
        """Return the underlying channel."""
        return self.channel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment