@@ -0,0 +1,160 @@ |
#!/usr/bin/env python |
"""A Tornado example of RPC. |
Designed to work with rpc_server.py as found in RabbitMQ Tutorial #6: |
http://www.rabbitmq.com/tutorials/tutorial-six-python.html |
Some code is borrowed from pika's tornado example. |
""" |
import platform |
import os |
import sys |
import time |
import uuid |
import pika |
import tornado.ioloop |
import tornado.web |
from pika.adapters.tornado_connection import TornadoConnection |
__author__ = 'Brian McFadden' |
__email__ = 'brimcfadden+gist.github.com@gmail.com' |
HTML_HEADER = '<html><head><title>Tornado/Pika RPC</title></head><body>' |
HTML_FOOTER = '</body></html>' |
class Fib(tornado.web.RequestHandler): |
"""Uses an aysnchronous call to an RPC server to calculate fib(x). |
As with examples of asynchronous HTTP calls, this request will not finish |
until the remote response is received.""" |
@tornado.web.asynchronous |
def get(self, number=''): |
if not number: |
self.redirect('/30') # GET / --> GET /30 |
self.number = number |
self.pika_client = self.application.settings.get('pika_client') |
self.mq_ch = self.pika_client.channel |
self.corr_id = str(uuid.uuid4()) |
# Currently, one callback queue is made per request. Is mapping |
# responses in one queue to multiple RequestHandlers with a |
# correlation ID a better approach or not? |
self.queue_name = "{0}-{1}-{2}".format(platform.node(), os.getpid(), |
id(self)) |
# Trying to bind to the nameless exchange breaks the program. |
callback = self.on_mq_declare if EXCHANGE else self.on_queue_bind |
self.mq_ch.queue_declare(exclusive=True, queue=self.queue_name, |
callback=callback) |
def on_mq_declare(self, frame): |
lg = "Queue {0} Declared. Now binding.".format(self.queue_name) |
pika.log.info(lg) |
self.mq_ch.queue_bind(exchange='', queue=self.queue_name, |
callback=self.on_queue_bind) |
def on_queue_bind(self, frame): |
pika.log.info('Queue Bound. Issuing Basic Consume.') |
self.mq_ch.basic_consume(consumer_callback=self.on_rpc_response, |
queue=self.queue_name, no_ack=True) |
# After binding and listening to the queue with basic_consume, |
# publish the message. |
props = pika.BasicProperties(content_type='text/plain', |
delivery_mode=1, |
correlation_id=self.corr_id, |
reply_to=self.queue_name) |
pika.log.info('About to issue Basic Publish.') |
self.mq_ch.basic_publish(exchange='', routing_key='rpc_queue', |
body=str(self.number), properties=props, |
mandatory=1) |
def on_rpc_response(self, channel, method, header, body): |
lg = "RPC response: delivery tag #{0} | Body: {1}" |
pika.log.info(lg.format(method.delivery_tag, body)) |
if header.correlation_id != self.corr_id: |
# I'm actually not sure what to do here yet. |
raise Exception('Someone dialed a wrong number.') |
# After the RPC response has been received, write to the browser. |
self.write(HTML_HEADER) |
self.write("fib({0}) = {1}".format(self.number, body)) |
self.write(HTML_FOOTER) |
self.finish() |
class PikaClient(object): |
"""A modified class as described in pika's demo_tornado.py. |
It handles the connection for the Tornado instance. Messaging/RPC |
callbacks are handled by the Tornado RequestHandler above.""" |
def __init__(self): |
self.connecting = False |
self.connection = None |
self.channel = None |
def connect(self): |
if self.connecting: |
pika.log.info('Already connecting to RabbitMQ.') |
return |
pika.log.info("Connecting to RabbitMQ") |
self.connecting = True |
creds = pika.PlainCredentials('guest', 'guest') |
params = pika.ConnectionParameters(host='localhost', port=5672, |
virtual_host='/', credentials=creds) |
self.connection = TornadoConnection(params, |
on_open_callback=self.on_connect) |
self.connection.add_on_close_callback(self.on_closed) |
def on_connect(self, connection): |
self.connection = connection |
connection.channel(self.on_channel_open) |
def on_channel_open(self, channel): |
pika.log.info('Channel Open') |
self.channel = channel |
# I'm having trouble using named exchanges. |
## channel.exchange_declare(exchange='rpc_ex', type='direct', |
## auto_delete=True, durable=False, |
## callback=self.on_exchange_declare) |
def on_exchange_declare(self, frame): |
pika.log.info("Exchange declared.") |
def on_basic_cancel(self, frame): |
pika.log.info('Basic Cancel Ok.') |
# If we don't have any more consumer processes running close |
self.connection.close() |
def on_closed(self, connection): |
# We've closed our pika connection so stop the demo |
tornado.ioloop.IOLoop.instance().stop() |
def main(): |
pika.log.setup(color=True) |
pika_client = PikaClient() |
application = tornado.web.Application( |
[(r'/([0-9]*)', Fib)], |
**{'pika_client': pika_client, 'debug': True} |
) |
try: |
port = int(sys.argv[1]) # $ python tornadoweb_pika.py 80 |
except: |
port = 8080 |
application.listen(port) |
print "Tornado is serving on port {0}.".format(port) |
ioloop = tornado.ioloop.IOLoop.instance() |
ioloop.add_timeout(time.time() + .1, pika_client.connect) |
ioloop.start() |
if __name__ == '__main__': |
main() |