Skip to content

Instantly share code, notes, and snippets.

@hagino3000
Created August 11, 2013 13:36
Show Gist options
  • Save hagino3000/6204941 to your computer and use it in GitHub Desktop.
Save hagino3000/6204941 to your computer and use it in GitHub Desktop.
pika test
import random
import pika
def get_connection():
mq_clusters = ['rabbit1', 'rabbit2']
random.shuffle(mq_clusters)
for mq in mq_clusters:
try:
params = pika.ConnectionParameters(mq)
connection = pika.BlockingConnection(params)
return connection
except Exception, e:
print("Try next node")
raise Exception("Cannot establish connection")
def send(con, msg):
channel = con.channel()
channel.queue_declare(queue='cluster_test', durable=True)
channel.basic_publish(
exchange='',
routing_key='cluster_test',
body=msg,
properties=pika.BasicProperties(
# Persistent
delivery_mode=2,
)
)
print " [x] sent %s" % msg
for i in xrange(1, 1000):
con = get_connection()
msg = 'Fron AP1 to %s: %i' % (con.params.host, i)
send(con, msg)
con.close()
import pika
queue_name = 'cluster_test'
params = pika.ConnectionParameters('rabbit1')
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback, queue=queue_name)
channel.start_consuming()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment