Popular Pub/Sub message queue with clustering.
Written in Erlang, invented by Pivotal.
- Clustering - all nodes can be queried for all queues
- each queue lives on a single master node + mirrors
- oldest mirror is promoted on master failure
- single master guarantees FIFO ordering in queue
- exchanges + bindings exist on all nodes
- cluster formation:
- static:
- config file
- rabbitmqctl commands
- dynamic:
- DNS
- via plugins:
- AWS EC2
- Kubernetes
- Consul
- Etcd
- HA - replication - mirrored queues - requires nodes to be in a cluster
- Scaling:
- Sharding
- built on hashing exchange (built-in plugin)
- plugins:
rabbitmq-autocluster
rabbitmq-clusterer
- Sharding
- Performance limitation - buffering in RAM backlogs
vm_memory_high_watermark
- resync takes ages
- AMQP 0.9 (ActiveMQ implements AMQP 1.0)
- Kafka has much better performance and durability
Port | Description |
---|---|
5672 | RabbitMQ AMQP |
15672 | Management Plugin UI / Rest API (3.0+) |
55672 | Management Plugin UI / Rest API (2.x) |
Dumb broadcast to all queues, ignores routing_key
.
Sends to queues with routing_key
binding using exact match to message routing_key
as sent by publisher.
- multiple bindings for same queue, each with different
routing_key
- allows same queue to collect selection of messages from exchange
- multiple queues bound to same
routing_key
- delivers same message to all queues bound with that
routing_key
- WARNING: messages with a
routing_key
with no queues bound for thatrouting_key
will be discarded!
Flexible complex routing, sends to queues with matching routing_key
bindings.
- matching logic:
*
represents 1 word#
represents zero or more words
routing_key
- words separated by dots, 255 bytes
Always use Topic exchange - it's the most flexible
- can do Danout tyle with just
#
routing_keys - can do Direct simple behaviour
- can do multi-part matches
Bind exchanges to other exchanges with routing keys just like queues!
See Pika RabbitMQ Python API:
rabbitmqctl status
rabbitmqctl cluster_status
rabbitmqctl list_exchanges
rabbitmqctl list_queues
rabbitmqctl list_bindings
Check for unacknowledged messages (catches client that forgets to send ack in consumer callback function):
rabbitmqctl list_queues name messages_ready messages_unacknowledged
http://www.rabbitmq.com/management-cli.html
Enable web UI / REST API - port 55672 or 15672 (3.0+)
guest
/ guest
username password combination only works on localhost
RABBITMQ_DEFAULT_USER
/ RABBITMQ_DEFAULT_PASS
environment variables are supported by official RabbitMQ images and
probably available in environment:
RABBITMQ_USER="${RABBITMQ_USER:-${RABBITMQ_DEFAULT_USER:-rabbituser}}"
RABBITMQ_PASSWORD="${RABBITMQ_PASSWORD:-${RABBITMQ_DEFAULT_PASS:-rabbitpw}}"
rabbitmqctl add_user "$RABBITMQ_USER" "$RABBITMQ_PASSWORD"
rabbitmqctl set_user_tags "$RABBITMQ_USER" administrator
/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
service rabbitmq restart
or on newer Linux systems:
systemctl restart rabbitmq
Python CLI to the HTTP Rest Managment API:
rabbitmqadmin --help
requires Management plugin to be enabled above
Download rabbitmqadmin
from http://$HOST:15672/cli/
Bash Completion:
sudo sh -c 'rabbitmqadmin --bash-completion > /etc/bash_completion.d/rabbitmqadmin'
rabbitmqadmin -H "$HOST" -u "rabbituser" -p "rabbitpw" list vhosts
with specific cols:
rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate
One per line:
rabbitmqadmin -f long -d 3 list queues
Declare exchange:
rabbitmqadmin declare exchange name=my-new-exchange type=fanout
Declare queue:
rabbitmqadmin declare queue name=my-new-queue durable=false
Publish test message:
rabbitmqadmin publish exchange=amq.default routing_key="test" payload="hello, world"
Get test message:
rabbitmqadmin get queue="test" requeue=false
Close all connections:
rabbitmqadmin -f tsv -q list connections name |
while read conn; do
rabbitmqadmin -q close connection name="${conn}"
done
rabbitmqadmin export rabbit.config
rabbitmqadmin import rabbit.config
Use the Management UI enabled above for interactive monitoring.
See HariSekhon/Nagios Plugins for a selection of Nagios plugins for RabbitMQ.
Official AMQP client:
import pika
credentials = pika.PlainCredentials(user, password)
parameters = pika.ConnectionParameters('192.168.99.100', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
Generate a random queue name just for our process by omitting the name, set exclusive so it's deleted after disconnect.
Needed to add subscriber to Fanout pub-sub or Direct Exchange.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue # amq.gen-XXXX....
Bind personal queue to fanout exchange logs
to get all the messages from it:
channel.queue_bind(exchange='logs', queue = result.method.queue)
channel.exchange_declare(exchange='direct_logs', type='direct')
channel.queue_bind(exchange='myExchange', queue='myQueue', routing_key='blah')
Idempotent queue creation (re-runnable, as long as queue params aren't different)
Must define queue in client before using in case producer hasn't run to create queue yet
WARNING: Queues are non-durable by default and disappear on restart!
result = channel.queue_declare(queue='test', durable=True)
assert result.method.queue == 'test'
def my_callback(ch, method, properties, body):
# do stuff
channel.basic_ack(delivery_msg = method.delivery_msg)
Don't give more than one message to consumer in case some get more heavy messages and others get light messages, this way the next free worker gets the next message.
channel.basic_qos(prefetch_count=1)
Configure consumer to call my_callback
function for each message
requires ack by default:
channel.basic_consume(my_callback, queue='test') #, no_ack=True)
Start consuming to loop reading messages from queue indefinitely, calls my_callback
method for each message
requires ack by default:
channel.start_consuming()
This is only the number of messages that consumer can fetch in BlockChannel without blocking, not check worthy - not sure why this is really here:
channel.get_waiting_message_count()
WARNING: Sending msg to non-existent queue will just trash the message!
WARNING: Queues are non-durable by default and disappear on restart!
channel.queue_declare(queue='test', durable=True)
Nameless exchange sends msg to queue with same name as routing_key:
channel.basic_publish(exchange='',
routing_key='test',
body='test message',
properties=pika.BasicProperties(
delivery_mode = 2 # persistent
)
)
# flush
# TODO: is there a way to flush without closing connection?
connection.close()
From the HariSekhon/Diagrams-as-Code repo:
Ported from private Knowledge Base pages 2013+