Created
June 16, 2015 13:45
-
-
Save txomon/3978fea7a5af79b8214d to your computer and use it in GitHub Desktop.
Kombu based amqp connections
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from kombu import Exchange, messaging, Connection, Queue | |
try: | |
import simplejson as json | |
except ImportError: | |
import json | |
class AMQPConnection: | |
queue_name = None | |
def callback(self, body): | |
raise NotImplementedError() | |
def __init__(self, amqp_uri): | |
self.connection = Connection(amqp_uri) | |
self.exchange = Exchange(self.queue_name, type='direct', | |
durable=True, auto_delete=False) | |
self.queue = Queue(self.queue_name, self.exchange, self.queue_name, | |
durable=True, auto_delete=False) | |
self.channel = self.connection.default_channel | |
self.producer = messaging.Producer(self.channel, self.exchange) | |
self.consumer = messaging.Consumer(self.channel, self.queue) | |
def register_consumer(self): | |
self.consumer.register_callback(self.callback_processor) | |
self.consumer.consume() | |
def close(self): | |
self.consumer.cancel() | |
self.connection.release() | |
def send(self, data): | |
self.producer.publish(data, routing_key=self.queue_name) | |
def callback_processor(self, body, message): | |
self.callback(message.payload) | |
message.ack() | |
def receive(self, callback): | |
self.callback = callback | |
self.register_consumer() | |
while True: | |
self.channel.connection.client.drain_events() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment