Skip to content

Instantly share code, notes, and snippets.

@gagnec
Created May 6, 2013 20:07
Show Gist options
  • Save gagnec/5527757 to your computer and use it in GitHub Desktop.
Save gagnec/5527757 to your computer and use it in GitHub Desktop.
Sample code for a blocking AMQP fanout exchange producer using Pika.
#!/usr/bin/env python
import pika
class IncrementProducer(object):
    """
    A simple RabbitMQ producer that repeatedly sends an incrementing number to
    the specified exchange. It blocks until interrupted by the user or
    the process is terminated.

    Setup is a chain of pika callbacks, each step triggering the next:
    connection open -> channel open -> exchange declare -> queue declare ->
    queue bind -> publish loop.

    All pika calls use keyword arguments because the positional order of
    these methods differs between pika releases (callback first in 0.9.x,
    callback as a trailing keyword in 1.x); the keyword names are shared.
    """

    def __init__(self, host, exchange):
        """
        Create a producer instance and a connection to RabbitMQ.

        host     -- hostname passed to pika.ConnectionParameters.
        exchange -- name of the exchange to declare and publish to.

        Nothing is sent until the caller starts ``self.connection.ioloop``.
        """
        self.host = host
        self.exchange = exchange
        # Set in on_channel_open once pika hands us the channel.
        self.channel = None
        # Empty name is passed to Queue.Declare; on_queue_declareok replaces
        # it with the name reported back in the declare-ok frame.
        self.queue = ''
        self.type = 'fanout'
        # Next number to publish; advanced by start_publishing.
        self.number = 1
        self.parameters = pika.ConnectionParameters(self.host)
        self.connection = pika.SelectConnection(
            parameters=self.parameters,
            on_open_callback=self.on_connected)

    def on_connected(self, connection):
        """
        Called by pika when a connection is established.
        Requests a channel on that connection.
        """
        connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel):
        """
        Called by pika when the channel is opened.
        Stores the channel and declares the exchange on it.
        """
        self.channel = channel
        self.channel.exchange_declare(
            callback=self.on_exchange_declareok,
            exchange=self.exchange,
            exchange_type=self.type)

    def on_exchange_declareok(self, frame):
        """
        Called by pika when RabbitMQ has finished the Exchange.Declare
        command. Declares the queue next; ``frame`` is not used.
        """
        self.channel.queue_declare(
            callback=self.on_queue_declareok,
            queue=self.queue)

    def on_queue_declareok(self, frame):
        """
        Called by pika when RabbitMQ has finished the Queue.Declare
        command. Binds the declared queue to the exchange.
        """
        # Remember the queue name carried by the declare-ok frame so the
        # bind (and anyone reading self.queue) refers to the real queue
        # instead of relying on the empty-name shorthand.
        declared = frame.method.queue
        if declared:
            self.queue = declared
        self.channel.queue_bind(
            callback=self.on_bindok,
            queue=self.queue,
            exchange=self.exchange)

    def on_bindok(self, frame):
        """
        Called by pika when RabbitMQ has finished the Queue.Bind command.
        Now it's safe to start publishing messages.
        """
        self.start_publishing()

    def start_publishing(self):
        """
        Start publishing messages.

        Publishes ``str(self.number)`` with an empty routing key, then
        increments ``self.number``, forever. Never returns; it ends only
        when an exception (e.g. KeyboardInterrupt) propagates out.

        Note: This blocks the ioloop from processing any other events.
        NOTE(review): because control never returns to the ioloop, whether
        queued frames are actually flushed to the socket depends on the
        pika version in use -- confirm before relying on this pattern.
        """
        # The properties never change, so build them once, not per message.
        properties = pika.BasicProperties(content_type='text/plain',
                                          delivery_mode=1)
        while True:
            self.channel.basic_publish(
                exchange=self.exchange,
                routing_key='',
                body=str(self.number),
                properties=properties)
            self.number += 1
if __name__ == "__main__":
    # Script entry point: build a producer for the 'fizzbuzz' exchange on
    # localhost and hand control to the connection's ioloop.
    producer = IncrementProducer(host='localhost', exchange='fizzbuzz')
    connection = producer.connection
    try:
        connection.ioloop.start()
    except KeyboardInterrupt:
        # Ctrl-C: ask for the connection to be closed, then run the ioloop
        # again so the close can be processed and the process can exit.
        connection.close()
        connection.ioloop.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment