Run this with: `time DEBUG=amqp:* COUNT=30000 PREFETCH=0 BUCKET_SIZE=20000 node index.js`
Timings before applying the fix:
real 1m22.216s
user 1m10.973s
sys 0m16.106s
Timings after applying the fix:
real 0m25.034s
user 0m14.082s
sys 0m15.153s
var amqp = require('amqp'); | |
/**
 * Read an integer setting from the environment.
 *
 * Falls back only when the variable is missing or not a number, so an
 * explicit `0` (e.g. `PREFETCH=0`) is honoured instead of being replaced
 * by the default.
 *
 * @param {string} name - environment variable name
 * @param {number} fallback - value used when the variable is unset or not numeric
 * @returns {number} the parsed integer, or `fallback`
 */
function readIntEnv (name, fallback) {
  var parsed = parseInt(process.env[name], 10);
  return isNaN(parsed) ? fallback : parsed;
}

// Prefetch count handed to `q.subscribe`; defaults to 1.
var PREFETCH = readIntEnv('PREFETCH', 1);
// The consumer stops once it receives the message 'testing ' + COUNT.
var COUNT = readIntEnv('COUNT', 30000);

var exchangeName = 'helloo';
var exchangeOpts = {
    type: 'direct'
  , passive: false
  , durable: true
  , autoDelete: false
  , confirm: true
};
/**
 * Connect to RabbitMQ, bind `hello-queue` to the exchange under the
 * routing key `hola`, and acknowledge every delivered message until the
 * one whose payload is 'testing ' + COUNT arrives, then close the
 * connection.
 *
 * When running as a forked child, `{ ready: true }` is sent to the parent
 * only after the binding has been confirmed and the subscription has been
 * requested, so a producer started on that signal cannot publish to an
 * exchange that has no binding yet.
 */
function consume () {
  var debug = require('debug')('amqp:consume');
  // Payload that marks the end of the run.
  var lastPayload = 'testing ' + COUNT;

  debug('connecting to rabbit');
  var conn = amqp.createConnection();

  'connect open error'.split(' ').forEach(function (evt) {
    conn.on(evt, function () {
      debug(evt, arguments);
    });
  });

  conn.once('ready', function () {
    debug('ready');

    conn.exchange(exchangeName, exchangeOpts, function (ex) {
      conn.queue('hello-queue', function (q) {
        // Register the listener before requesting the bind.
        q.on('queueBindOk', function () {
          debug('queueBindOk');

          var opts = {};
          opts.ack = true;
          opts.prefetchCount = PREFETCH;

          var i = 0;
          q.subscribe(opts, function (msg, headers, info) {
            ++i;

            if (lastPayload === msg.msg) {
              debug('closing connection');
              conn.end();
              return;
            }

            if (0 === i%1000) debug('got messages', i);

            // Acknowledge the message just handled.
            q.shift();
          });

          // `process.send` only exists when this script was forked with an
          // IPC channel; skip the notification when run standalone.
          if ('function' === typeof process.send) {
            process.send({ ready: true });
          }
        });

        q.bind(ex, 'hola');
        debug('waiting for `queueBindOk`');
      });
    });
  });
}

consume();
var cp = require('child_process'); | |
// Launcher: start the consumer first, and start the producer only after
// the consumer reports that it is ready.
var DEBUG = process.env.DEBUG;
var PREFETCH = parseInt(process.env.PREFETCH, 10) || 1;
var COUNT = parseInt(process.env.COUNT || '30000', 10);

var consumer = cp.fork(__dirname + '/consumer.js');

consumer.on('message', function (message) {
  if (true === message.ready) {
    // Consumer is up: kick off the publishing side.
    cp.fork(__dirname + '/producer.js');
    return;
  }

  // Anything other than the ready signal is unexpected; stop the consumer.
  console.log('unknown message', message);
  consumer.kill();
});

console.log('waiting for consumer ready');
{ | |
"name": "rabbit", | |
"private": true, | |
"version": "0.0.0", | |
"description": "", | |
"main": "index.js", | |
"dependencies": { | |
"amqp": "~0.1.8", | |
"debug": "~0.7.4" | |
}, | |
"devDependencies": {}, | |
"scripts": { | |
"test": "echo \"Error: no test specified\" && exit 1" | |
}, | |
"author": "", | |
"license": "BSD-2-Clause" | |
} |
var amqp = require('amqp'); | |
/**
 * Read an integer setting from the environment.
 *
 * The fallback is used when the variable is missing, not a number, or
 * (when `min` is given) smaller than `min`. An explicit `0` is kept unless
 * `min` rules it out.
 *
 * @param {string} name - environment variable name
 * @param {number} fallback - value used when the variable is unusable
 * @param {number} [min] - optional smallest acceptable value
 * @returns {number} the parsed integer, or `fallback`
 */
function envInt (name, fallback, min) {
  var parsed = parseInt(process.env[name], 10);
  if (isNaN(parsed) || parsed < min) return fallback;
  return parsed;
}

var PREFETCH = envInt('PREFETCH', 1);
// Messages 'testing 0' .. 'testing COUNT' are published (COUNT + 1 in total).
var COUNT = envInt('COUNT', 30000);
// Messages published per batch. Must be at least 1: with 0 or NaN the
// batch start never advances and publishing would reschedule forever.
var BUCKET_SIZE = envInt('BUCKET_SIZE', 30000, 1);
// Delay in milliseconds between two batches.
var TIMEOUT = 1;

var exchangeOpts = {
    type: 'direct'
  , passive: false
  , durable: true
  , autoDelete: false
  , confirm: true // causes publish cbs to fire upon confirm
};
var exchangeName = 'helloo';
/**
 * Connect to RabbitMQ and publish the messages 'testing 0' through
 * 'testing ' + COUNT to the exchange under the routing key `hola`.
 *
 * Messages go out in batches of BUCKET_SIZE, with a TIMEOUT ms pause
 * between batches. Once every batch has been handed to the exchange, the
 * connection is closed after a 500 ms delay.
 */
function produce () {
  var log = require('debug')('amqp:produce');
  var connection = amqp.createConnection();

  ['connect', 'open', 'error'].forEach(function (eventName) {
    connection.on(eventName, function () {
      log(eventName, arguments);
    });
  });

  connection.once('ready', function () {
    log('producer ready');

    connection.exchange(exchangeName, exchangeOpts, function (exchange) {
      // Indices run from 0 to COUNT inclusive.
      var total = COUNT + 1;

      // Build the publish callback for one message; only every 1000th
      // message reports its outcome.
      function confirmLogger (index) {
        return function (failed) {
          if (index % 1000 === 0) {
            log('exchange published sucessfully?', false === failed);
          }
        };
      }

      // Publish one batch, then schedule the next one.
      function publishBucket (bucket) {
        log('iteration', bucket, total, BUCKET_SIZE);

        var first = bucket * BUCKET_SIZE;
        if (first >= total) {
          log('finishing up');
          setTimeout(function () {
            log('closing connection');
            connection.end();
          }, 500);
          return;
        }

        var limit = Math.min(total, first + BUCKET_SIZE);
        var index = first;
        while (index < limit) {
          var payload = { msg: 'testing ' + index };
          if (index % 1000 === 0) log('publishing', payload);
          exchange.publish('hola', payload, {}, confirmLogger(index));
          index += 1;
        }

        setTimeout(function () {
          publishBucket(bucket + 1);
        }, TIMEOUT);
      }

      publishBucket(0);
    });
  });
}

produce();