@amferraz
Created May 22, 2013 18:14
Comments on the example usage of TwistedProtocolConnection https://github.com/pika/pika/blob/master/docs/examples/twisted_example.rst
# -*- coding:utf-8 -*-
import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol, task
# First of all, what is a Twisted deferred?
# reference http://migre.me/eBL9F
#
# A twisted.internet.defer.Deferred is a promise that a function will at some
# point have a result. We can attach callback functions to a Deferred, and once
# it gets a result these callbacks will be called. In addition Deferreds allow
# the developer to register a callback for an error, with the default behavior
# of logging the error. The deferred mechanism standardizes the application
# programmer's interface with all sorts of blocking or delayed operations.
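# A minimal sketch of the description above, not taken from the pika example:
# demo_deferred_basics and its local names are made up for illustration.
# Firing a Deferred by hand runs its callbacks in order, and an exception
# raised in a callback is routed to the next errback.
from twisted.internet import defer


def demo_deferred_basics():
    seen = []

    def double(value):
        return value * 2

    def explode(value):
        seen.append(value)
        raise ValueError("boom")

    def recover(failure):
        # failure is a twisted.python.failure.Failure wrapping the exception
        failure.trap(ValueError)
        seen.append("recovered")

    example = defer.Deferred()
    example.addCallback(double)
    example.addCallback(explode)
    example.addErrback(recover)
    example.callback(21)  # fire the chain: double, then explode, then recover
    return seen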
# And what about inlineCallbacks?
# reference: http://migre.me/eBCdy
#
# "When the code inside a function decorated by the inlineCallbacks decorator
# yields a Deferred (in that case, a function that returns a Deferred), the
# code goes on and the reactor will come back after the Deferred fires. Its
# return value will be returned on the yield statement, and, if any errors
# occurred, the exception will be raised."
# Also, very important: "Your inlineCallbacks-enabled generator will return a
# Deferred object". (http://migre.me/eEBnw)
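# A small sketch of both quotes, using made-up names (demo_inline_callbacks,
# fetch_greeting). It assumes the yielded Deferreds have already fired, so no
# reactor is needed: the yield hands back the Deferred's result, a failed
# Deferred is re-raised at the yield, and calling the decorated generator
# returns a Deferred.
from twisted.internet import defer


def demo_inline_callbacks():
    log = []

    @defer.inlineCallbacks
    def fetch_greeting():
        value = yield defer.succeed("hello")
        log.append(value)
        try:
            yield defer.fail(RuntimeError("no route"))
        except RuntimeError as error:
            log.append(str(error))

    result = fetch_greeting()
    log.append(isinstance(result, defer.Deferred))
    return log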
# When we use TwistedProtocolConnection, channel() returns a Deferred. The
# underlying channel implementation is a TwistedChannel, and the following
# methods all return Deferreds: exchange_declare(), queue_declare(),
# queue_bind(), basic_qos() and basic_consume(). Below, each of these calls is
# made with yield, so run() pauses until the Deferred fires and the yield
# expression evaluates to its result.
@defer.inlineCallbacks
def run(connection):
    """Receives the connection as parameter and then consumes from the queue
    periodically.
    """
    channel = yield connection.channel()
    exchange = yield channel.exchange_declare(exchange='topic_link',
                                              type='topic')
    queue = yield channel.queue_declare(queue='hello', auto_delete=False,
                                        exclusive=False)
    yield channel.queue_bind(exchange='topic_link', queue='hello',
                             routing_key='hello.world')
    yield channel.basic_qos(prefetch_count=1)
    # The queue object is an instance of ClosableDeferredQueue, where data
    # received from the queue will be stored. Clients should use its get()
    # method to fetch individual messages.
    # reference: http://migre.me/eBEmp
    queue_object, consumer_tag = yield channel.basic_consume(queue='hello',
                                                             no_ack=False)
    # Call read(queue_object) repeatedly. However, "If f [the parameter
    # function] returns a deferred, rescheduling will not take place until the
    # deferred has fired. The result value is ignored."
    # And, remember, "Your inlineCallbacks-enabled generator will return a
    # Deferred object", so read() is not called again until the previous call
    # has finished.
    # reference: http://migre.me/eBCeF
    l = task.LoopingCall(read, queue_object)
    # Start calling read(queue_object) every `interval` seconds.
    l.start(interval=0.01)
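

# A sketch of the LoopingCall rescheduling rule quoted in run(), driven by a
# fake clock so it needs no running reactor. demo_looping_call and slow_job are
# hypothetical names; assigning a task.Clock() to the loop's clock attribute
# before start() is assumed here as the usual way to control time by hand.
from twisted.internet import defer, task


def demo_looping_call():
    clock = task.Clock()
    calls = []
    pending = []

    def slow_job():
        # Record the call and return a Deferred that has not fired yet.
        calls.append(clock.seconds())
        unfinished = defer.Deferred()
        pending.append(unfinished)
        return unfinished

    loop = task.LoopingCall(slow_job)
    loop.clock = clock
    loop.start(1.0)  # runs slow_job once right away
    clock.advance(5.0)  # time passes, but the first Deferred is still pending
    calls_while_pending = len(calls)
    pending.pop().callback(None)  # only now may the loop schedule the next run
    clock.advance(1.0)
    loop.stop()
    return calls_while_pending, len(calls)

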
@defer.inlineCallbacks
def read(queue_object):
    """Responsible for consuming the queue. See that it ACKs at the end.
    """
    # queue_object is a ClosableDeferredQueue, so queue_object.get() returns a
    # Deferred. It fires with a tuple of:
    # ch: the channel
    # method: the deliver method
    # properties: message properties
    # body: the message body
    # (these are the same values that are passed as parameters to the callback
    # function. See an example in the Pika Tutorial: http://migre.me/eBLB3 )
    ch, method, properties, body = yield queue_object.get()
    if body:
        print(body)
    yield ch.basic_ack(delivery_tag=method.delivery_tag)
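

# A sketch of what get() does in read(), using Twisted's plain DeferredQueue
# as a stand-in for pika's ClosableDeferredQueue (assumed here to behave the
# same way for get() and put()). demo_deferred_queue and the placeholder tuple
# are made up for illustration.
from twisted.internet import defer


def demo_deferred_queue():
    received = []
    queue = defer.DeferredQueue()
    waiting = queue.get()  # nothing queued yet, so this Deferred is pending
    waiting.addCallback(received.append)
    before = list(received)
    queue.put(("ch", "method", "properties", "body"))  # fires the pending get()
    return before, received

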
# default connection parameters
parameters = pika.ConnectionParameters()
# a ClientCreator that builds a TwistedProtocolConnection once connected; the
# Deferred itself comes from connectTCP() below
cc = protocol.ClientCreator(
    reactor,
    twisted_connection.TwistedProtocolConnection,
    parameters
)
# Take into account: d is a Deferred, hence, a promise of something. In this
# case, a promise of a connection. When you add a function as a callback to a
# Deferred, the Deferred calls it once it is ready, passing its promised value
# as the argument. So d.addCallback(function) will call function(connection),
# where connection is the promised value of cc.connectTCP(). That value will
# be of type twisted_connection.TwistedProtocolConnection, since it is the
# configured type.
d = cc.connectTCP('127.0.0.1', 5672)
# "protocol" is a TwistedProtocolConnection. It has a deferred (ready) that is
# very important. Here goes the TwistedProtocolConnection docstring:
#
# > A hybrid between a Pika Connection and a Twisted Protocol. Allows using
# > Twisted's non-blocking connectTCP/connectSSL methods for connecting to the
# > server.
# > It has one caveat: TwistedProtocolConnection objects have a ready
# > instance variable that's a Deferred which fires when the connection is
# > ready to be used (the initial AMQP handshaking has been done). You *have*
# > to wait for this Deferred to fire before requesting a channel.
# > Since it's Twisted handling connection establishing it does not accept
# > connect callbacks, you have to implement that within Twisted. (...)
#
# That's why we first add a callback that returns the connection's ready
# Deferred (an attribute, not a method), and only then add run() as a
# callback: the chain pauses until ready fires, and then run() is called.
d.addCallback(lambda protocol: protocol.ready)
# then call run(connection)
d.addCallback(run)
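# A stand-in for the two addCallback() calls above, with no broker involved.
# FakeConnection and demo_ready_chain are invented for this sketch and are not
# part of pika. It shows the Twisted rule the chain relies on: when a callback
# returns a Deferred, the next callback waits until that Deferred fires.
from twisted.internet import defer


class FakeConnection(object):
    def __init__(self):
        self.ready = defer.Deferred()


def demo_ready_chain():
    started = []
    connection = FakeConnection()
    connected = defer.Deferred()  # plays the role of cc.connectTCP()
    connected.addCallback(lambda protocol: protocol.ready)
    connected.addCallback(started.append)  # plays the role of run()
    connected.callback(connection)  # TCP is up, handshake still pending
    before = list(started)
    # Assumption: ready fires with the connection itself as its result.
    connection.ready.callback(connection)
    return before, started == [connection]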
# let the carnage begin
reactor.run()