Skip to content

Instantly share code, notes, and snippets.

@anhdiepmmk
Last active December 2, 2021 08:00
Show Gist options
  • Save anhdiepmmk/3dbbbabe33e55705f74a4004c54103a3 to your computer and use it in GitHub Desktop.
Save anhdiepmmk/3dbbbabe33e55705f74a4004c54103a3 to your computer and use it in GitHub Desktop.
an example proof how we can do exponetial backoff retries with sqs
const _ = require("lodash");
const moment = require("moment");
// Load the AWS SDK for Node.js
const AWS = require("aws-sdk");
// Set the region to us-west-2
AWS.config.update({ region: "us-west-2" });
// Create the SQS service object
const sqs = new AWS.SQS({ apiVersion: "2012-11-05" });
const queueURL = "https://sqs.REGION.amazonaws.com/ACCOUNT-ID/Ố-MÀI-GÓT";
const params = {
AttributeNames: ["SentTimestamp"],
MaxNumberOfMessages: 1,
MessageAttributeNames: ["All"],
QueueUrl: queueURL,
};
const retryOptions = {
minTimeout: 30,
maxTimeout: 3600,
factor: 2,
randomize: true,
maxAttempts: 5,
};
sqs.receiveMessage(params, function (err, data) {
if (err) {
console.log("Receive Error", err);
} else {
const messages = _.get(data, "Messages", []);
_.forEach(messages, (message) => {
const attempt = _.chain(message)
.get("Attributes.ApproximateReceiveCount", 1)
.parseInt()
.value();
const maxAttempts = _.chain(retryOptions)
.get("maxAttempts", 5)
.add(1)
.value();
if (attempt <= maxAttempts) {
_processMessage(message)
.then(() => {
// remove message from queue if message was processed
_removeMessage(message);
})
.catch((e) => {
// do not thing then the message will be consume again after visibility timeout
// _removeMessage(message);
});
} else {
_removeMessage(message);
}
const timeout = _createTimout(attempt, retryOptions);
_changeMessageVisibility(message, timeout);
});
}
});
async function _processMessage(message) {
console.log("message-------", message.MessageId, moment().format());
// do some thing with this message
// if you faced with some errors then just throw and error (throw new Error('our partner server busy'))
// then the message will not be remove from the queue, the message will be consume again after visibility timeout
// if not just return truly value then message will be remove from queue
}
function _changeMessageVisibility(message, timeout) {
const visibilityParams = {
QueueUrl: queueURL,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout,
};
return new Promise((resolve, reject) => {
sqs.changeMessageVisibility(visibilityParams, function (err, data) {
if (err) {
return reject(err);
}
return resolve(data);
});
});
}
function _removeMessage(message) {
return new Promise((resolve, reject) => {
sqs.deleteMessage(
{
QueueUrl: this.url,
ReceiptHandle: message.ReceiptHandle,
},
(err) => {
if (err) {
return reject(err);
}
return resolve();
}
);
});
}
// create exponential with exponetial backoff
function _createTimout(attempt, retryOptions) {
const minTimeout = _.get(retryOptions, "minTimeout", 30);
const maxTimeout = _.get(retryOptions, "maxTimeout", 12 * 60 * 60);
const factor = _.get(retryOptions, "factor", 2);
const randomize = _.get(retryOptions, "randomize", false);
const random = randomize ? Math.random() + 1 : 1;
return _.min([
_.parseInt(random * minTimeout * Math.pow(factor, attempt)),
maxTimeout,
]);
}
@anhdiepmmk
Copy link
Author

message------- 7d358100-fd0e-11eb-afc1-631828333b1a 2021-08-25T08:41:28+00:00
message------- 5f738d70-fdad-11eb-9001-67db5d303a99 2021-08-25T08:41:28+00:00
message------- 31bcbaf0-fd08-11eb-b489-3b768e7ea5db 2021-08-25T08:41:28+00:00
message------- 48adee50-fc9f-11eb-afc1-631828333b1a 2021-08-25T08:41:29+00:00

30s
message------- 7d358100-fd0e-11eb-afc1-631828333b1a 2021-08-25T08:41:58+00:00
message------- 5f738d70-fdad-11eb-9001-67db5d303a99 2021-08-25T08:41:58+00:00
message------- 31bcbaf0-fd08-11eb-b489-3b768e7ea5db 2021-08-25T08:42:00+00:00
message------- 48adee50-fc9f-11eb-afc1-631828333b1a 2021-08-25T08:42:00+00:00

60s
message------- 7d358100-fd0e-11eb-afc1-631828333b1a 2021-08-25T08:42:58+00:00
message------- 5f738d70-fdad-11eb-9001-67db5d303a99 2021-08-25T08:42:59+00:00
message------- 31bcbaf0-fd08-11eb-b489-3b768e7ea5db 2021-08-25T08:42:59+00:00
message------- 48adee50-fc9f-11eb-afc1-631828333b1a 2021-08-25T08:43:03+00:00

120s
message------- 7d358100-fd0e-11eb-afc1-631828333b1a 2021-08-25T08:44:59+00:00
message------- 5f738d70-fdad-11eb-9001-67db5d303a99 2021-08-25T08:44:59+00:00
message------- 31bcbaf0-fd08-11eb-b489-3b768e7ea5db 2021-08-25T08:45:00+00:00
message------- 48adee50-fc9f-11eb-afc1-631828333b1a 2021-08-25T08:45:03+00:00

240s
message------- 7d358100-fd0e-11eb-afc1-631828333b1a 2021-08-25T08:48:59+00:00
message------- 5f738d70-fdad-11eb-9001-67db5d303a99 2021-08-25T08:48:59+00:00
message------- 31bcbaf0-fd08-11eb-b489-3b768e7ea5db 2021-08-25T08:49:00+00:00
message------- 48adee50-fc9f-11eb-afc1-631828333b1a 2021-08-25T08:49:03+00:00

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment