Last active
August 31, 2024 02:04
-
-
Save materkel/f46fdf266f35d8c525aea16719f837ac to your computer and use it in GitHub Desktop.
Scheduling messages with RabbitMQ, using the rabbitmq_delayed_message_exchange plugin and amqplib in NodeJS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Install and enable the rabbitmq_delayed_message_exchange plugin as described by Alvaro Videla in this blogpost:
 * https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
 */
const amqp = require('amqplib');

const exchange = 'yourExchangeName';
const queue = 'yourQueueName';
const queueBinding = 'yourQueueBindingName';

// Options shared by every declaration of the exchange. The real exchange type
// is passed in the arguments as "x-delayed-type"; consumer and publisher must
// declare the exchange identically.
const exchangeOptions = { durable: true, arguments: { 'x-delayed-type': 'direct' } };

// Message consumer
amqp
  .connect('amqp://localhost')
  .then(conn => conn.createChannel())
  .then(ch =>
    // Every setup step is chained so a failure in any of them reaches the
    // .catch() below instead of becoming an unhandled rejection.
    ch
      // Assert a x-delayed-message Exchange.
      .assertExchange(exchange, 'x-delayed-message', exchangeOptions)
      .then(() => ch.assertQueue(queue, { durable: true }))
      .then(() => ch.bindQueue(queue, exchange, queueBinding))
      .then(() =>
        ch.consume(queue, (msg) => {
          // Handle delayed message
          // ...
        }, { noAck: true })
      )
  )
  .catch((err) => {
    console.error(err);
  });

// Publish message
amqp
  .connect('amqp://localhost')
  .then(conn => conn.createChannel())
  .then(ch =>
    // Declare the exchange here as well: this connection is independent of the
    // consumer's, so the exchange may not exist yet when we publish.
    ch.assertExchange(exchange, 'x-delayed-message', exchangeOptions).then(() => {
      // Publish message with a delay of 500 ms
      const headers = { 'x-delay': 500 };
      // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
      ch.publish(exchange, queueBinding, Buffer.from('hello world'), { headers });
    })
  )
  .catch((err) => {
    console.error(err);
  });
Is there a way with RabbitMQ not only to delay messages, but also to schedule them, e.g. once a day?
Nice example.
Thanks for that!
Thank you for the example. However, I got the following error trying to run your code:
Error: Connection closed: 503 (COMMAND-INVALID) with message "COMMAND_INVALID - unknown exchange type 'x-delayed-message'"
at Object.accept (/opt/app/node_modules/amqplib/lib/connection.js:90:15)
at Connection.mainAccept [as accept] (/opt/app/node_modules/amqplib/lib/connection.js:63:33)
at Socket.go (/opt/app/node_modules/amqplib/lib/connection.js:476:48)
at emitNone (events.js:86:13)
at Socket.emit (events.js:185:7)
at emitReadable_ (_stream_readable.js:432:10)
at emitReadable (_stream_readable.js:426:7)
at readableAddChunk (_stream_readable.js:187:13)
at Socket.Readable.push (_stream_readable.js:134:10)
at TCP.onread (net.js:551:20)
Callback API version of the code:
// Names of the delayed exchange, the queue, and the key used both to bind the
// queue to the exchange and to route published messages.
const exchange = 'yourExchangeName';
const queue = 'yourQueueName';
const queueBinding = 'yourQueueBindingName';

const amqpCallbackApi = require('amqplib/callback_api');

// Open one connection and hand it to both the consumer and the publisher.
// A failed connection attempt is passed to bail().
amqpCallbackApi.connect('amqp://localhost', (err, conn) => {
  if (err != null) {
    bail(err);
  }
  consumer(conn);
  publisher(conn);
});
/**
 * Report a fatal error and stop the process.
 *
 * Writes `err` with console.error, then calls process.exit with status 1.
 *
 * @param {*} err - The error (or any value) to report before exiting.
 */
function bail(err) {
  const failureExitCode = 1;

  console.error(err);
  process.exit(failureExitCode);
}
// Message consumer
/**
 * Open a channel on `conn`, declare the delayed exchange, the queue and their
 * binding, then log and acknowledge every message delivered to the queue.
 *
 * The setup steps run one after another; each continues only when the previous
 * one reported success, and any reported error is handed to bail().
 *
 * @param {object} conn - Connection exposing `createChannel(callback)`.
 */
function consumer(conn) {
  // Wrap a continuation so a failed setup step goes to bail() and only a
  // successful one moves on to the next step.
  const orBail = (next) => (err) => {
    if (err != null) {
      bail(err);
      return;
    }
    next();
  };

  conn.createChannel((err, ch) => {
    if (err != null) {
      bail(err);
      return;
    }

    // The actual exchange type is given in the arguments as "x-delayed-type".
    const exchangeOptions = { durable: true, arguments: { 'x-delayed-type': 'direct' } };

    ch.assertExchange(exchange, 'x-delayed-message', exchangeOptions, orBail(() => {
      ch.assertQueue(queue, { durable: true }, orBail(() => {
        // bindQueue takes its arguments table before the callback.
        ch.bindQueue(queue, exchange, queueBinding, {}, orBail(() => {
          ch.consume(queue, (msg) => {
            // Skip null deliveries; there is nothing to log or acknowledge.
            if (msg !== null) {
              console.log(msg.content.toString());
              ch.ack(msg);
            }
          });
        }));
      }));
    }));
  });
}
// Publish message
/**
 * Open a channel on `conn`, make sure the delayed exchange exists, then
 * publish one message carrying a 10000 ms `x-delay` header.
 *
 * Any error reported while opening the channel or declaring the exchange is
 * handed to bail().
 *
 * @param {object} conn - Connection exposing `createChannel(callback)`.
 */
function publisher(conn) {
  conn.createChannel((err, ch) => {
    if (err != null) {
      bail(err);
      return;
    }

    // Declare the exchange on this channel too: the consumer sets it up on a
    // different channel, so it may not exist yet when we publish.
    const exchangeOptions = { durable: true, arguments: { 'x-delayed-type': 'direct' } };

    ch.assertExchange(exchange, 'x-delayed-message', exchangeOptions, (assertErr) => {
      if (assertErr != null) {
        bail(assertErr);
        return;
      }

      const headers = { 'x-delay': 10000 };
      // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
      ch.publish(exchange, queueBinding, Buffer.from('hello 10sn from past'), { headers });
    });
  });
}
Is there a way with RabbitMQ not only to delay messages, but also to schedule them, e.g. once a day?
This could easily be done with cron jobs, either with crontab or with modules like https://github.com/kelektiv/node-cron
Thanks for this
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
awesome.