-
-
Save carlhoerberg/006b01ac17a0a94859ba to your computer and use it in GitHub Desktop.
// Callback-flavoured amqplib API; every function below uses Node-style callbacks.
var amqp = require('amqplib/callback_api');

// if the connection is closed or fails to be established at all, we will reconnect
// Holds the currently open connection; assigned by start() once connected.
var amqpConn = null;
// True while a connect attempt is in flight, so that overlapping calls to
// start() cannot open more than one connection at a time.
var isConnecting = false;

// Opens the AMQP connection described by the CLOUDAMQP_URL environment variable
// (with a 60 second heartbeat) and calls whenConnected() once it is up.
// A failed attempt, or a later "close" of the connection, schedules another
// attempt one second later. Calls made while an attempt is pending are ignored.
function start() {
  if (isConnecting) return;
  isConnecting = true;
  try {
    amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
      isConnecting = false;
      if (err) {
        console.error("[AMQP]", err.message);
        return setTimeout(start, 1000);
      }
      conn.on("error", function(err) {
        // "Connection closing" is what we see when we closed it ourselves; not worth logging.
        if (err.message !== "Connection closing") {
          console.error("[AMQP] conn error", err.message);
        }
      });
      conn.on("close", function() {
        console.error("[AMQP] reconnecting");
        return setTimeout(start, 1000);
      });
      console.log("[AMQP] connected");
      amqpConn = conn;
      whenConnected();
    });
  } catch (e) {
    // A synchronous failure must not leave the guard stuck in the "connecting" state.
    isConnecting = false;
    throw e;
  }
}
// Runs after a connection has been established: opens the publisher channel
// first, then the worker channel.
function whenConnected() {
  startPublisher();
  startWorker();
}
// Confirm channel used for publishing; null until startPublisher() has opened one.
var pubChannel = null;
// Messages that could not be published, stored as [exchange, routingKey, content]
// and replayed once a publisher channel is available again.
var offlinePubQueue = [];
// Opens a confirm channel on the current connection, stores it in pubChannel
// and replays every message that was queued while no channel was available.
// If the channel cannot be created, closeOnErr() closes the connection.
function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });
    pubChannel = ch;
    // Replay a snapshot of the backlog. publish() pushes a message back onto
    // offlinePubQueue when it fails synchronously, so draining the live queue
    // in a loop would spin forever on a message that keeps failing.
    var pending = offlinePubQueue.splice(0, offlinePubQueue.length);
    pending.forEach(function(m) {
      publish(m[0], m[1], m[2]);
    });
  });
}
// Publishes a persistent message. If there is no publisher channel, the
// publish call throws, or the broker does not confirm the message, the message
// is stored in offlinePubQueue so it can be resent after the next reconnect.
//   exchange   - exchange name ("" is used by the demo code in this file)
//   routingKey - routing key / queue name
//   content    - message body, passed through to the channel unchanged
function publish(exchange, routingKey, content) {
  // Capture the channel so the confirm callback acts on the channel that
  // actually carried this message, not on whatever pubChannel points to later.
  var ch = pubChannel;
  if (!ch) {
    // Not connected yet: queue instead of relying on a TypeError from null.publish.
    offlinePubQueue.push([exchange, routingKey, content]);
    return;
  }
  try {
    ch.publish(exchange, routingKey, content, { persistent: true },
      function(err) {
        if (!err) return;
        console.error("[AMQP] publish", err);
        offlinePubQueue.push([exchange, routingKey, content]);
        try {
          // Force a reconnect; the "close" handler on the connection restarts everything.
          ch.connection.close();
        } catch (e) {
          // The connection may already be closing or closed; nothing left to do.
          console.error("[AMQP] publish", e.message);
        }
      });
  } catch (e) {
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
  }
}
// A worker that acks messages only if processed successfully.
// Opens a channel with a prefetch of 10, asserts the durable "jobs" queue and
// consumes from it. Each delivery is handed to work(); the message is acked
// when work() reports success and rejected with requeue otherwise.
function startWorker() {
  amqpConn.createChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });
    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
      if (closeOnErr(err)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });

    function processMsg(msg) {
      // amqplib delivers null when the broker cancels the consumer. There is
      // nothing to process, so go through the same close-and-reconnect path
      // as any other failure instead of crashing on msg.content.
      if (msg === null) {
        closeOnErr(new Error("consumer cancelled by server"));
        return;
      }
      work(msg, function(ok) {
        try {
          if (ok) {
            ch.ack(msg);
          } else {
            ch.reject(msg, true);
          }
        } catch (e) {
          // ack/reject throw when the channel is already closed.
          closeOnErr(e);
        }
      });
    }
  });
}
// Example job handler: logs the message body and reports success.
//   msg - delivered message; its content is logged via msg.content.toString()
//   cb  - completion callback, called with true to signal the job succeeded
function work(msg, cb) {
  console.log("Got msg", msg.content.toString());
  cb(true);
}
// Shared error check for callbacks: when err is set, logs it, closes the
// connection (the "close" handler on the connection then reconnects) and
// returns true so the caller can bail out; returns false when there is no error.
function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  try {
    amqpConn.close();
  } catch (e) {
    // Several callers can hit this for the same failure; closing a connection
    // that is already closing or closed throws, and that must not escape.
    console.error("[AMQP] error", e.message);
  }
  return true;
}
// Demo traffic: publish one message per second to the "jobs" queue through the
// default exchange. Buffer.from replaces the deprecated Buffer constructor (DEP0005).
setInterval(function() {
  publish("", "jobs", Buffer.from("work work work"));
}, 1000);

// Open the first connection.
start();
Following is my code with Async/Await:
// Promise-based amqplib API; connectRabbitMQ() awaits amqp.connect().
const amqp = require("amqplib");
// Most recently opened connection; assigned inside connectRabbitMQ().
var connection;
// Connects to the RabbitMQ broker at amqp://localhost and stores the connection
// in `connection`. A failed attempt, or a later "close" of the connection,
// schedules another attempt after 10 seconds.
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.log("connect to RabbitMQ success!");

        // How to reproduce "error" event?
        connection.on("error", function(err)
        {
            // Only log here. amqplib emits "close" after "error", so scheduling a
            // reconnect in both handlers would open two connections per failure.
            logger.error(err);
        });

        connection.on("close", function()
        {
            logger.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });
    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}
// Kick off the first connection attempt.
connectRabbitMQ();
By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?
Take a look at this library: https://github.com/benbria/node-amqp-connection-manager
@varunnayal I have same issue, any workarounds?
@kiwenlau: await connection.connection._events.error('foobar');
We should update the use of `Buffer()`:
(node:25702) [DEP0005] DeprecationWarning: Buffer() is deprecated due to security and usability issues. Please use the Buffer.alloc(), Buffer.allocUnsafe(), or Buffer.from() methods instead.
@johanrhodin please use this code instead of the deprecated one:
// Buffer.from is a static factory; call it without `new`.
publish("", "jobs", Buffer.from("work work work"));
Thank you for your code logic. In fact, when I try to restart RabbitMQ, because it took more than 1s to connect to the MQ, 2 or more connections were created. So I just want to add a bit of code to make it perfect.
var isConnecting = false;
function start() {
  if (isConnecting) return;
  isConnecting = true;
  amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
    isConnecting = false;
    ...
  });
}
Exactly what I did! I thought no one else had noticed it.
How can I set a rate limit? I mean, the consumer should read just 4 messages per minute.
@Bahramibh
You can use `channel.prefetch` to limit the number of messages that the consumer has received but not yet acknowledged, i.e. to process only up to 4 messages at a time: `channel.prefetch(4)`
Following is my code with Async/Await:
const amqp = require("amqplib");
var connection;

// Connects to the RabbitMQ broker at amqp://localhost and stores the connection
// in `connection`. A failed attempt, or a later "close" of the connection,
// schedules another attempt after 10 seconds.
async function connectRabbitMQ() {
  try {
    connection = await amqp.connect("amqp://localhost");
    console.log("connect to RabbitMQ success!");
    // How to reproduce "error" event?
    connection.on("error", function(err) {
      // Only log here. amqplib emits "close" after "error", so scheduling a
      // reconnect in both handlers would open two connections per failure.
      logger.error(err);
    });
    connection.on("close", function() {
      logger.error("connection to RabbitQM closed!");
      setTimeout(connectRabbitMQ, 10000);
    });
  } catch (err) {
    console.error(err);
    setTimeout(connectRabbitMQ, 10000);
  }
}

connectRabbitMQ();
By stopping RabbitMQ container, I can reproduce "close" event, but how can I reproduce "error" event on connection?
One way is to just turn on a VPN - it should break the connection.
What if `error` or `close` are emitted on the channel object (not on the connection), from the worker's perspective? Is there no need to reconnect?