Skip to content

Instantly share code, notes, and snippets.

@zeisss
Created October 22, 2012 22:12
Show Gist options
  • Save zeisss/3934936 to your computer and use it in GitHub Desktop.
Save zeisss/3934936 to your computer and use it in GitHub Desktop.
node-amqp worker/producer which print their effectiveness
var amqp = require('amqp'),
util = require('util');
var con = amqp.createConnection();
con.on('ready', function () {
con.exchange('new-users', {type: 'fanout', confirm: true}, function (ex) {
ex.setMaxListeners(100);
var counter = 0,
lastSeenCounter = 0,
failed = 0,
unconfirmed = 0,
msgs = 250;
setInterval(function() {
var since = counter - lastSeenCounter;
console.log('Seq:', counter, 'Rate:', since, 'Unconfirmed:', unconfirmed, 'Failed:', failed, '(msgs:', msgs, ')');
if (failed > 0) {
msgs -= 30;
}
else if (unconfirmed < 15)
{
msgs++;
} else {
msgs -= (unconfirmed > 30) ? 10 : 1;
}
msgs = Math.max(msgs, 1);
failed = 0;
lastSeenCounter = counter;
}, 1000);
function loop() {
var msg = {
counter: counter++
};
ex.publish('', msg, {}, function (err) {
if (err) {failed++;}
unconfirmed--;
});
unconfirmed++;
setTimeout(loop, 1000 / msgs);
}
loop();
});
});
var amqp = require('amqp'),
util = require('util');
var con = amqp.createConnection();
con.on('ready', function () {
var queueOptions = {
durable: true,
exclusive: false
};
con.queue('shinid.worker.in', queueOptions, function (q) {
q.bind('new-users', '');
var messagesSeen = 0,
lastPrintSeen = 0;
lastCounter = 0,
highestCounter = 0,
unack = 0;
setInterval(function() {
messagesSince = (messagesSeen - lastPrintSeen) / 2; // NOTE: We print every two seconds ;)
console.log('Seen:', messagesSeen, 'Last:', lastCounter, 'Highest:', highestCounter, 'unack:', unack, 'msgs/s:', messagesSince);
lastPrintSeen = messagesSeen;
}, 2 * 1000);
q.subscribe({ack: true, prefetchCount: 20}, function (json, headers, delivery, rawMessage) {
messagesSeen++;
lastCounter = json.counter;
if (json.counter > highestCounter) {
if (json.counter > highestCounter + 10) {
console.log('Jumped', (json.counter - highestCounter));
}
highestCounter = json.counter;
}
if (json.counter < lastCounter) {
console.log('Out-of-Order', highestCounter, json.counter);
}
unack++;
setTimeout(function () {
unack--;
rawMessage.acknowledge();
}, 16 * Math.random());
});
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment