-
-
Save thangtv611/9007362d93797ff2ba598f2176682d6e to your computer and use it in GitHub Desktop.
Scheduling messages with RabbitMQ, using the rabbitmq_delayed_message_exchange plugin and amqplib in NodeJS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Install and enable the rabbitmq_delayed_message_exchange plugin as described by Alvaro Videla in this blogpost: | |
* https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/ | |
*/ | |
const amqp = require('amqplib'); | |
const exchange = 'yourExchangeName'; | |
const queue = 'yourQueueName'; | |
const queueBinding = 'yourQueueBindingName'; | |
// Message consumer | |
amqp | |
.connect('amqp://localhost') | |
.then(conn => conn.createChannel()) | |
.then(ch => { | |
// Assert a x-delayed-message Exchange. The type of the exchange is specified in the arguments as "x-delayed-type" | |
ch.assertExchange(exchange, 'x-delayed-message', { durable: true, arguments: { 'x-delayed-type': 'direct' } }); | |
return ch | |
.assertQueue(queue, { durable: true }) | |
.then(() => ch.bindQueue(queue, exchange, queueBinding)) | |
.then(() => { | |
ch.consume(queue, (msg) => { | |
// Handle delayed message | |
// ... | |
}, { noAck: true }); | |
}); | |
}); | |
// Publish message | |
amqp | |
.connect('amqp://localhost') | |
.then(conn => conn.createChannel()) | |
.then(ch => { | |
// Publish message with a delay of 500 ms | |
const headers = { 'x-delay': 500 }; | |
ch.publish(exchange, queueBinding, new Buffer('hello world'), { headers }); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment