Skip to content

Instantly share code, notes, and snippets.

@rissem
Created December 16, 2016 19:15
Show Gist options
  • Save rissem/8565caf2290973c616c1de5376088c61 to your computer and use it in GitHub Desktop.
Save rissem/8565caf2290973c616c1de5376088c61 to your computer and use it in GitHub Desktop.
Bull #371 reproduction
var Bull = require('bull');
const spawn = require('child_process').spawn;
// Bull queue named 'contacts'. The 6379 / 'localhost' arguments are presumably
// the Redis port and host -- confirm against the installed Bull version.
const queue = new Bull('contacts', 6379, 'localhost');
// Override the queue's lock renew time with a small value (presumably
// milliseconds) for this reproduction.
queue.LOCK_RENEW_TIME = 500;

// True for the process started by hand; false for the child, which is launched
// with SPAWNED set in its environment.
const originalConsumer = !process.env.SPAWNED;

if (originalConsumer) {
  // Only the original consumer publishes the single test job.
  queue
    .add({}, {
      removeOnComplete: true,
      attempts: 2,
    })
    .then(() => {
      console.log('published job');
    })
    .catch((err) => {
      // Surface publish failures instead of leaving the rejection unhandled.
      console.error('failed to publish job', err);
    });
} else {
  console.log("started");
}
// If this is not the secondary consumer, spawn another one as a child process
// and relay its output through this process's stdout/stderr.
if (!process.env.SPAWNED) {
  // process.execPath is the node binary running this script, so the child does
  // not depend on node being installed at one particular absolute path.
  // The child's environment contains only SPAWNED, which marks it as secondary.
  const consumer2 = spawn(process.execPath, ['consumer.js'], {
    env: { SPAWNED: 'true' },
  });

  consumer2.stdout.on('data', (data) => {
    process.stdout.write(`*CONSUMER2 STDOUT ${data}*\n\n`);
  });

  consumer2.stderr.on('data', (data) => {
    process.stderr.write(`*CONSUMER2 STDERR ${data}*\n\n`);
  });

  // ChildProcess emits 'close' (not 'closed') once the process has ended and
  // its stdio streams are closed; the original event name never fired.
  consumer2.on('close', (code) => {
    console.log(`CONSUMER 2 exited with code ${code}`);
  });
}
/**
 * Busy-waits on the calling thread until `duration` milliseconds have passed,
 * logging a timestamp before and after the wait.
 *
 * @param {number} duration - how long to spin, in milliseconds.
 */
const stall = (duration) => {
  const startedAt = Date.now();
  const deadline = startedAt + duration;
  console.log('started stalling', Date.now());
  // Deliberately empty loop: spinning is the whole point, nothing else can run.
  for (; Date.now() < deadline; ) {
    // spin
  }
  console.log('finished stalling', Date.now());
};
/**
 * Job processor passed to the queue.
 *
 * In the original consumer it blocks the thread via stall(10000) and then
 * fails the job; in the spawned consumer it completes the job after a
 * 2000 ms timer.
 *
 * @param {object} job - the job being processed; only `job.jobId` is read.
 * @param {Function} done - completion callback; called with an Error to fail
 *   the job, or with no arguments on success.
 */
function handler(job, done) {
  console.log('processing job', job.jobId);

  if (!originalConsumer) {
    // Secondary consumer: finish successfully after a short asynchronous delay.
    setTimeout(() => {
      console.log('processing complete', job.jobId);
      done();
    }, 2000);
    return;
  }

  // Original consumer: block the thread, then report a failure.
  // Note: the author left a disabled alternative here that failed the job
  // from a 100 ms timer instead of stalling.
  stall(10000);
  done(new Error("too many cats"));
}
// Log the id of every job the queue reports through its 'stalled' event.
function reportStalled(job) {
  console.log('stalled: ', job.jobId);
}
queue.on('stalled', reportStalled);

// Registers the job handler on the queue.
// NOTE(review): confirm that this Bull version accepts an options object as
// the second argument to process(); the call is kept exactly as written.
function startProcessing() {
  queue.process(handler, {
    concurrency: 1,
  });
}

// Delay processing by one second: without the delay, the stall happens before
// consumer2 has started.
setTimeout(startProcessing, 1000);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment