Created
May 6, 2013 20:07
-
-
Save gagnec/5527757 to your computer and use it in GitHub Desktop.
Sample code for a blocking AMQP fanout exchange producer using Pika.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import pika | |
class IncrementProducer(object):
    """
    A simple RabbitMQ producer that repeatedly sends an incrementing number to
    the specified exchange. It blocks until interrupted by the user or
    the process is terminated.

    Setup is a chain of pika callbacks, each one issuing the next command:
    on_connected -> on_channel_open -> on_exchange_declareok ->
    on_queue_declareok -> on_bindok -> start_publishing.

    Attributes:
        host: host name handed to pika.ConnectionParameters.
        exchange: name of the exchange that is declared and published to.
        type: exchange type used when declaring the exchange ('fanout').
        queue: queue name used for queue_declare/queue_bind (empty string).
        number: the next number to publish; starts at 1.
        channel: None until on_channel_open stores the opened channel.
        parameters: the pika.ConnectionParameters built from host.
        connection: the pika.SelectConnection created in __init__.
    """

    def __init__(self, host, exchange):
        """
        Create a producer instance and a connection to RabbitMQ.

        host is passed to pika.ConnectionParameters; exchange is the name
        of the exchange to declare and publish to. on_connected is handed
        to the connection as its second argument so the setup chain starts
        from there.
        """
        self.host = host
        self.exchange = exchange
        self.queue = ''
        self.type = 'fanout'
        self.channel = None
        self.number = 1
        self.parameters = pika.ConnectionParameters(self.host)
        self.connection = pika.SelectConnection(self.parameters,
                                                self.on_connected)

    def on_connected(self, connection):
        """
        Called by pika when a connection is established.

        Requests a channel on the given connection; on_channel_open is
        passed as the callback.
        """
        connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        """
        Called by pika when the channel is opened.

        Stores the channel on the instance and declares self.exchange with
        type self.type.
        """
        self.channel = channel
        self.channel.exchange_declare(self.on_exchange_declareok,
                                      self.exchange,
                                      self.type)

    def on_exchange_declareok(self, frame):
        """
        Called by pika when RabbitMQ has finished the Exchange.Declare
        command.

        The frame argument is not used; the queue named self.queue is
        declared next.
        """
        self.channel.queue_declare(self.on_queue_declareok, self.queue)

    def on_queue_declareok(self, frame):
        """
        Called by pika when RabbitMQ has finished the Queue.Declare
        command.

        The frame argument is not used; self.queue is bound to
        self.exchange next.
        """
        self.channel.queue_bind(self.on_bindok, self.queue, self.exchange)

    def on_bindok(self, frame):
        """
        Called by pika when RabbitMQ has finished the Queue.Bind command.
        Now it's safe to start publishing messages.

        The frame argument is not used.
        """
        self.start_publishing()

    def start_publishing(self):
        """
        Start publishing messages.
        Note: This blocks the ioloop from processing any other events.

        Publishes str(self.number) to self.exchange with an empty routing
        key, then increments self.number, forever. The loop never returns
        normally; it only ends when an exception (for example
        KeyboardInterrupt) propagates out of it. Because the counter lives
        on the instance, self.number always holds the next number to send.
        """
        # The properties are the same for every message, so build them once
        # instead of on every iteration. delivery_mode=1 is the
        # non-persistent delivery mode in the AMQP specification.
        properties = pika.BasicProperties(content_type='text/plain',
                                          delivery_mode=1)
        while True:
            self.channel.basic_publish(self.exchange, '', str(self.number),
                                       properties)
            self.number += 1
if __name__ == "__main__":
    # Script entry point: publish to the 'fizzbuzz' exchange on localhost.
    producer = IncrementProducer('localhost', 'fizzbuzz')
    connection = producer.connection
    try:
        # Run the ioloop; this keeps going until the user interrupts it.
        connection.ioloop.start()
    except KeyboardInterrupt:
        # On Ctrl-C, request a close and then run the ioloop once more so
        # the close can be processed and the process can terminate.
        connection.close()
        connection.ioloop.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment