-
-
Save carlhoerberg/006b01ac17a0a94859ba to your computer and use it in GitHub Desktop.
const amqp = require('amqplib/callback_api');

// The connection currently in use; null until start() has connected once.
// If the connection is closed or fails to be established at all, start()
// reconnects and stores the replacement connection here.
let amqpConn = null;
/**
 * Connects to the broker named by the CLOUDAMQP_URL environment variable
 * (with a 60 second heartbeat) and keeps the connection alive.
 *
 * A failed connection attempt and a "close" event on an established
 * connection both schedule another call to start() one second later. On
 * success the connection is stored in `amqpConn` and whenConnected() is run.
 */
function start() {
  const baseUrl = process.env.CLOUDAMQP_URL;
  // Add the heartbeat option without clobbering a query string that the
  // configured URL may already carry.
  const separator = String(baseUrl).includes("?") ? "&" : "?";
  amqp.connect(baseUrl + separator + "heartbeat=60", function(err, conn) {
    if (err) {
      console.error("[AMQP]", err.message);
      return setTimeout(start, 1000);
    }
    conn.on("error", function(err) {
      // "Connection closing" is what we get after closing the connection
      // ourselves, so it is not worth logging.
      if (err.message !== "Connection closing") {
        console.error("[AMQP] conn error", err.message);
      }
    });
    // Reconnecting happens only here, so each lost connection schedules
    // exactly one new attempt.
    conn.on("close", function() {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 1000);
    });
    console.log("[AMQP] connected");
    amqpConn = conn;
    whenConnected();
  });
}
/**
 * Called by start() each time a connection has been established: opens the
 * publisher channel first, then the worker channel.
 */
function whenConnected() {
  startPublisher();
  startWorker();
}
// Confirm channel used by publish(); null until startPublisher() has opened one.
let pubChannel = null;
// [exchange, routingKey, content] triples that could not be published yet.
const offlinePubQueue = [];
/**
 * Opens a confirm channel on the current connection, stores it in
 * `pubChannel`, and re-publishes every message waiting in `offlinePubQueue`.
 *
 * If the channel cannot be created, closeOnErr() closes the connection and
 * nothing else happens.
 */
function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });
    pubChannel = ch;
    // Drain a snapshot of the queue. publish() pushes a message back onto
    // offlinePubQueue when it fails, so looping "until the queue is empty"
    // would never terminate while the channel keeps rejecting messages.
    const pending = offlinePubQueue.splice(0, offlinePubQueue.length);
    for (const [exchange, routingKey, content] of pending) {
      publish(exchange, routingKey, content);
    }
  });
}
/**
 * Publishes a persistent message. If there is no usable channel, or the
 * broker does not confirm the message, the message is stored in
 * `offlinePubQueue` so startPublisher() can resend it later.
 *
 * @param {string} exchange - exchange to publish to
 * @param {string} routingKey - routing key for the message
 * @param {Buffer} content - message body
 */
function publish(exchange, routingKey, content) {
  // Pin the channel this message goes out on. By the time the confirm
  // callback runs, `pubChannel` may already be a channel on a new connection,
  // and that healthy connection must not be closed.
  const channel = pubChannel;
  try {
    channel.publish(exchange, routingKey, content, { persistent: true },
      function(err, ok) {
        if (!err) return;
        console.error("[AMQP] publish", err);
        offlinePubQueue.push([exchange, routingKey, content]);
        try {
          channel.connection.close();
        } catch (closeErr) {
          // The connection is already closing or closed; throwing from this
          // callback would be an uncaught exception.
          console.error("[AMQP] publish", closeErr.message);
        }
      });
  } catch (e) {
    // Also reached when `channel` is still null (not connected yet).
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
  }
}
/**
 * A worker that acks messages only if they were processed successfully.
 *
 * Opens a channel with a prefetch of 10, asserts the durable "jobs" queue and
 * consumes from it with manual acknowledgements. Each message is handed to
 * work(); it is acked when work() reports success and rejected with requeue
 * otherwise.
 */
function startWorker() {
  amqpConn.createChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    // An ack is only valid on the channel that delivered the message. Track
    // whether this channel is still open so a late work() result does not try
    // to ack on a dead channel and then close the newer connection.
    let channelOpen = true;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      channelOpen = false;
      console.log("[AMQP] channel closed");
    });
    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
      if (closeOnErr(err)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });

    function processMsg(msg) {
      if (msg === null) {
        // amqplib passes null when the broker cancels the consumer; restart
        // through the usual close-and-reconnect path.
        closeOnErr(new Error("consumer cancelled by server"));
        return;
      }
      work(msg, function(ok) {
        if (!channelOpen) {
          // The unacked message is delivered again after reconnecting.
          console.error("[AMQP] channel closed before ack, message will be redelivered");
          return;
        }
        try {
          if (ok)
            ch.ack(msg);
          else
            ch.reject(msg, true);
        } catch (e) {
          closeOnErr(e);
        }
      });
    }
  });
}
/**
 * Processes one message: logs its content and reports success.
 *
 * @param {{content: Buffer}} msg - delivered message
 * @param {function(boolean): void} cb - called with true when processing succeeded
 */
function work(msg, cb) {
  console.log("Got msg", msg.content.toString());
  cb(true);
}
/**
 * Logs `err` and closes the current connection so the "close" handler
 * installed by start() reconnects.
 *
 * @param {*} err - error to check; any falsy value means "no error"
 * @returns {boolean} true if there was an error, false otherwise
 */
function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  try {
    amqpConn.close();
  } catch (closeErr) {
    // Closing can itself throw when the connection is already closed; the
    // caller only needs to know that an error occurred.
    console.error("[AMQP] error", closeErr.message);
  }
  return true;
}
// Demo driver: publish one job per second to the "jobs" queue through the
// default exchange. Buffer.from() replaces the deprecated Buffer constructor.
setInterval(function() {
  publish("", "jobs", Buffer.from("work work work"));
}, 1000);

// Open the first connection; start() re-schedules itself on failure or close.
start();
I tried running the above snippet after splitting it into two separate files, worker.js and producer.js.
The notable change I made in worker.js is replacing cb(true) with setTimeout(function() { cb(true) }, 12000), to mimic some time-consuming processing.
Also, producer.js was changed to publish just one message at a time.
The problem arises when the RabbitMQ server restarts after the worker has received a message but before the worker has sent the ack for it; the worker then reconnects to the server. After reconnecting, the worker receives the same message from the server again, while the first delivery is still being processed and its ack has not yet been sent.
Here is the snippet of test files:
worker.js
/**
* worker.js
*/
var amqp = require('amqplib/callback_api');
process.env.CLOUDAMQP_URL = 'amqp://localhost';
// if the connection is closed or fails to be established at all, we will reconnect
var amqpConn = null;
/**
 * Connects to the broker at CLOUDAMQP_URL with a 60 second heartbeat.
 * A failed attempt, or a "close" event on an established connection,
 * schedules another start() after 7 seconds. On success the connection is
 * stored in `amqpConn` and whenConnected() is invoked.
 */
function start() {
  const retryDelayMs = 7000;
  const scheduleRestart = () => setTimeout(start, retryDelayMs);

  const onConnect = (connectErr, conn) => {
    if (connectErr) {
      console.error("[AMQP]", connectErr.message);
      return scheduleRestart();
    }

    conn.on("error", (connErr) => {
      // Skip the error reported with the message "Connection closing".
      if (connErr.message === "Connection closing") return;
      console.error("[AMQP] conn error", connErr.message);
    });
    conn.on("close", () => {
      console.error("[AMQP] reconnecting");
      return scheduleRestart();
    });

    console.log("[AMQP] connected");
    amqpConn = conn;
    whenConnected();
  };

  amqp.connect(`${process.env.CLOUDAMQP_URL}?heartbeat=60`, onConnect);
}
// Called by start() after a connection has been stored in amqpConn;
// in this worker-only file it just starts the consumer.
function whenConnected() {
startWorker();
}
/**
 * A worker that acks messages only if they were processed successfully.
 *
 * Opens a channel with a prefetch of 10, asserts the durable "jobs" queue and
 * consumes from it with manual acknowledgements. Each message is handed to
 * work(); it is acked when work() reports success and rejected with requeue
 * otherwise.
 */
function startWorker() {
  amqpConn.createChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    // An ack is only valid on the channel that delivered the message. If the
    // broker restarts while work() is still running, this channel is closed
    // and the message is delivered again on the next channel. Acking on the
    // dead channel would throw, and closeOnErr() would then close the new,
    // healthy connection.
    let channelOpen = true;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      channelOpen = false;
      console.log("[AMQP] channel closed");
    });
    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
      if (closeOnErr(err)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });

    function processMsg(msg) {
      if (msg === null) {
        // amqplib passes null when the broker cancels the consumer; restart
        // through the usual close-and-reconnect path.
        closeOnErr(new Error("consumer cancelled by server"));
        return;
      }
      var incomingDate = (new Date()).toISOString();
      console.log("Msg [deliveryTag=" + msg.fields.deliveryTag + "] arrived at " + incomingDate);
      work(msg, function(ok) {
        if (!channelOpen) {
          console.error("[AMQP] channel closed before ack, message will be redelivered");
          return;
        }
        console.log("Sending Ack for msg at time " + incomingDate);
        try {
          if (ok)
            ch.ack(msg);
          else
            ch.reject(msg, true);
        } catch (e) {
          closeOnErr(e);
        }
      });
    }
  });
}
/**
 * Simulates slow processing: logs the message body, then reports success
 * after WORK_WAIT_TIME milliseconds (12000 when the variable is unset).
 *
 * @param {{content: Buffer}} msg - delivered message
 * @param {function(boolean): void} cb - called with true once the wait is over
 */
function work(msg, cb) {
  const body = msg.content.toString();
  console.log("Got msg", body);

  const delayMs = process.env.WORK_WAIT_TIME || 12000;
  setTimeout(function finishWork() {
    return cb(true);
  }, delayMs);
}
/**
 * Logs `err` and closes the current connection so the "close" handler
 * installed by start() reconnects.
 *
 * @param {*} err - error to check; any falsy value means "no error"
 * @returns {boolean} true if there was an error, false otherwise
 */
function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  try {
    amqpConn.close();
  } catch (closeErr) {
    // Closing can itself throw when the connection is already closed; the
    // caller only needs to know that an error occurred.
    console.error("[AMQP] error", closeErr.message);
  }
  return true;
}
// Open the first connection; start() re-schedules itself on failure or close.
start();
producer.js
/**
* producer.js
**/
var amqp = require('amqplib/callback_api');
process.env.CLOUDAMQP_URL = 'amqp://localhost';
// if the connection is closed or fails to be established at all, we will reconnect
var amqpConn = null;
/**
 * Connects to the broker at CLOUDAMQP_URL with a 60 second heartbeat.
 * A failed attempt, or a "close" event on an established connection,
 * schedules another start() after 7 seconds. On success the connection is
 * stored in `amqpConn` and whenConnected() is invoked.
 */
function start() {
  const brokerUrl = process.env.CLOUDAMQP_URL + "?heartbeat=60";

  function retryLater() {
    return setTimeout(start, 7000);
  }

  function handleConnError(error) {
    const isClosingNotice = error.message === "Connection closing";
    if (!isClosingNotice) {
      console.error("[AMQP] conn error", error.message);
    }
  }

  function handleConnClose() {
    console.error("[AMQP] reconnecting");
    return retryLater();
  }

  amqp.connect(brokerUrl, function onConnected(error, conn) {
    if (error) {
      console.error("[AMQP]", error.message);
      return retryLater();
    }
    conn.on("error", handleConnError);
    conn.on("close", handleConnClose);
    console.log("[AMQP] connected");
    amqpConn = conn;
    whenConnected();
  });
}
// Called by start() after a connection has been stored in amqpConn;
// in this producer-only file it just opens the publisher channel.
function whenConnected() {
startPublisher();
}
// Confirm channel used by publish(); null until startPublisher() has opened one.
let pubChannel = null;
// [exchange, routingKey, content] triples that could not be published yet.
const offlinePubQueue = [];
/**
 * Opens a confirm channel on the current connection, stores it in
 * `pubChannel`, and re-publishes every message waiting in `offlinePubQueue`.
 *
 * If the channel cannot be created, closeOnErr() closes the connection and
 * nothing else happens.
 */
function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });
    pubChannel = ch;
    // Drain a snapshot of the queue. publish() pushes a message back onto
    // offlinePubQueue when it fails, so looping "until the queue is empty"
    // would never terminate while the channel keeps rejecting messages.
    const pending = offlinePubQueue.splice(0, offlinePubQueue.length);
    for (const m of pending) {
      console.log('M = ', m);
      publish(m[0], m[1], m[2]);
    }
  });
}
/**
 * Publishes a persistent message. If there is no usable channel, or the
 * broker does not confirm the message, the message is stored in
 * `offlinePubQueue` so startPublisher() can resend it later.
 *
 * @param {string} exchange - exchange to publish to
 * @param {string} routingKey - routing key for the message
 * @param {Buffer} content - message body
 */
function publish(exchange, routingKey, content) {
  // Pin the channel this message goes out on. By the time the confirm
  // callback runs, `pubChannel` may already be a channel on a new connection,
  // and that healthy connection must not be closed.
  const channel = pubChannel;
  try {
    channel.publish(exchange, routingKey, content, { persistent: true },
      function(err, ok) {
        if (!err) return;
        console.error("[AMQP] publish", err);
        offlinePubQueue.push([exchange, routingKey, content]);
        try {
          channel.connection.close();
        } catch (closeErr) {
          // The connection is already closing or closed; throwing from this
          // callback would be an uncaught exception.
          console.error("[AMQP] publish", closeErr.message);
        }
      });
  } catch (e) {
    // Also reached when `channel` is still null (not connected yet).
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
  }
}
/**
 * Logs `err` and closes the current connection so the "close" handler
 * installed by start() reconnects.
 *
 * @param {*} err - error to check; any falsy value means "no error"
 * @returns {boolean} true if there was an error, false otherwise
 */
function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  try {
    amqpConn.close();
  } catch (closeErr) {
    // Closing can itself throw when the connection is already closed; the
    // caller only needs to know that an error occurred.
    console.error("[AMQP] error", closeErr.message);
  }
  return true;
}
// Publish a single job three seconds after startup. Buffer.from() replaces
// the deprecated Buffer constructor.
setTimeout(function() {
  publish("", "jobs", Buffer.from("work work work"));
}, 3000);

// Open the first connection; start() re-schedules itself on failure or close.
start();
Here is the log received from worker.js:
[AMQP] connected
Worker is started
### Message Received ###
Msg [deliveryTag=1] arrived at 2017-07-13T16:17:37.331Z
Got msg work work work
### RABBITMQ server restarted before we could send ack ###
[AMQP] channel closed
[AMQP] reconnecting
### Server Connected ###
[AMQP] connected
Worker is started
### Same message received again ###
Msg [deliveryTag=1] arrived at 2017-07-13T16:17:46.285Z
Got msg work work work
### Error while sending ack now (For message received before reconnecting ###
Sending Ack for msg at time 2017-07-13T16:17:37.331Z
[AMQP] error { IllegalOperationError: Channel closed
at Channel.<anonymous> (/test/node_modules/amqplib/lib/channel.js:149:11)
at Channel.ack (/test/node_modules/amqplib/lib/callback_model.js:234:8)
at /test/worker.js:57:16
at Timeout.setTimeout (/test/worker.js:70:20)
at ontimeout (timers.js:380:14)
at tryOnTimeout (timers.js:244:5)
at Timer.listOnTimeout (timers.js:214:5)
message: 'Channel closed',
stack: 'IllegalOperationError: Channel closed\n at Channel.<anonymous> (/test/node_modules/amqplib/lib/channel.js:149:11)\n at Channel.ack (/test/node_modules/amqplib/lib/callback_model.js:234:8)\n at /test/worker.js:57:16\n at Timeout.setTimeout (/test/worker.js:70:20)\n at ontimeout (timers.js:380:14)\n at tryOnTimeout (timers.js:244:5)\n at Timer.listOnTimeout (timers.js:214:5)',
stackAtStateChange: 'Stack capture: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - broker forced connection closure with reason \'shutdown\'"\n at Object.accept (/test/node_modules/amqplib/lib/connection.js:89:15)\n at Connection.mainAccept [as accept] (/test/node_modules/amqplib/lib/connection.js:63:33)\n at Socket.go (/test/node_modules/amqplib/lib/connection.js:476:48)\n at emitNone (events.js:86:13)\n at Socket.emit (events.js:185:7)\n at emitReadable_ (_stream_readable.js:432:10)\n at emitReadable (_stream_readable.js:426:7)\n at readableAddChunk (_stream_readable.js:187:13)\n at Socket.Readable.push (_stream_readable.js:134:10)\n at TCP.onread (net.js:551:20)' }
[AMQP] channel closed
[AMQP] reconnecting
Any ideas on how we can handle this scenario?
What if error or close is emitted on the channel object (not on the connection), from the worker's perspective? Is there no need to reconnect in that case?
Following is my code with Async/Await:
const amqp = require("amqplib");
var connection;
/**
 * Connects to RabbitMQ on localhost and stores the connection in
 * `connection`. A failed attempt, or a "close" event on an established
 * connection, schedules another attempt after 10 seconds.
 *
 * NOTE(review): `logger` is not defined in this snippet; it is presumably
 * provided by the surrounding application.
 */
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.log("connect to RabbitMQ success!");

        connection.on("error", function(err)
        {
            // Only log here. amqplib documents that "close" is emitted after
            // "error", so scheduling a reconnect in both handlers would open
            // two connections for one failure.
            logger.error(err);
        });

        connection.on("close", function()
        {
            logger.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });
    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}
connectRabbitMQ();
By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?
Take a look at this library: https://github.com/benbria/node-amqp-connection-manager
@varunnayal I have same issue, any workarounds?
@kiwenlau: await connection.connection._events.error('foobar');
We should update the use of Buffer():
(node:25702) [DEP0005] DeprecationWarning: Buffer() is deprecated due to security and usability issues. Please use the Buffer.alloc(), Buffer.allocUnsafe(), or Buffer.from() methods instead.
@johanrhodin please use this code instead of the deprecated one:
// Buffer.from() is a factory function, not a constructor, so it is called without `new`.
publish("", "jobs", Buffer.from("work work work"));
Thank you for your code logic. In fact, when I try to restart RabbitMQ, because it took more than 1s to connect to the MQ, 2 or more connections were created. So I just want to add a bit of code to make it perfect.
var isConnecting = false; function start() { if (isConnecting) return; isConnecting = true; amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) { isConnecting = false; ... }); }
Exactly what i did !!!!!! I thought none see it.
how can i set rate limit
i mean consumer should read just 4 message per minute
@Bahramibh
You can use channel.prefetch to limit the number of messages that the consumer has but has not acknowledged. I.e, only process upto 4 messages at a time: channel.prefetch(4)
Following is my code with Async/Await:
const amqp = require("amqplib");
var connection;

/**
 * Connects to RabbitMQ on localhost and stores the connection in
 * `connection`. A failed attempt, or a "close" event on an established
 * connection, schedules another attempt after 10 seconds.
 *
 * NOTE(review): `logger` is not defined in this snippet; it is presumably
 * provided by the surrounding application.
 */
async function connectRabbitMQ() {
  try {
    connection = await amqp.connect("amqp://localhost");
    console.log("connect to RabbitMQ success!");
    connection.on("error", function(err) {
      // Only log here. amqplib documents that "close" is emitted after
      // "error", so scheduling a reconnect in both handlers would open two
      // connections for one failure.
      logger.error(err);
    });
    connection.on("close", function() {
      logger.error("connection to RabbitQM closed!");
      setTimeout(connectRabbitMQ, 10000);
    });
  } catch (err) {
    console.error(err);
    setTimeout(connectRabbitMQ, 10000);
  }
}

connectRabbitMQ();
By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?
One way is to just turn on a VPN; that should break the connection.
Thank you for your code logic. In fact, when I try to restart RabbitMQ, because it took more than 1s to connect to the MQ, 2 or more connections were created. So I just want to add a bit of code to make it perfect.