Skip to content

Instantly share code, notes, and snippets.

@txomon
Created June 16, 2015 13:45
Show Gist options
  • Save txomon/3978fea7a5af79b8214d to your computer and use it in GitHub Desktop.
Save txomon/3978fea7a5af79b8214d to your computer and use it in GitHub Desktop.
Kombu based amqp connections
from kombu import Exchange, messaging, Connection, Queue
try:
import simplejson as json
except ImportError:
import json
class AMQPConnection:
queue_name = None
def callback(self, body):
raise NotImplementedError()
def __init__(self, amqp_uri):
self.connection = Connection(amqp_uri)
self.exchange = Exchange(self.queue_name, type='direct',
durable=True, auto_delete=False)
self.queue = Queue(self.queue_name, self.exchange, self.queue_name,
durable=True, auto_delete=False)
self.channel = self.connection.default_channel
self.producer = messaging.Producer(self.channel, self.exchange)
self.consumer = messaging.Consumer(self.channel, self.queue)
def register_consumer(self):
self.consumer.register_callback(self.callback_processor)
self.consumer.consume()
def close(self):
self.consumer.cancel()
self.connection.release()
def send(self, data):
self.producer.publish(data, routing_key=self.queue_name)
def callback_processor(self, body, message):
self.callback(message.payload)
message.ack()
def receive(self, callback):
self.callback = callback
self.register_consumer()
while True:
self.channel.connection.client.drain_events()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment