Skip to content

Instantly share code, notes, and snippets.

@rectalogic
Last active August 2, 2016 13:22
Show Gist options
  • Save rectalogic/3c451330f2712e6a2d51e664f0ec5016 to your computer and use it in GitHub Desktop.
Save rectalogic/3c451330f2712e6a2d51e664f0ec5016 to your computer and use it in GitHub Desktop.
RabbitMQ exchange federation test with two brokers.
# Configure two rabbitmq nodes and federate all exchanges between them.
# $ docker build -t rectalogic/rabbitmq:latest .
# $ docker network create -d bridge rabbit-net
# $ docker run -d --hostname rabbit1 --name rabbit1 --network=rabbit-net -p=56721:5672 rectalogic/rabbitmq:latest
# $ docker run -d --hostname rabbit2 --name rabbit2 --network=rabbit-net -p=56722:5672 rectalogic/rabbitmq:latest
# $ docker exec rabbit1 rabbitmqctl set_parameter federation-upstream upstream2 '{"uri":"amqp://rabbit2:5672","max-hops":1}'
# $ docker exec rabbit1 rabbitmqctl set_policy --apply-to exchanges federate-me '.*' '{"federation-upstream-set":"all"}'
# $ docker exec rabbit2 rabbitmqctl set_parameter federation-upstream upstream1 '{"uri":"amqp://rabbit1:5672","max-hops":1}'
# $ docker exec rabbit2 rabbitmqctl set_policy --apply-to exchanges federate-me '.*' '{"federation-upstream-set":"all"}'
# $ docker exec rabbit1 rabbitmqctl eval 'rabbit_federation_status:status().'
# https://hub.docker.com/_/rabbitmq/
# Base image: the official RabbitMQ 3.x image.
FROM rabbitmq:3
# MAINTAINER is a deprecated Dockerfile instruction; a LABEL carries the same
# author metadata.
LABEL maintainer="rectalogic"
# Fixed Erlang cookie baked into the image, so every container started from it
# uses the same value. NOTE(review): hard-coded secret -- acceptable for a
# throwaway federation test only; do not reuse for a real deployment.
ENV RABBITMQ_ERLANG_COOKIE="secret cookie"
# Enable the federation plugin at build time; --offline lets the command run
# without a live broker node.
RUN rabbitmq-plugins enable --offline rabbitmq_federation
import sys
import logging
import time
import signal
import itertools
import kombu
import kombu.pools
# Module-level logger; its level and output stream are configured in the
# __main__ block at the bottom of the script.
log = logging.getLogger(__name__)
# Broker URL used by both publish() and receive(). Ports 56721 and 56722 are
# the host ports that the `docker run` commands in the header map to the
# rabbit1 and rabbit2 containers respectively. Swap which line is active to
# point the script at the other broker.
# RABBITMQ = "amqp://localhost:56722"
RABBITMQ = "amqp://localhost:56721"
# Alternative listing both brokers in one URL string.
# NOTE(review): presumably relies on kombu treating ';' as a separator for
# failover URLs -- confirm before enabling.
#RABBITMQ = "amqp://localhost:56721;amqp://localhost:56722"
# Name of the queue that publish() writes to and receive() reads from.
MOBILE_QUEUE = "test_queue"
def publish(payload):
    """Put one payload on the durable ``MOBILE_QUEUE`` queue.

    A connection is borrowed from the kombu connection pool for the broker
    at ``RABBITMQ``, (re)established with a small number of retries, and
    used to open a ``SimpleQueue`` to which ``payload`` is written.

    Args:
        payload: The message body handed to ``SimpleQueue.put``.
    """
    def report_retry(exc, interval):
        # Passed as ``errback`` below: logs each failed connection attempt
        # together with the delay before the next one.
        log.error('Mobile push notify publish error (retry in %f): %r', interval, exc, exc_info=True)

    # The Connection object created here only serves as the pool lookup key;
    # the connection actually used is the one handed out by the pool.
    pool = kombu.pools.connections[kombu.Connection(RABBITMQ)]
    queue_options = {"durable": True}

    with pool.acquire(block=True) as conn:
        conn.ensure_connection(errback=report_retry, max_retries=2, interval_max=0)
        with conn.SimpleQueue(MOBILE_QUEUE, no_ack=False, queue_opts=queue_options) as outbox:
            outbox.put(payload)
            log.info(u"Published {0}".format(payload))
def receive():
    """Consume messages from ``MOBILE_QUEUE`` forever, acknowledging each one.

    Borrows a pooled connection to the broker at ``RABBITMQ``, opens the
    durable queue, and loops: fetch a message, log its payload, pause
    briefly, then ack it. Any exception raised while connected is logged and
    the whole connect/consume cycle is restarted after one second, so this
    function does not return normally.
    """
    def errback(exc, interval):
        # Logs each failed connection attempt and the delay before the next.
        log.error('Mobile push receive error (retry in %f): %r', interval, exc, exc_info=True)

    # Used only as the key identifying the broker's connection pool. It is
    # kept separate from the pooled connection below so the key is not
    # rebound on every retry.
    connection = kombu.Connection(RABBITMQ)
    while True:
        try:
            with kombu.pools.connections[connection].acquire(block=True) as pooled:
                pooled.ensure_connection(errback=errback, max_retries=2, interval_max=0)
                with pooled.SimpleQueue(MOBILE_QUEUE, no_ack=False, queue_opts={"durable": True}) as queue:
                    while True:
                        try:
                            # If we use timeout, we can handle signals
                            message = queue.get(block=True, timeout=86400)
                        except queue.Empty:
                            # Nothing arrived within the timeout; keep waiting.
                            # (log.warning replaces the deprecated log.warn.)
                            log.warning("Queue timeout, ignoring")
                            continue
                        log.info(u"Received {0}".format(message.payload))
                        # Brief pause before acknowledging.
                        # NOTE(review): presumably simulates processing time.
                        time.sleep(0.3)
                        # Explicit ack is required because the queue was
                        # opened with no_ack=False.
                        message.ack()
        except Exception:
            # Top-level retry boundary: log with traceback, back off briefly,
            # then reconnect from scratch.
            log.exception("Failure, retrying...")
            time.sleep(1)
if __name__ == "__main__":
    def signal_handler(signum, frame):
        """Print the number of the received signal and return."""
        # The parameter is named ``signum`` so it does not shadow the
        # ``signal`` module. The parenthesized single-argument print behaves
        # the same on Python 2 and Python 3.
        print("signal {0}".format(signum))

    # Replace the default SIGTERM action with the reporting handler above.
    signal.signal(signal.SIGTERM, signal_handler)
    logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)

    # A missing mode argument falls through to the usage message instead of
    # raising IndexError.
    mode = sys.argv[1] if len(sys.argv) > 1 else None

    if mode == "publish":
        log.info("Publishing...")
        # Publish numbered messages indefinitely, one every 0.6 seconds.
        for i in itertools.count():
            publish(u"Message-{0}".format(i))
            time.sleep(0.6)
    elif mode == "receive":
        log.info("Receiving...")
        receive()
    else:
        log.error(u"Usage: {0} publish|receive".format(sys.argv[0]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment