Skip to content

Instantly share code, notes, and snippets.

@carlhoerberg
Created May 13, 2015 14:45
Show Gist options
  • Save carlhoerberg/006b01ac17a0a94859ba to your computer and use it in GitHub Desktop.
Save carlhoerberg/006b01ac17a0a94859ba to your computer and use it in GitHub Desktop.
How to build reconnect logic for amqplib
var amqp = require('amqplib/callback_api');
// if the connection is closed or fails to be established at all, we will reconnect
var amqpConn = null;
function start() {
amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
if (err) {
console.error("[AMQP]", err.message);
return setTimeout(start, 1000);
}
conn.on("error", function(err) {
if (err.message !== "Connection closing") {
console.error("[AMQP] conn error", err.message);
}
});
conn.on("close", function() {
console.error("[AMQP] reconnecting");
return setTimeout(start, 1000);
});
console.log("[AMQP] connected");
amqpConn = conn;
whenConnected();
});
}
function whenConnected() {
startPublisher();
startWorker();
}
var pubChannel = null;
var offlinePubQueue = [];
function startPublisher() {
amqpConn.createConfirmChannel(function(err, ch) {
if (closeOnErr(err)) return;
ch.on("error", function(err) {
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
pubChannel = ch;
while (true) {
var m = offlinePubQueue.shift();
if (!m) break;
publish(m[0], m[1], m[2]);
}
});
}
// method to publish a message, will queue messages internally if the connection is down and resend later
function publish(exchange, routingKey, content) {
try {
pubChannel.publish(exchange, routingKey, content, { persistent: true },
function(err, ok) {
if (err) {
console.error("[AMQP] publish", err);
offlinePubQueue.push([exchange, routingKey, content]);
pubChannel.connection.close();
}
});
} catch (e) {
console.error("[AMQP] publish", e.message);
offlinePubQueue.push([exchange, routingKey, content]);
}
}
// A worker that acks messages only if processed succesfully
function startWorker() {
amqpConn.createChannel(function(err, ch) {
if (closeOnErr(err)) return;
ch.on("error", function(err) {
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
ch.prefetch(10);
ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
if (closeOnErr(err)) return;
ch.consume("jobs", processMsg, { noAck: false });
console.log("Worker is started");
});
function processMsg(msg) {
work(msg, function(ok) {
try {
if (ok)
ch.ack(msg);
else
ch.reject(msg, true);
} catch (e) {
closeOnErr(e);
}
});
}
});
}
function work(msg, cb) {
console.log("Got msg", msg.content.toString());
cb(true);
}
function closeOnErr(err) {
if (!err) return false;
console.error("[AMQP] error", err);
amqpConn.close();
return true;
}
setInterval(function() {
publish("", "jobs", new Buffer("work work work"));
}, 1000);
start();
@kiwenlau
Copy link

kiwenlau commented Apr 13, 2018

Following is my code with Async/Await:

const amqp = require("amqplib");

var connection;

async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.log("connect to RabbitMQ success!");

        // How to reproduce "error" event?
        connection.on("error", function(err)
        {
            logger.error(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            logger.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }


}

connectRabbitMQ();

By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?

@OsoianMarcel
Copy link

@pitops
Copy link

pitops commented Jan 17, 2019

@varunnayal I have same issue, any workarounds?

@JuergenSimon
Copy link

JuergenSimon commented Apr 1, 2019

@kiwenlau: await connection.connection._events.error('foobar');

@johanrhodin
Copy link

We should update the user of Buffer():
(node:25702) [DEP0005] DeprecationWarning: Buffer() is deprecated due to security and usability issues. Please use the Buffer.alloc(), Buffer.allocUnsafe(), or Buffer.from() methods instead.

@teamdraftbox
Copy link

@johanrhodin please use this code instead of the depreciated one;
publish("", "jobs", new Buffer.from("work work work"));

@icinemagr
Copy link

Thank you for your code logic. In fact, when I try to restart RabbitMQ, because it took more than 1s to connect to the MQ, 2 or more connections were created. So I just want to add a bit of code to make it perfect.

var isConnecting = false;
function start() {
    if (isConnecting) return;
    isConnecting = true;**
    amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
        isConnecting = false;**
...
}

Exactly what i did !!!!!! I thought none see it.

@deleteeeeeeeeeeeed
Copy link

how can i set rate limit
i mean consumer should read just 4 message per minute

@jonahmolinski
Copy link

@Bahramibh

You can use channel.prefetch to limit the number of messages that the consumer has but has not acknowledged. I.e, only process upto 4 messages at a time: channel.prefetch(4)

@MaksAnn
Copy link

MaksAnn commented Nov 5, 2022

Following is my code with Async/Await:

const amqp = require("amqplib");

var connection;

async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.log("connect to RabbitMQ success!");

        // How to reproduce "error" event?
        connection.on("error", function(err)
        {
            logger.error(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            logger.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }


}

connectRabbitMQ();

By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?

One way is just turn on VPN - it's should broke connection

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment