Last active
August 31, 2024 02:04
-
-
Save materkel/f46fdf266f35d8c525aea16719f837ac to your computer and use it in GitHub Desktop.
Scheduling messages with RabbitMQ, using the rabbitmq_delayed_message_exchange plugin and amqplib in NodeJS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Install and enable the rabbitmq_delayed_message_exchange plugin as described by Alvaro Videla in this blogpost: | |
* https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/ | |
*/ | |
const amqp = require('amqplib'); | |
const exchange = 'yourExchangeName'; | |
const queue = 'yourQueueName'; | |
const queueBinding = 'yourQueueBindingName'; | |
// Message consumer | |
amqp | |
.connect('amqp://localhost') | |
.then(conn => conn.createChannel()) | |
.then(ch => { | |
// Assert a x-delayed-message Exchange. The type of the exchange is specified in the arguments as "x-delayed-type" | |
ch.assertExchange(exchange, 'x-delayed-message', { durable: true, arguments: { 'x-delayed-type': 'direct' } }); | |
return ch | |
.assertQueue(queue, { durable: true }) | |
.then(() => ch.bindQueue(queue, exchange, queueBinding)) | |
.then(() => { | |
ch.consume(queue, (msg) => { | |
// Handle delayed message | |
// ... | |
}, { noAck: true }); | |
}); | |
}); | |
// Publish message | |
amqp | |
.connect('amqp://localhost') | |
.then(conn => conn.createChannel()) | |
.then(ch => { | |
// Publish message with a delay of 500 ms | |
const headers = { 'x-delay': 500 }; | |
ch.publish(exchange, queueBinding, new Buffer('hello world'), { headers }); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for this