Last active
February 9, 2021 18:23
-
-
Save javierarilos/9348168 to your computer and use it in GitHub Desktop.
RabbitMq messaging concepts using Python + pika
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pika import BlockingConnection, ConnectionParameters, BasicProperties | |
########################################################################## | |
# RABBITMQ & AMQP INTRO - MESSAGING PATTERNS | |
# http://www.slideshare.net/javierarilos/rabbitmq-intromsgingpatterns | |
# | |
# ******** Escenario 1: | |
# * | |
# Using pika we are going to create a exchange named 'important', bind it to a queue named 'important-jobs'. | |
# Finally we will produce (to exchange) and consume (from queue) a message. | |
# escenario 1 - step 1 - connect & channel setup | |
conn = BlockingConnection(ConnectionParameters('localhost')) | |
ch = conn.channel() | |
# escenario 1 - step 2 - declare exchange | |
ch.exchange_declare(exchange='important', type='direct') | |
# escenario 1 - step 3 - declare the queue | |
ch.queue_declare(queue='important-jobs') | |
# escenario 1 - step 4 - bind queue and exchange | |
ch.queue_bind(exchange='important', queue='important-jobs', routing_key='important') | |
# escenario 1 - step 5 - produce the message | |
ch.basic_publish(exchange='important', routing_key='important', body='new important task') | |
# escenario 1 - step 6 - consume the message | |
method_frame, header_frame, body = ch.basic_get('important-jobs') | |
print "msg received from queue 'important-jobs' : ", body | |
# escenario 1 - step 6 - acknowledge the message | |
ch.basic_ack(method_frame.delivery_tag) | |
# ******** Escenario 2: | |
# * | |
# All important messages have to be sent also to a new queue named traces. | |
# We will produce an important message and consume it from both queues: important-jobs and traces | |
# escenario 2 - step 1 - create and bind new traces queue, send an important message | |
ch.queue_declare(queue='traces') | |
ch.queue_bind(exchange='important', queue='traces', routing_key='important') | |
ch.basic_publish(exchange='important', routing_key='important', body='[another task to be handled, important]') | |
# escenario 2 - step 2 - consume message from both queues | |
method_frame, header_frame, important_job = ch.basic_get('important-jobs') | |
print "msg received from queue 'important-jobs' : ", important_job | |
ch.basic_ack(method_frame.delivery_tag) | |
method_frame, header_frame, trace = ch.basic_get('traces') | |
print "msg received from queue 'traces' : ", trace | |
ch.basic_ack(method_frame.delivery_tag) | |
# ******** Escenario 3: | |
# * | |
# customer messages to important exchange must be routed to | |
# different queues depending on the operation to perform (signup, update) and also to traces | |
# escenario 3 - step 1 - bind 'traces' to exchange 'important' and 'customer' routing key, create customer queues | |
ch.queue_bind(exchange='important', queue='traces', routing_key='customer') | |
ch.queue_declare(queue='signup') | |
ch.queue_declare(queue='update') | |
# escenario 3 - step 2 - create new exchange 'customer' of type headers, bind it to 'important' on routing key 'customer' | |
ch.exchange_declare(exchange='customer', type='headers') | |
ch.exchange_bind(source='important', destination='customer', routing_key='customer') | |
ch.queue_bind(exchange='customer', queue='signup', routing_key='', arguments={'operation': 'signup', 'x-match':'any'}) | |
ch.queue_bind(exchange='customer', queue='update', routing_key='', arguments={'operation': 'update', 'x-match':'any'}) | |
# escenario 3 - step 3 - sending a customer signup message (with headers), consume it (from 'signup' and 'traces') | |
ch.basic_publish(exchange='important', routing_key='customer', body='this is our new customer num=25', | |
properties=BasicProperties(headers={'operation': 'signup'})) | |
method_frame, header_frame, msg = ch.basic_get('signup') | |
print "msg received from queue 'signup' : ", msg | |
ch.basic_ack(method_frame.delivery_tag) | |
method_frame, header_frame, trace = ch.basic_get('traces') | |
print "msg received from queue 'traces' : ", trace | |
ch.basic_ack(method_frame.delivery_tag) | |
# ******** Escenario 4: | |
# * | |
# RabbitMQ deadleter exchanges. | |
# Messages in important-jobs queue that cannot be handled will be rejected by client and sent to rejected-jobs exchange. | |
# escenario 4 - step 1 - create rejected-jobs exchange and queue | |
ch.exchange_declare(exchange='rejected-jobs', type='direct') | |
ch.queue_declare(queue='rejected-jobs') | |
ch.queue_bind(exchange='rejected-jobs', queue='rejected-jobs', routing_key='important') | |
# escenario 4 - step 2 - redeclare important-jobs queue to deadletter messages, redeclare bindings | |
ch.queue_delete('important-jobs') | |
ch.queue_declare(queue='important-jobs', arguments={'x-dead-letter-exchange': 'rejected-jobs'}) | |
ch.queue_bind(exchange='important', queue='important-jobs', routing_key='important') | |
# escenario 4 - step 3 - publish to important-jobs, consumer rejects the message | |
ch.basic_publish(exchange='important', routing_key='important', body='[unparseable message]') | |
method_frame, header_frame, important_job = ch.basic_get('important-jobs') | |
print "UNPARSEABLE msg received from queue 'important-jobs' : ", important_job, " >> rejecting msg" | |
ch.basic_reject(method_frame.delivery_tag, requeue=False) | |
# escenario 4 - step 4 - message was routed to rejected-jobs | |
method_frame, header_frame, rejected_job = ch.basic_get('rejected-jobs') | |
print "i know what to do with unparseable messages: received in 'rejected-jobs' : ", rejected_job | |
ch.basic_ack(method_frame.delivery_tag) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment