Last active
May 3, 2023 13:40
-
-
Save maheshgattani/664f78be9e99f6f93e74 to your computer and use it in GitHub Desktop.
RabbitMQ back off and retry
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import rabbitpy | |
import time | |
import random | |
# | |
# This script uses RabbitMQ's dead letter exchange (DLX) and TTL features to
# implement back off and retry logic for a queue consumer.
# | |
# This script re-publishes every message it receives to one of the waiting
# queues and then acks the original. The waiting queue is chosen by inspecting
# the message's "x-death" header to find out which retry this is.
# | |
# In this example the back offs are 1 second, then 3 seconds, then 10 seconds,
# and 10 seconds again for every retry after that. Changes to the global
# expirations list are picked up automatically: the script creates the
# matching waiting queues and bindings.
# | |
# Recommended reading: | |
# https://www.rabbitmq.com/dlx.html | |
# https://www.rabbitmq.com/ttl.html | |
# http://globaldev.co.uk/2014/07/back-off-and-retry-with-rabbitmq/ | |
# | |
# Back-off delays, indexed by the number of "x-death" entries already on the
# message; the last entry is reused once that number runs past the end of the
# list. Values are multiplied by 1000 before being used as a message TTL.
expirations = [1, 3, 10] # in seconds. used as backoffs
def publish_to_waiting_exchange_and_declare_queue(ch, message):
    """Re-publish ``message`` to a waiting queue so it is retried after a back off.

    The number of entries in the message's "x-death" header selects the next
    delay from the module-level ``expirations`` list (the last delay is reused
    once the list is exhausted). A durable waiting queue for that delay is
    declared with ``primary_exchange`` as its dead letter exchange, bound to
    ``waiting_exchange``, and a copy of the message is published there with its
    ``expiration`` property set to the delay in milliseconds.

    Args:
        ch: open rabbitpy channel used to declare the queue and publish.
        message: the consumed rabbitpy message; it is neither acked nor
            modified here.

    Raises:
        KeyError / IndexError: if an "x-death" entry has no "routing-keys".
    """
    # Work on a copy so the caller's message keeps its original properties.
    properties = dict(message.properties or {})
    headers = properties.get("headers") or {}
    deaths = headers.get("x-death")

    routing_key = message.routing_key
    if deaths:
        tries = len(deaths)
        print(tries)
        # Clamp to the last configured back off once retries run past the list.
        next_expiration = expirations[min(tries, len(expirations) - 1)]
        # The last x-death entry carries the routing key of the first wait,
        # which has the form "<backoff ms>_<original routing key>"; strip the
        # prefix to recover the original routing key.
        first_routing_key = deaths[-1]["routing-keys"][0]
        routing_key = first_routing_key.partition("_")[2]
    else:
        # No (or empty) x-death header: this is the first attempt.
        next_expiration = expirations[0]

    print("backing off")
    backoff = str(next_expiration * 1000)  # the expiration property is a string in ms
    print(backoff)

    # Waiting queue whose expired messages dead-letter to the primary exchange.
    print("waiting queue declared")
    queue = rabbitpy.Queue(
        ch,
        "primary_queue_waiting_queue_" + backoff,
        durable=True,
        dead_letter_exchange="primary_exchange",
    )
    queue.declare()

    # The back off is part of the routing key so that each waiting queue only
    # receives the messages meant for its delay.
    print("waiting queue bound to waiting exchange")
    new_routing_key = backoff + "_" + routing_key
    print(new_routing_key)
    queue.bind("waiting_exchange", new_routing_key)

    # Publish the copy with the per-message TTL that makes it wait.
    print("message published")
    properties["expiration"] = backoff
    new_message = rabbitpy.Message(ch, message.body, properties)
    new_message.publish("waiting_exchange", new_routing_key)
    # NOTE: if all waiting queues were bound with the same routing key, every
    # one of them would receive the message; in that setup the binding would
    # have to be removed again after publishing with
    # queue.unbind("waiting_exchange", new_routing_key).
# Consumer entry point: declare the exchanges and the primary queue, then feed
# every consumed message through the back-off logic until interrupted.
with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2fbirchbox-event-bus') as conn:
    # Use the channel as a context manager
    with conn.channel() as channel:
        waiting_exchange = rabbitpy.Exchange(channel, 'waiting_exchange', durable=True, exchange_type="topic")
        waiting_exchange.declare()
        primary_exchange = rabbitpy.Exchange(channel, 'primary_exchange', durable=True, exchange_type="topic")
        primary_exchange.declare()

        # Create the queue
        queue = rabbitpy.Queue(channel, 'primary_queue', durable=True)
        queue.declare()

        # Bind the queue for fresh messages and for every "<backoff ms>_"
        # prefixed key, which is the routing key messages carry when they are
        # published to the waiting exchange and later dead-lettered back here.
        queue.bind("primary_exchange", "routing_key")
        for expiration in expirations:
            queue.bind("primary_exchange", str(expiration * 1000) + "_routing_key")

        try:
            # Consume the messages
            for message in queue.consume():
                print(message)
                publish_to_waiting_exchange_and_declare_queue(channel, message)
                # Ack only after the copy has been published to a waiting queue.
                message.ack()
        except KeyboardInterrupt:
            print('Exited consumer')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@maheshgattani