Created
May 22, 2013 18:14
Comments on the example usage of TwistedProtocolConnection
https://github.com/pika/pika/blob/master/docs/examples/twisted_example.rst
# -*- coding:utf-8 -*-

import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol, task

# First of all, what is a Twisted Deferred?
# reference: http://migre.me/eBL9F
#
# A twisted.internet.defer.Deferred is a promise that a function will at some
# point have a result. We can attach callback functions to a Deferred, and once
# it gets a result these callbacks will be called. In addition, Deferreds allow
# the developer to register a callback for an error (an errback), with the
# default behavior of logging the error. The Deferred mechanism standardizes
# the application programmer's interface with all sorts of blocking or delayed
# operations.
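
# A minimal sketch of the callback/errback mechanics described above. The
# helper name _deferred_demo is hypothetical and is not part of the original
# example; it only uses Deferred(), addCallback(), addErrback() and callback().
from twisted.internet import defer


def _deferred_demo():
    results = []
    d = defer.Deferred()
    # Callbacks run in order; each one receives the previous return value.
    d.addCallback(lambda value: value * 2)
    d.addCallback(results.append)
    # The errback only runs if an earlier callback raised an exception.
    d.addErrback(lambda failure: results.append(failure))
    # Giving the Deferred its result triggers the chain.
    d.callback(21)
    return results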

# And what about inlineCallbacks?
# reference: http://migre.me/eBCdy
#
# "When the code inside a function decorated by the inlineCallbacks decorator
# yields a Deferred (in that case, a function that returns a Deferred), the
# code goes on and the reactor will come back after the Deferred fires. Its
# return value will be returned on the yield statement, and, if any errors
# occurred, the exception will be raised."
#
# Also, very important: "Your inlineCallbacks-enabled generator will return a
# Deferred object". (http://migre.me/eEBnw)
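
# A minimal sketch of the behavior quoted above, assuming nothing beyond
# Twisted itself. The names _inline_callbacks_demo and worker are hypothetical
# and are not part of the original example.
from twisted.internet import defer


def _inline_callbacks_demo():
    seen = []

    @defer.inlineCallbacks
    def worker(pending):
        # Execution pauses here until `pending` fires; the yield expression
        # then evaluates to the value the Deferred was fired with.
        value = yield pending
        seen.append(value)

    pending = defer.Deferred()
    # Calling the decorated generator returns a Deferred right away.
    result = worker(pending)
    seen.append('waiting')
    # Firing the Deferred resumes the generator after the yield.
    pending.callback('hello')
    return isinstance(result, defer.Deferred), seen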

# When we use TwistedProtocolConnection, channel() returns a Deferred. Its
# underlying channel implementation is a TwistedChannel, and the following
# methods all return Deferreds: exchange_declare(), queue_declare(),
# queue_bind(), basic_qos() and basic_consume(). You can see below that the
# mentioned methods are called with yield, so run() pauses until each Deferred
# fires and the yield expression evaluates to its result.


@defer.inlineCallbacks
def run(connection):
    """Receives the connection as a parameter and then consumes from the queue
    periodically.
    """
    channel = yield connection.channel()

    exchange = yield channel.exchange_declare(exchange='topic_link',
                                              type='topic')

    queue = yield channel.queue_declare(queue='hello', auto_delete=False,
                                        exclusive=False)

    yield channel.queue_bind(exchange='topic_link', queue='hello',
                             routing_key='hello.world')

    yield channel.basic_qos(prefetch_count=1)

    # The queue object is an instance of ClosableDeferredQueue, where data
    # received from the queue will be stored. Clients should use its get()
    # method to fetch an individual message.
    # reference: http://migre.me/eBEmp
    queue_object, consumer_tag = yield channel.basic_consume(queue='hello',
                                                             no_ack=False)

    # Call read(queue_object) repeatedly. However, "If f [the parameter
    # function] returns a deferred, rescheduling will not take place until the
    # deferred has fired. The result value is ignored."
    # And, remember, "Your inlineCallbacks-enabled generator will return a
    # Deferred object."
    # reference: http://migre.me/eBCeF
    loop = task.LoopingCall(read, queue_object)

    # Starts running the function every `interval` seconds.
    loop.start(interval=0.01)
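

# A minimal sketch of the LoopingCall behavior quoted in run(): the next call
# is not scheduled while the previous call's Deferred is still pending. The
# names _looping_call_demo and work are hypothetical, and task.Clock stands in
# for the real reactor so that time can be advanced by hand.
from twisted.internet import defer, task


def _looping_call_demo():
    pending = []

    def work():
        # Each call hands back a fresh Deferred that has not fired yet.
        d = defer.Deferred()
        pending.append(d)
        return d

    clock = task.Clock()
    loop = task.LoopingCall(work)
    loop.clock = clock
    loop.start(1, now=True)          # first call happens immediately
    clock.advance(5)                 # time passes, but work() is still pending
    calls_while_pending = len(pending)
    pending[0].callback(None)        # first call finishes, loop reschedules
    clock.advance(1)
    calls_after_firing = len(pending)
    loop.stop()
    return calls_while_pending, calls_after_firing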


@defer.inlineCallbacks
def read(queue_object):
    """Responsible for consuming the queue. Note that it ACKs at the end.
    """
    # queue_object is a ClosableDeferredQueue, so queue_object.get() returns a
    # Deferred that fires with the next message:
    # ch: the channel
    # method: the deliver method
    # properties: message properties
    # body: the message body
    # (these are the same values that are passed as parameters to the callback
    # function. See an example in the Pika Tutorial: http://migre.me/eBLB3 )
    ch, method, properties, body = yield queue_object.get()

    if body:
        print(body)

    yield ch.basic_ack(delivery_tag=method.delivery_tag)


# default connection parameters
parameters = pika.ConnectionParameters()

# ClientCreator is not a Deferred itself: it builds a
# TwistedProtocolConnection(parameters) for each connection it makes.
cc = protocol.ClientCreator(
    reactor,
    twisted_connection.TwistedProtocolConnection,
    parameters
)

# Take into account: d is a Deferred, hence, a promise of something. In this
# case, a promise of a connection: when you add a function as a callback of a
# Deferred, the Deferred calls it once it is ready, passing its promised value
# as the parameter. So, d.addCallback(function) will call
# function(connection), and connection will be the promised value of
# cc.connectTCP. The promised result will be of type
# twisted_connection.TwistedProtocolConnection, since it is the configured
# type.
d = cc.connectTCP('127.0.0.1', 5672)

# "protocol" is a TwistedProtocolConnection. It has a Deferred (ready) that is
# very important. Here goes the TwistedProtocolConnection docstring:
#
# > A hybrid between a Pika Connection and a Twisted Protocol. Allows using
# > Twisted's non-blocking connectTCP/connectSSL methods for connecting to the
# > server.
# > It has one caveat: TwistedProtocolConnection objects have a ready
# > instance variable that's a Deferred which fires when the connection is
# > ready to be used (the initial AMQP handshaking has been done). You *have*
# > to wait for this Deferred to fire before requesting a channel.
# > Since it's Twisted handling connection establishing it does not accept
# > connect callbacks, you have to implement that within Twisted. (...)
#
# That's why we first add a callback that returns
# TwistedProtocolConnection.ready (an attribute, not a method): when a callback
# returns a Deferred, the chain waits until that Deferred fires. Only then is
# run() called, as the next callback.
d.addCallback(lambda protocol: protocol.ready)

# then call run(connection)
d.addCallback(run)
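

# A minimal sketch of the chaining rule used above: a callback that returns a
# Deferred pauses the chain until that Deferred fires. The name _chaining_demo
# is hypothetical, and `ready` here is a plain Deferred standing in for
# TwistedProtocolConnection.ready.
from twisted.internet import defer


def _chaining_demo():
    calls = []
    ready = defer.Deferred()
    d = defer.Deferred()
    # First callback returns another Deferred, so the chain waits on it.
    d.addCallback(lambda _connected: ready)
    # Second callback plays the role of run().
    d.addCallback(calls.append)
    d.callback('connected')
    calls_before_ready = list(calls)
    # Firing `ready` lets the chain continue with its result.
    ready.callback('handshake done')
    return calls_before_ready, calls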

# let the carnage begin
reactor.run()