Skip to content

Instantly share code, notes, and snippets.

@biniama
Forked from quiver/new_task.py
Created September 13, 2022 16:04
Show Gist options
  • Save biniama/3ec926f81a506ddac5ff1fef9167ff6d to your computer and use it in GitHub Desktop.
Save biniama/3ec926f81a506ddac5ff1fef9167ff6d to your computer and use it in GitHub Desktop.
RabbitMQ: dead-letter exchange example with Python/pika
#!/usr/bin/env python
# http://www.rabbitmq.com/tutorials/tutorial-two-python.html
import pika
import sys
# Publisher: send a single task message to 'task_queue' through the default
# exchange, then disconnect.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
try:
    channel = connection.channel()

    # The message text is taken from the command line; with no arguments a
    # fixed greeting is sent instead.
    message = ' '.join(sys.argv[1:]) or "Hello World!"

    # The default exchange ('') routes on the queue name.
    # NOTE(review): 'task_queue' is not declared by this script; presumably
    # the worker must have created it first (with its TTL / dead-letter
    # arguments), otherwise the broker has nowhere to route the message --
    # confirm the intended start-up order.
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message)
    # Parenthesised single-argument print works on both Python 2 and 3.
    print(" [x] Sent %r" % (message,))
finally:
    # Always release the connection, even when publishing fails.
    if connection.is_open:
        connection.close()
#!/usr/bin/env python
# http://www.rabbitmq.com/tutorials/tutorial-two-python.html
import pika
# Dead-letter consumer: declares the 'dlx' exchange and the 'dl' queue, binds
# them together and prints every dead-lettered message together with the
# reason it was dead-lettered.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 'exchange_type' is the keyword accepted by pika >= 1.0 (the old 'type'
# keyword was removed).
channel.exchange_declare(exchange='dlx', exchange_type='direct')

result = channel.queue_declare(queue='dl')
queue_name = result.method.queue

# Dead-lettered messages keep their original routing key unless the source
# queue sets x-dead-letter-routing-key, so bind on 'task_queue'.
channel.queue_bind(exchange='dlx',
                   routing_key='task_queue',  # x-dead-letter-routing-key
                   queue=queue_name)

print(' [*] Waiting for dead-letters. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print a dead-lettered message and acknowledge it.

    ch         -- channel the message was delivered on; used to send the ack.
    method     -- delivery metadata; only ``delivery_tag`` is read.
    properties -- message properties; the ``x-death`` header (if present)
                  supplies the dead-letter reason.
    body       -- raw message payload, printed with ``%r``.
    """
    print(" [x] %r" % (properties,))
    # A message published straight to the 'dl' queue has no 'x-death' header
    # (headers may even be None); report 'unknown' rather than crashing the
    # consumer.
    deaths = (properties.headers or {}).get('x-death') or [{}]
    reason = deaths[0].get('reason', 'unknown')
    print(" [reason] : %s : %r" % (reason, body))
    ch.basic_ack(delivery_tag=method.delivery_tag)


# pika >= 1.0 signature: queue first, callback passed as on_message_callback.
channel.basic_consume(queue=queue_name, on_message_callback=callback)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the advertised way to exit: stop cleanly, without a traceback.
    channel.stop_consuming()
finally:
    if connection.is_open:
        connection.close()
#!/usr/bin/env python
# http://www.rabbitmq.com/tutorials/tutorial-two-python.html
import pika
import time
import random
# Worker: consumes 'task_queue' and randomly acknowledges or rejects each
# message. Rejected (requeue=False) and expired messages are dead-lettered to
# the 'dlx' exchange by the broker.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(
    queue='task_queue',
    arguments={
        # Messages waiting in the queue for more than 1000 ms expire.
        'x-message-ttl': 1000,
        "x-dead-letter-exchange": "dlx",
        # No "x-dead-letter-routing-key" is set, so a dead-lettered message
        # keeps its original routing key ('task_queue' when sent via the
        # default exchange).
    },
)

print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Handle one task: ack it (about half the time) or reject it.

    ch         -- channel the message was delivered on; used to ack/reject.
    method     -- delivery metadata; only ``delivery_tag`` is read.
    properties -- message properties (unused).
    body       -- raw message payload, printed with ``%r``.
    """
    print(" [x] Received %r" % (body,))
    if random.random() < 0.5:
        ch.basic_ack(delivery_tag=method.delivery_tag)
        # Simulated slow task; the message has already been acknowledged.
        time.sleep(5)
        print(" [x] Done")
    else:
        # requeue=False makes the broker dead-letter the message instead of
        # putting it back on 'task_queue'.
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        print(" [x] Rejected")


# Hand this worker at most one unacknowledged message at a time.
channel.basic_qos(prefetch_count=1)
# pika >= 1.0 signature: queue first, callback passed as on_message_callback.
channel.basic_consume(queue='task_queue', on_message_callback=callback)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the advertised way to exit: stop cleanly, without a traceback.
    channel.stop_consuming()
finally:
    if connection.is_open:
        connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment