Skip to content

Instantly share code, notes, and snippets.

@jpsilvashy
Created March 4, 2017 04:19
Show Gist options
  • Save jpsilvashy/b94810e57c1f51b0950200a39c5152fb to your computer and use it in GitHub Desktop.
Save jpsilvashy/b94810e57c1f51b0950200a39c5152fb to your computer and use it in GitHub Desktop.
A Simple RabbitMQ Client
var amqp = require('amqplib/callback_api');
// Module-level connection state shared by every method of `mq`.
// If the connection is closed or fails to be established at all, `mq.start`
// is re-run after a delay and these variables are repopulated.
var amqpConn = null; // current connection, or null while disconnected
var pubChannel = null; // current confirm channel used by publish(), or null
var offlinePubQueue = []; // [exchange, routingKey, content] triples awaiting (re)publication

var mq = {
  // Main entry point: connect to the broker named by CLOUDAMQP_URL and
  // schedule a retry (1s later) whenever connecting fails or the connection closes.
  start: function() {
    var baseUrl = String(process.env.CLOUDAMQP_URL);
    // Append the heartbeat option without clobbering a query string that is
    // already present in the configured URL.
    var separator = baseUrl.indexOf("?") === -1 ? "?" : "&";
    amqp.connect(baseUrl + separator + "heartbeat=60", function(err, conn) {
      if (err) {
        console.error("[AMQP]", err.message);
        return setTimeout(mq.start, 1000);
      }
      conn.on("error", function(err) {
        if (err.message !== "Connection closing") {
          console.error("[AMQP] conn error", err.message);
        }
      });
      conn.on("close", function() {
        // Forget the dead connection and its channel so publish() queues
        // messages instead of writing to a closed channel. Only reset when
        // this is still the current connection.
        if (amqpConn === conn) {
          amqpConn = null;
          pubChannel = null;
        }
        console.error("[AMQP] reconnecting");
        return setTimeout(mq.start, 1000);
      });
      console.log("[AMQP] connected");
      amqpConn = conn;
      mq.connected();
    });
  },

  // Called once a connection is established: bring up the publisher channel
  // and the consumer channel.
  connected: function() {
    mq.startPublisher();
    mq.startWorker();
  },

  // Open a confirm channel for publishing and flush every message that was
  // queued while no channel was available.
  startPublisher: function() {
    amqpConn.createConfirmChannel(function(err, ch) {
      if (mq.closeOnErr(err)) return;
      ch.on("error", function(err) {
        console.error("[AMQP] channel error", err.message);
      });
      ch.on("close", function() {
        // Stop publishing on a channel that is gone; later messages are queued.
        if (pubChannel === ch) pubChannel = null;
        console.log("[AMQP] channel closed");
      });
      pubChannel = ch;
      // Drain a snapshot of the queue. publish() pushes a message back onto
      // offlinePubQueue when it fails synchronously, so looping until the
      // live queue is empty would never terminate on a persistent failure.
      var pending = offlinePubQueue.splice(0, offlinePubQueue.length);
      for (var i = 0; i < pending.length; i++) {
        mq.publish(pending[i][0], pending[i][1], pending[i][2]);
      }
    });
  },

  // Publish a persistent message. If no channel is available, or the publish
  // fails, the message is kept in offlinePubQueue and resent by startPublisher
  // after the next successful (re)connection.
  //   exchange   - name of the exchange to publish to
  //   routingKey - routing key for the message
  //   content    - message body, passed through to the channel's publish()
  publish: function(exchange, routingKey, content) {
    // Capture the channel so the confirm callback acts on the channel that
    // actually carried this message, not on whatever is current later.
    var ch = pubChannel;
    if (!ch) {
      offlinePubQueue.push([exchange, routingKey, content]);
      return;
    }
    try {
      ch.publish(exchange, routingKey, content, { persistent: true },
        function(err) {
          if (!err) return;
          console.error("[AMQP] publish", err);
          offlinePubQueue.push([exchange, routingKey, content]);
          // Force a reconnect. Several failed confirms can arrive for the
          // same connection, and closing one that is already closing throws.
          try {
            ch.connection.close();
          } catch (closeErr) {
            console.error("[AMQP] publish", closeErr.message);
          }
        });
    } catch (e) {
      console.error("[AMQP] publish", e.message);
      offlinePubQueue.push([exchange, routingKey, content]);
    }
  },

  // Start a worker that consumes the durable "jobs" queue and acks messages
  // only if they were processed successfully.
  startWorker: function() {
    amqpConn.createChannel(function(err, ch) {
      if (mq.closeOnErr(err)) return;
      ch.on("error", function(err) {
        console.error("[AMQP] channel error", err.message);
      });
      ch.on("close", function() {
        console.log("[AMQP] channel closed");
      });
      ch.prefetch(10);
      ch.assertQueue("jobs", { durable: true }, function(err) {
        if (mq.closeOnErr(err)) return;
        ch.consume("jobs", processMsg, { noAck: false }, function(err) {
          // Surface a failed subscription instead of silently having no consumer.
          if (mq.closeOnErr(err)) return;
          console.log("Worker is started");
        });
      });

      // Hand one delivery to mq.work and settle it according to the outcome.
      function processMsg(msg) {
        // amqplib invokes the consumer callback with null when the broker
        // cancels the consumer; there is nothing to process or acknowledge.
        if (!msg) {
          console.error("[AMQP] consumer cancelled");
          return;
        }
        mq.work(msg, function(ok) {
          try {
            if (ok) {
              ch.ack(msg);
            } else {
              // Second argument true: ask the broker to requeue the message.
              ch.reject(msg, true);
            }
          } catch (e) {
            mq.closeOnErr(e);
          }
        });
      }
    });
  },

  // Actually handle the message: logs the body and reports success through
  // cb(true). Replace this with real processing; call cb(false) to reject.
  work: function(msg, cb) {
    console.log("Got msg", msg.content.toString());
    cb(true);
  },

  // If err is set, log it and close the current connection (which triggers
  // the reconnect logic in start). Returns true when an error was handled,
  // false when err is falsy.
  closeOnErr: function(err) {
    if (!err) return false;
    console.error("[AMQP] error", err);
    // The connection may not exist yet or may already be closing (e.g. both
    // channels failed); neither case should throw out of an error handler.
    if (amqpConn) {
      try {
        amqpConn.close();
      } catch (closeErr) {
        console.error("[AMQP] error", closeErr.message);
      }
    }
    return true;
  }
};
// Export the client object itself; its methods (start, publish, ...) are called on it directly.
module.exports = mq;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment