Skip to content

Instantly share code, notes, and snippets.

@hanx11
Forked from brimcfadden/tornadoweb_pika.py
Created November 19, 2018 00:42

Revisions

  1. @brimcfadden brimcfadden created this gist Jun 1, 2012.
    160 changes: 160 additions & 0 deletions tornadoweb_pika.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,160 @@
    #!/usr/bin/env python

    """A Tornado example of RPC.
    Designed to work with rpc_server.py as found in RabbitMQ Tutorial #6:
    http://www.rabbitmq.com/tutorials/tutorial-six-python.html
    Some code is borrowed from pika's tornado example.
    """

    import platform
    import os
    import sys
    import time
    import uuid

    import pika
    import tornado.ioloop
    import tornado.web

    from pika.adapters.tornado_connection import TornadoConnection

    __author__ = 'Brian McFadden'
    __email__ = 'brimcfadden+gist.github.com@gmail.com'

    HTML_HEADER = '<html><head><title>Tornado/Pika RPC</title></head><body>'
    HTML_FOOTER = '</body></html>'

    EXCHANGE = ''


    class Fib(tornado.web.RequestHandler):
    """Uses an aysnchronous call to an RPC server to calculate fib(x).
    As with examples of asynchronous HTTP calls, this request will not finish
    until the remote response is received."""

    @tornado.web.asynchronous
    def get(self, number=''):

    if not number:
    self.redirect('/30') # GET / --> GET /30

    self.number = number
    self.pika_client = self.application.settings.get('pika_client')
    self.mq_ch = self.pika_client.channel
    self.corr_id = str(uuid.uuid4())
    # Currently, one callback queue is made per request. Is mapping
    # responses in one queue to multiple RequestHandlers with a
    # correlation ID a better approach or not?
    self.queue_name = "{0}-{1}-{2}".format(platform.node(), os.getpid(),
    id(self))
    # Trying to bind to the nameless exchange breaks the program.
    callback = self.on_mq_declare if EXCHANGE else self.on_queue_bind
    self.mq_ch.queue_declare(exclusive=True, queue=self.queue_name,
    callback=callback)

    def on_mq_declare(self, frame):
    lg = "Queue {0} Declared. Now binding.".format(self.queue_name)
    pika.log.info(lg)
    self.mq_ch.queue_bind(exchange='', queue=self.queue_name,
    callback=self.on_queue_bind)

    def on_queue_bind(self, frame):
    pika.log.info('Queue Bound. Issuing Basic Consume.')
    self.mq_ch.basic_consume(consumer_callback=self.on_rpc_response,
    queue=self.queue_name, no_ack=True)

    # After binding and listening to the queue with basic_consume,
    # publish the message.
    props = pika.BasicProperties(content_type='text/plain',
    delivery_mode=1,
    correlation_id=self.corr_id,
    reply_to=self.queue_name)
    pika.log.info('About to issue Basic Publish.')
    self.mq_ch.basic_publish(exchange='', routing_key='rpc_queue',
    body=str(self.number), properties=props,
    mandatory=1)

    def on_rpc_response(self, channel, method, header, body):
    lg = "RPC response: delivery tag #{0} | Body: {1}"
    pika.log.info(lg.format(method.delivery_tag, body))
    if header.correlation_id != self.corr_id:
    # I'm actually not sure what to do here yet.
    raise Exception('Someone dialed a wrong number.')

    # After the RPC response has been received, write to the browser.
    self.write(HTML_HEADER)
    self.write("fib({0}) = {1}".format(self.number, body))
    self.write(HTML_FOOTER)
    self.finish()


    class PikaClient(object):
    """A modified class as described in pika's demo_tornado.py.
    It handles the connection for the Tornado instance. Messaging/RPC
    callbacks are handled by the Tornado RequestHandler above."""

    def __init__(self):
    self.connecting = False
    self.connection = None
    self.channel = None

    def connect(self):
    if self.connecting:
    pika.log.info('Already connecting to RabbitMQ.')
    return
    pika.log.info("Connecting to RabbitMQ")
    self.connecting = True
    creds = pika.PlainCredentials('guest', 'guest')
    params = pika.ConnectionParameters(host='localhost', port=5672,
    virtual_host='/', credentials=creds)
    self.connection = TornadoConnection(params,
    on_open_callback=self.on_connect)
    self.connection.add_on_close_callback(self.on_closed)

    def on_connect(self, connection):
    self.connection = connection
    connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
    pika.log.info('Channel Open')
    self.channel = channel
    # I'm having trouble using named exchanges.
    ## channel.exchange_declare(exchange='rpc_ex', type='direct',
    ## auto_delete=True, durable=False,
    ## callback=self.on_exchange_declare)

    def on_exchange_declare(self, frame):
    pika.log.info("Exchange declared.")

    def on_basic_cancel(self, frame):
    pika.log.info('Basic Cancel Ok.')
    # If we don't have any more consumer processes running close
    self.connection.close()

    def on_closed(self, connection):
    # We've closed our pika connection so stop the demo
    tornado.ioloop.IOLoop.instance().stop()


    def main():
    pika.log.setup(color=True)
    pika_client = PikaClient()
    application = tornado.web.Application(
    [(r'/([0-9]*)', Fib)],
    **{'pika_client': pika_client, 'debug': True}
    )
    try:
    port = int(sys.argv[1]) # $ python tornadoweb_pika.py 80
    except:
    port = 8080
    application.listen(port)
    print "Tornado is serving on port {0}.".format(port)
    ioloop = tornado.ioloop.IOLoop.instance()
    ioloop.add_timeout(time.time() + .1, pika_client.connect)
    ioloop.start()

    if __name__ == '__main__':
    main()