Skip to content

Instantly share code, notes, and snippets.

@ssut
Created March 3, 2019 14:42
Show Gist options
  • Save ssut/c536de2a633d5889bdcc8192c0c95265 to your computer and use it in GitHub Desktop.
Save ssut/c536de2a633d5889bdcc8192c0c95265 to your computer and use it in GitHub Desktop.
RabbitMQ Exponential Backoff (TypeScript) from https://gist.github.com/mpskovvang/6f48b60338d08781b476785455436080
import * as amqp from 'amqplib';
export class ExponentialBackoffHelper {
private maxDelay: number = 60;
private factor: number = 2;
private maxAttempts: number = 5;
constructor(
private channel: amqp.Channel,
private queue: string,
private exchange: string,
private routingKey: string,
{
maxDelay = 60,
factor = 2,
maxAttempts = 5,
} = {},
) {
this.maxDelay = maxDelay;
this.factor = factor;
this.maxAttempts = maxAttempts;
}
public acknowledge(message: amqp.Message) {
return this.channel.ack(message, false);
}
public reject(message: amqp.Message, requeue = false) {
return this.retry(message);
}
public error(message: amqp.Message) {
return this.retry(message);
}
public timeout(message: amqp.Message) {
return this.retry(message);
}
private async retry(message: amqp.Message) {
const attempts = this.deaths(message);
console.info('attempts', attempts, this.maxAttempts);
if (attempts < this.maxAttempts) {
const delay = this.delay(attempts);
const routingKey = `${this.queue}.${delay}`;
const queue = await this.createRetryQueue(delay);
await this.channel.bindQueue(queue.queue, this.exchange, routingKey);
await this.channel.publish(this.exchange, routingKey, message.content, {
headers: {
...message.properties.headers,
},
});
console.info('ack?', delay, routingKey, queue.queue);
this.acknowledge(message);
// return this.channel.reject();
} else {
console.info('max attempt exceeded');
await this.channel.reject(message, false);
}
}
private deaths(message: amqp.Message) {
const {
properties: {
headers,
},
} = message;
if (!headers || headers['x-death'] === undefined) {
return 0;
}
console.info(headers['x-death']);
const count = headers['x-death'].reduce((accum, death) => death.queue.includes(this.queue) ? accum + death.count : accum, 0);
return count;
}
private createRetryQueue(delay: number) {
const queue = `${this.queue}.retry.${delay}`;
return this.channel.assertQueue(queue, {
durable: true,
arguments: {
'x-dead-letter-exchange': this.exchange,
'x-dead-letter-routing-key': this.queue,
'x-message-ttl': delay * 1000,
'x-expires': delay * 1000 * 2,
},
});
}
public delay(attempts: number) {
return Math.min(this.maxDelay, Math.pow(attempts + 1, this.factor));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment