Last active
January 3, 2019 03:23
-
-
Save philipcristiano/4627474 to your computer and use it in GitHub Desktop.
Simple Tornado/Pika wrapper. It is part of an internal library, so a missing dependency raises an error when the class is instantiated rather than when the module is imported.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try: | |
import pika | |
from pika.adapters.tornado_connection import TornadoConnection | |
except ImportError: | |
pika = None | |
try: | |
import tornado | |
import tornado.ioloop | |
except ImportError: | |
tornado = None | |
class TornadoQueueConnection(object):
    """Publish messages to an AMQP broker via pika's Tornado adapter.

    The connection is opened asynchronously on the Tornado IOLoop and the
    channel is put into publisher-confirm mode. Every ``publish`` call takes
    a ``callback`` that is invoked exactly once with ``True`` when the broker
    acks the message and ``False`` when it nacks it, or when the message
    could not be sent or confirmed (no connection/channel, channel replaced,
    connection lost).

    ``pika`` and ``tornado`` are optional at import time; instantiating the
    class raises if either one is missing.
    """

    def __init__(self, host, user='guest', password='guest', vhost='/'):
        """Store the connection parameters and schedule the first connect.

        :param host: broker host name passed to ``pika.ConnectionParameters``.
        :param user: user name for ``pika.PlainCredentials``.
        :param password: password for ``pika.PlainCredentials``.
        :param vhost: AMQP virtual host.
        :raises Exception: if ``tornado`` or ``pika`` could not be imported.
        """
        if tornado is None:
            raise Exception('You must add tornado to your requirements!')
        if pika is None:
            raise Exception('You must add pika to your requirements!')

        self._parameters = pika.ConnectionParameters(
            host=host,
            credentials=pika.PlainCredentials(user, password),
            virtual_host=vhost
        )
        self._connection = None
        self._channel = None
        # Set by close() so that the close callback does not reconnect.
        self._closing = False
        # Local mirror of the broker's per-channel confirm sequence number.
        self._delivery_tag = 0
        # delivery tag -> callback still waiting for an ack/nack.
        self._confirmation_callbacks = {}

        self.ioloop = tornado.ioloop.IOLoop.instance()
        # A numeric deadline is an absolute time; 0 is already in the past,
        # so the IOLoop runs _connect as soon as it is (or starts) running.
        self.ioloop.add_timeout(0, self._connect)

    def publish(self, exchange, routing_key, headers, body, callback):
        """Publish ``body`` and report the broker's confirmation.

        :param exchange: exchange name.
        :param routing_key: routing key.
        :param headers: message headers (dict or ``None``) sent in the
            message properties.
        :param body: message payload.
        :param callback: called with ``True`` on ack; called with ``False``
            on nack or when the message cannot be sent because there is no
            open connection or channel (a (re)connect / channel open is
            started in that case, but the message itself is NOT queued).
        """
        if self._connection is None or self._connection.is_closed:
            # NOTE(review): this starts a new connection attempt even if one
            # is already pending -- kept from the original behaviour.
            self._connect()
            callback(False)
            return
        if self._channel is None or self._channel.is_closed:
            self._open_channel()
            callback(False)
            return

        properties = pika.BasicProperties(content_type='text/plain',
                                          headers=headers)
        self._channel.basic_publish(exchange, routing_key, body, properties)
        # The broker numbers confirmed messages 1, 2, 3, ... per channel, so
        # the n-th publish on this channel is confirmed with delivery tag n.
        self._delivery_tag += 1
        self._confirmation_callbacks[self._delivery_tag] = callback

    def publish_json(self, exchange, routing_key, headers, body, callback):
        """Serialize ``body`` to JSON and publish it (see ``publish``)."""
        # ujson is preferred when installed; the standard library is used
        # otherwise so that a missing optional package is not a NameError.
        try:
            import ujson as json_encoder
        except ImportError:
            import json as json_encoder
        data = json_encoder.dumps(body)
        self.publish(exchange, routing_key, headers, data, callback)

    def _on_delivery_confirmation(self, method_frame):
        """Resolve pending callbacks for a Basic.Ack / Basic.Nack frame.

        When the frame's ``multiple`` flag is set it confirms every
        outstanding tag up to and including ``delivery_tag`` (tag 0 with
        ``multiple`` means all outstanding messages). Tags without a
        registered callback are ignored.
        """
        method = method_frame.method
        confirmation_type = method.NAME.split('.')[1].lower()
        success = confirmation_type == 'ack'
        tag = method.delivery_tag

        if getattr(method, 'multiple', False):
            tags = sorted(
                pending for pending in self._confirmation_callbacks
                if tag == 0 or pending <= tag
            )
        else:
            tags = [tag]

        for confirmed in tags:
            callback = self._confirmation_callbacks.pop(confirmed, None)
            if callback is not None:
                callback(success)

    def close(self):
        """Close the connection without triggering an automatic reconnect.

        If no connection is currently established this only records the
        intent; a later ``publish`` will still start a new connection.
        """
        self._closing = True
        if self._connection is not None and not self._connection.is_closed:
            self._connection.close()

    def _connect(self):
        """Start an asynchronous connection attempt."""
        self._closing = False
        # Kept as a public attribute for backward compatibility; the
        # established connection is tracked in self._connection.
        self.connection = TornadoConnection(
            self._parameters,
            on_open_callback=self._on_connected,
            stop_ioloop_on_close=False,
        )

    def _on_connected(self, connection):
        """Remember the open connection and open a channel on it."""
        self._connection = connection
        self._connection.add_on_close_callback(self._on_connection_closed)
        self._open_channel()

    def _on_connection_closed(self, method_frame):
        """Drop connection state, fail pending confirms, maybe reconnect.

        NOTE(review): the arguments pika passes to close callbacks differ
        between pika releases -- confirm against the pinned version.
        """
        self._connection = None
        self._channel = None
        # Confirms for messages sent on the lost connection will never come.
        self._fail_pending_confirmations()
        if not self._closing:
            self._connect()

    def _open_channel(self):
        """Request a new channel on the established connection."""
        self._connection.channel(self._on_channel_open)

    def _on_channel_open(self, channel):
        """Adopt the new channel and enable publisher confirms on it."""
        # Delivery tags are scoped to a channel: anything still pending
        # belongs to a previous channel and the counter must restart.
        self._fail_pending_confirmations()
        self._channel = channel
        self._channel.confirm_delivery(self._on_delivery_confirmation)

    def _fail_pending_confirmations(self):
        """Report ``False`` to every waiting callback and reset the counter."""
        pending = self._confirmation_callbacks
        self._confirmation_callbacks = {}
        self._delivery_tag = 0
        for tag in sorted(pending):
            pending[tag](False)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It seems that it doesn't work as expected