Created
November 12, 2021 22:11
-
-
Save tiagobnobrega/82f098c232d47424d3f1b6e643ef6e27 to your computer and use it in GitHub Desktop.
Rabbit MQ consumer with Retry
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const amqp = require('amqp-connection-manager'); | |
/** | |
* * Creates an instance of a rabbitMQ manager | |
* @param rabbitUrl - rabbit URL connection | |
* @param systemName - Optional prefix used for shared topology elements (wait queues and retry dispatcher dlx) | |
* @param onConnect - Optional connection callback | |
* @param onDisconnect - Optional disconnection callback | |
* @returns {{sendMessage: (function(*=, *=): Promise<boolean>), consumeWithRetry: ((function(*=, *, *=): Promise<void>)|*)}} | |
*/ | |
function rabbitManager(rabbitUrl,{systemName='', onConnect, onDisconnect}){ | |
//Connect to rabbitMQ | |
const connection = amqp.connect([rabbitUrl]); | |
connection.on('connect', onConnect || (() => console.log(`Connected to ${rabbitUrl}`))); | |
connection.on('disconnect', onDisconnect || (err => console.log('Disconnected from rabbitMQ.', err.stack))); | |
//create a single channel for communication | |
const channel = connection.createChannel({ | |
json: true, | |
}); | |
/** | |
* Build the error queue name for unprocessable messages after retry | |
* @param queueName - Name of the main queue | |
* @returns {`${string}_error`} - Error queue name | |
*/ | |
const getQueueErrorQueueName = (queueName) => `${queueName}_error`; | |
/** | |
* Returns a setup function for queueName to assert the topology | |
* Calling this with different waitIntervalSec will crete different wait queues | |
* @param queueName - Name of the main queue | |
* @param waitIntervalSec - Time in seconds the message should wait to retry | |
* @returns {(function(*): Promise<void>)|*} Setup function for queueName to assert the topology | |
*/ | |
const setupForQueue = (queueName,waitIntervalSec)=>{ | |
return async (ch)=>{ | |
const queueRetryDLXName = `${queueName}_retry-dlx`; | |
const systemWaitQueueName = `${systemName}-wait-queue-${waitIntervalSec}`; | |
const retryDispatcherDLXName = `${systemName}_retry-dispatcher-dlx`; | |
//* routing key of the retry DLX | |
const queueRetryDLX_DLRoutingKey = `rt-${waitIntervalSec}.${queueName}`; | |
//*assert topology | |
await Promise.all([ | |
//* asset queue retry-dlx | |
ch.assertExchange(queueRetryDLXName,"topic", | |
{ | |
arguments:{ | |
'x-dead-letter-routing-key': queueRetryDLX_DLRoutingKey, | |
}, | |
// deadLetterRoutingKey:queueRetryDLX_DLRoutingKey, | |
durable: true, | |
noAck: true | |
}), | |
//* assert system-retry-dlx | |
ch.assertExchange(retryDispatcherDLXName,"topic", | |
{ | |
durable: true, | |
noAck: true | |
}), | |
//* assert main queue | |
ch.assertQueue(queueName, | |
{ | |
durable: true , | |
deadLetterExchange: queueRetryDLXName, // bind queue -> retry-dlx | |
deadLetterRoutingKey: queueRetryDLX_DLRoutingKey, | |
}), | |
//* assert queue error queue | |
ch.assertQueue(getQueueErrorQueueName(queueName), | |
{ | |
durable: true , | |
deadLetterExchange: queueRetryDLXName // bind queue -> retry-dlx | |
}), | |
//* assert system-wait-queue | |
ch.assertQueue(systemWaitQueueName, | |
{ | |
durable: true , | |
deadLetterExchange: retryDispatcherDLXName, // bind system-wait-queue -> system-retry-dlx | |
messageTtl: waitIntervalSec*1_000 | |
}), | |
//* binds | |
//* bind retry-dlx -> system-wait-queue | |
ch.bindQueue(systemWaitQueueName,queueRetryDLXName,`rt-${waitIntervalSec}.#`), | |
//* bind system-retry-dlx -> queue | |
ch.bindQueue(queueName,retryDispatcherDLXName,`#.${queueName}`), | |
]) | |
} | |
} | |
/** | |
* Attempt to get death count from message. This is only available from rabbit 3.8 or above | |
* @param data | |
* @returns {((label?: string) => void)|number|((key?: (IDBValidKey | IDBKeyRange)) => IDBRequest<number>)|number|number} | |
*/ | |
const getDeathCount = (data)=>{ | |
if(!data || !data.properties || !data.properties.headers || !data.properties.headers['x-death'] ) return 0; | |
// console.log(`getDeathCount:: x-death: ${JSON.stringify(data.properties.headers['x-death'])}`) | |
try { | |
return data.properties.headers['x-death'][0].count || 0; | |
}catch (e){ | |
console.error('rabbitManager.getDeathCount:: ERROR:',e); | |
return 0 | |
} | |
} | |
const consumeWithRetryDefaultOptions = {waitIntervalSec:60*5, maxRetries:5}; | |
/** | |
* Assert the topology necessary for the retry and registers the callback as consumer of the queue | |
* @param queueName - Name of the queue to be consumed | |
* @param callback - Consumer callback called with the parsed json data. Can return a Promise to be awaited | |
* @param options - Options for retry strategy | |
* waitIntervalSec - Wait time for messages to be retried. A new queue is created everytime a this receives a new value | |
* maxRetries - Maximum number of retries before sending the message to the error queue | |
* @returns {Promise<void>} | |
*/ | |
const consumeWithRetry = async (queueName, callback, options=consumeWithRetryDefaultOptions) =>{ | |
const {waitIntervalSec, maxRetries} = {...consumeWithRetryDefaultOptions, ...options}; | |
const setupFn = setupForQueue(queueName,waitIntervalSec); | |
console.log(`rabbitManager:: adding queue ${queueName} setup.`) | |
const consumerChannel = connection.createChannel({ | |
json: true, | |
}); | |
await consumerChannel.addSetup(setupFn); | |
console.log(`rabbitManager:: registering queue ${queueName} consumer.`) | |
await consumerChannel.consume(queueName, async (data)=>{ | |
const deathCount = getDeathCount(data); | |
console.log(`rabbitManager::Message received: ${data.content} - [deaths:${deathCount}/${maxRetries}]`) | |
if(deathCount>=maxRetries){ | |
console.log(`rabbitManager:: maximum retries reached (deaths: ${deathCount}) for message: ${data.content.toString()}, publishing to error queue...`); | |
//* Publish to error queue if max retries reached | |
await consumerChannel.sendToQueue(getQueueErrorQueueName(queueName), data.content); | |
//* ack message to remove from original queue | |
consumerChannel.ack(data); | |
return; | |
} | |
try{ | |
const message = JSON.parse(data.content.toString()); | |
await Promise.resolve(callback(message)); | |
consumerChannel.ack(data); | |
}catch (e){ | |
//*On error, nack the message with requeue false to make it go to retry-dlx directly | |
console.log(`rabbitManager:: nacking message: ${data.content.toString()}`); | |
consumerChannel.nack(data,false,false); | |
} | |
}) | |
} | |
const sendMessage = async (queueName, message)=>{ | |
return channel.sendToQueue(queueName, message) | |
} | |
return {consumeWithRetry, sendMessage} | |
} | |
module.exports = rabbitManager; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment