-
-
Save carlhoerberg/006b01ac17a0a94859ba to your computer and use it in GitHub Desktop.
var amqp = require('amqplib/callback_api'); | |
// if the connection is closed or fails to be established at all, we will reconnect | |
var amqpConn = null; | |
function start() { | |
amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) { | |
if (err) { | |
console.error("[AMQP]", err.message); | |
return setTimeout(start, 1000); | |
} | |
conn.on("error", function(err) { | |
if (err.message !== "Connection closing") { | |
console.error("[AMQP] conn error", err.message); | |
} | |
}); | |
conn.on("close", function() { | |
console.error("[AMQP] reconnecting"); | |
return setTimeout(start, 1000); | |
}); | |
console.log("[AMQP] connected"); | |
amqpConn = conn; | |
whenConnected(); | |
}); | |
} | |
function whenConnected() { | |
startPublisher(); | |
startWorker(); | |
} | |
var pubChannel = null; | |
var offlinePubQueue = []; | |
function startPublisher() { | |
amqpConn.createConfirmChannel(function(err, ch) { | |
if (closeOnErr(err)) return; | |
ch.on("error", function(err) { | |
console.error("[AMQP] channel error", err.message); | |
}); | |
ch.on("close", function() { | |
console.log("[AMQP] channel closed"); | |
}); | |
pubChannel = ch; | |
while (true) { | |
var m = offlinePubQueue.shift(); | |
if (!m) break; | |
publish(m[0], m[1], m[2]); | |
} | |
}); | |
} | |
// method to publish a message, will queue messages internally if the connection is down and resend later | |
function publish(exchange, routingKey, content) { | |
try { | |
pubChannel.publish(exchange, routingKey, content, { persistent: true }, | |
function(err, ok) { | |
if (err) { | |
console.error("[AMQP] publish", err); | |
offlinePubQueue.push([exchange, routingKey, content]); | |
pubChannel.connection.close(); | |
} | |
}); | |
} catch (e) { | |
console.error("[AMQP] publish", e.message); | |
offlinePubQueue.push([exchange, routingKey, content]); | |
} | |
} | |
// A worker that acks messages only if processed succesfully | |
function startWorker() { | |
amqpConn.createChannel(function(err, ch) { | |
if (closeOnErr(err)) return; | |
ch.on("error", function(err) { | |
console.error("[AMQP] channel error", err.message); | |
}); | |
ch.on("close", function() { | |
console.log("[AMQP] channel closed"); | |
}); | |
ch.prefetch(10); | |
ch.assertQueue("jobs", { durable: true }, function(err, _ok) { | |
if (closeOnErr(err)) return; | |
ch.consume("jobs", processMsg, { noAck: false }); | |
console.log("Worker is started"); | |
}); | |
function processMsg(msg) { | |
work(msg, function(ok) { | |
try { | |
if (ok) | |
ch.ack(msg); | |
else | |
ch.reject(msg, true); | |
} catch (e) { | |
closeOnErr(e); | |
} | |
}); | |
} | |
}); | |
} | |
function work(msg, cb) { | |
console.log("Got msg", msg.content.toString()); | |
cb(true); | |
} | |
function closeOnErr(err) { | |
if (!err) return false; | |
console.error("[AMQP] error", err); | |
amqpConn.close(); | |
return true; | |
} | |
setInterval(function() { | |
publish("", "jobs", new Buffer("work work work")); | |
}, 1000); | |
start(); |
What if error
or close
are emitted on channel(not on connection) object from worker perspective? No need to reconnect?
Following is my code with Async/Await:
const amqp = require("amqplib");
var connection;
async function connectRabbitMQ()
{
try
{
connection = await amqp.connect("amqp://localhost");
console.log("connect to RabbitMQ success!");
// How to reproduce "error" event?
connection.on("error", function(err)
{
logger.error(err);
setTimeout(connectRabbitMQ, 10000);
});
connection.on("close", function()
{
logger.error("connection to RabbitQM closed!");
setTimeout(connectRabbitMQ, 10000);
});
}
catch (err)
{
console.error(err);
setTimeout(connectRabbitMQ, 10000);
}
}
connectRabbitMQ();
By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?
Take a look at this library: https://github.com/benbria/node-amqp-connection-manager
@varunnayal I have same issue, any workarounds?
@kiwenlau: await connection.connection._events.error('foobar');
We should update the user of Buffer()
:
(node:25702) [DEP0005] DeprecationWarning: Buffer() is deprecated due to security and usability issues. Please use the Buffer.alloc(), Buffer.allocUnsafe(), or Buffer.from() methods instead.
@johanrhodin please use this code instead of the depreciated one;
publish("", "jobs", new Buffer.from("work work work"));
Thank you for your code logic. In fact, when I try to restart RabbitMQ, because it took more than 1s to connect to the MQ, 2 or more connections were created. So I just want to add a bit of code to make it perfect.
var isConnecting = false; function start() { if (isConnecting) return; isConnecting = true;** amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) { isConnecting = false;** ... }
Exactly what i did !!!!!! I thought none see it.
how can i set rate limit
i mean consumer should read just 4 message per minute
@Bahramibh
You can use channel.prefetch to limit the number of messages that the consumer has but has not acknowledged. I.e, only process upto 4 messages at a time: channel.prefetch(4)
Following is my code with Async/Await:
const amqp = require("amqplib"); var connection; async function connectRabbitMQ() { try { connection = await amqp.connect("amqp://localhost"); console.log("connect to RabbitMQ success!"); // How to reproduce "error" event? connection.on("error", function(err) { logger.error(err); setTimeout(connectRabbitMQ, 10000); }); connection.on("close", function() { logger.error("connection to RabbitQM closed!"); setTimeout(connectRabbitMQ, 10000); }); } catch (err) { console.error(err); setTimeout(connectRabbitMQ, 10000); } } connectRabbitMQ();
By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?
One way is just turn on VPN - it's should broke connection
I tried running above snippet by dividing them in two separate files in worker.js and producer.js.
Notable changes I've done in
worker.js
is to changecb(true)
tosetTimeout(function() { cb(true) }, 12000)
, to mimic a some time consuming processing.Also
producer.js
changed to publish just one message at a time.Now problem comes when rabbitmq server restarts after worker receives the message and before worker sends the
ack
for the message it reconnects to the server. After reconnecting worker receives the same message from server while current message is being prepared to sendack
.Here is the snippet of test files:
worker.js
producer.js
Here is the log received from worker.js:
Any ideas on how can we handle this scenario?