Created
March 3, 2019 14:42
-
-
Save ssut/c536de2a633d5889bdcc8192c0c95265 to your computer and use it in GitHub Desktop.
RabbitMQ Exponential Backoff (TypeScript) from https://gist.github.com/mpskovvang/6f48b60338d08781b476785455436080
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as amqp from 'amqplib'; | |
export class ExponentialBackoffHelper { | |
private maxDelay: number = 60; | |
private factor: number = 2; | |
private maxAttempts: number = 5; | |
constructor( | |
private channel: amqp.Channel, | |
private queue: string, | |
private exchange: string, | |
private routingKey: string, | |
{ | |
maxDelay = 60, | |
factor = 2, | |
maxAttempts = 5, | |
} = {}, | |
) { | |
this.maxDelay = maxDelay; | |
this.factor = factor; | |
this.maxAttempts = maxAttempts; | |
} | |
public acknowledge(message: amqp.Message) { | |
return this.channel.ack(message, false); | |
} | |
public reject(message: amqp.Message, requeue = false) { | |
return this.retry(message); | |
} | |
public error(message: amqp.Message) { | |
return this.retry(message); | |
} | |
public timeout(message: amqp.Message) { | |
return this.retry(message); | |
} | |
private async retry(message: amqp.Message) { | |
const attempts = this.deaths(message); | |
console.info('attempts', attempts, this.maxAttempts); | |
if (attempts < this.maxAttempts) { | |
const delay = this.delay(attempts); | |
const routingKey = `${this.queue}.${delay}`; | |
const queue = await this.createRetryQueue(delay); | |
await this.channel.bindQueue(queue.queue, this.exchange, routingKey); | |
await this.channel.publish(this.exchange, routingKey, message.content, { | |
headers: { | |
...message.properties.headers, | |
}, | |
}); | |
console.info('ack?', delay, routingKey, queue.queue); | |
this.acknowledge(message); | |
// return this.channel.reject(); | |
} else { | |
console.info('max attempt exceeded'); | |
await this.channel.reject(message, false); | |
} | |
} | |
private deaths(message: amqp.Message) { | |
const { | |
properties: { | |
headers, | |
}, | |
} = message; | |
if (!headers || headers['x-death'] === undefined) { | |
return 0; | |
} | |
console.info(headers['x-death']); | |
const count = headers['x-death'].reduce((accum, death) => death.queue.includes(this.queue) ? accum + death.count : accum, 0); | |
return count; | |
} | |
private createRetryQueue(delay: number) { | |
const queue = `${this.queue}.retry.${delay}`; | |
return this.channel.assertQueue(queue, { | |
durable: true, | |
arguments: { | |
'x-dead-letter-exchange': this.exchange, | |
'x-dead-letter-routing-key': this.queue, | |
'x-message-ttl': delay * 1000, | |
'x-expires': delay * 1000 * 2, | |
}, | |
}); | |
} | |
public delay(attempts: number) { | |
return Math.min(this.maxDelay, Math.pow(attempts + 1, this.factor)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment