Last active
December 16, 2016 08:35
-
-
Save victusfate/1e2ce9eb73de32b78d2690d660f0f9c8 to your computer and use it in GitHub Desktop.
graceful queue and worker shutdown take 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
const kue = require('kue'); | |
const url = require('url'); | |
const redis = require('redis'); | |
/**
 * Creates the Redis client that kue uses (passed as `createClientFactory`).
 *
 * The connection URL is read from the LOCAL_REDIS_URL environment variable,
 * falling back to `config.Redis.RedisUrl`.
 * NOTE(review): `config` is not defined or required anywhere in the visible
 * file, so the fallback throws a ReferenceError when LOCAL_REDIS_URL is unset
 * - confirm where `config` is supposed to come from.
 *
 * @returns {object} the client returned by `redis.createClient(port, hostname)`.
 */
const getRedis = () => {
  const redisUrl = url.parse(process.env.LOCAL_REDIS_URL || config.Redis.RedisUrl);
  const client = redis.createClient(redisUrl.port, redisUrl.hostname);

  if (redisUrl.auth) {
    // `auth` is "user:password". Take everything after the FIRST colon so that
    // passwords which themselves contain ':' are not truncated; when there is
    // no colon at all, the whole value is used as the password.
    const auth = redisUrl.auth;
    client.auth(auth.slice(auth.indexOf(':') + 1));
  }

  return client;
};
// Timing knobs, in milliseconds.
const watchStuckJobsTime = 1000; // interval handed to queue.watchStuckJobs
const workerShutdownTime = 3000; // timeout handed to ctx.pause when a signal arrives
const jobInactiveTime = workerShutdownTime + 500; // delay before an unfinished job is flipped back to inactive
const killDelayTime = 5000; // allow all workers in this process time to gracefully shutdown

// Number of 'email' jobs processed concurrently by this process.
const concurrency = 5;

// The queue obtains its Redis connections from getRedis.
const queue = kue.createQueue({ redis: { createClientFactory: getRedis } });

// Queue-level errors are only logged.
function onQueueError(err) {
  console.log({ action: 'rawBatchJob.queue.err', err });
}
queue.on('error', onQueueError);

queue.watchStuckJobs(watchStuckJobsTime);
// the queue application process requires graceful shutdown, already covered by worker below | |
// and will work when workers are run in several different processes | |
// process.on( 'SIGTERM', function ( sig ) { | |
// queue.shutdown( killDelayTime, function(err) { | |
// console.log({ action: 'Kue shutdown SIGTERM: ', err: err||'' }); | |
// process.exit( 0 ); | |
// }); | |
// }); | |
// process.on( 'SIGINT', function ( sig ) { | |
// queue.shutdown( killDelayTime, function(err) { | |
// console.log({ action: 'Kue shutdown SIGINT: ', err: err||'' }); | |
// process.exit( 0 ); | |
// }); | |
// }); | |
// Quick spam of jobs: enqueue ten 'email' jobs that are removed from Redis
// once they complete.
for (let i = 0; i < 10; i++) {
  const job = queue
    .create('email', {
      title: 'Marky Mark Job ' + Math.random(),
      to: 'Mr. Universe',
      template: 'blah blah blah'
    })
    .removeOnComplete(true)
    .save((err) => {
      if (err) {
        // Previously a failed save was dropped without a trace.
        console.error({ action: 'rawBatchJob.job.save.err', err: err });
        return;
      }
      console.log(job.id);
    });
}
/**
 * Simulated unit of work for a job: logs the job, then reports progress
 * 1..100 (out of 100) with a random delay of up to 100 ms before each step.
 *
 * @param {object} oJob  job exposing `toJSON()` (read for `id` and `type`)
 *                       and `progress(completed, total)`.
 * @param {*} oData      job payload; only logged.
 * @returns {Promise<void>} resolves after the final progress step; rejects
 *                          if reporting progress throws.
 */
const fMessageWorker = (oJob, oData) => {
  const oInfo = oJob.toJSON();
  console.log({ id: oInfo.id, job: oInfo.type, oData: oData });

  return new Promise((resolve, reject) => {
    const N = 100;

    const iterate = (i) => {
      setTimeout(() => {
        // An exception thrown inside a timer callback would escape the
        // promise and leave it pending forever, so route it to reject().
        try {
          const completed = i + 1;
          oJob.progress(completed, N);
          if (completed === N) {
            resolve();
          } else {
            iterate(completed);
          }
        } catch (err) {
          reject(err);
        }
      }, Math.random() * 100);
    };

    iterate(0);
  });
};
/**
 * Reduces a job to the three fields used in log lines.
 *
 * @param {object} job  job exposing `toJSON()`; a missing job, or one whose
 *                      `toJSON()` returns a falsy value, yields undefined fields
 *                      instead of throwing.
 * @returns {{id: *, type: *, state: *}}
 */
const jobInfo = (job) => {
  const oJob = (job && typeof job.toJSON === 'function' && job.toJSON()) || {};
  return { id: oJob.id, type: oJob.type, state: oJob.state };
};
/**
 * After `delayMS`, looks the job up again and, if it is still 'active' or
 * 'failed', moves it back to 'inactive'.
 *
 * @param {number|string} jobId  id passed to `kue.Job.get`.
 * @param {string} jobType       job type passed to `kue.Job.get`.
 * @param {number} delayMS       delay before the lookup, in milliseconds.
 * @returns {Promise<void>} resolves when the job was reset, was in another
 *          state, or could not be loaded (e.g. completed and removed);
 *          rejects only when `job.inactive` reports an error.
 */
const setJobInactive = (jobId, jobType, delayMS) => {
  const sAction = 'setJobInactive';

  return new Promise((resolve, reject) => {
    setTimeout(() => {
      kue.Job.get(jobId, jobType, (getErr, job) => {
        if (!job) {
          // job may be complete/removed; a lookup failure is deliberately not
          // fatal here, but the error is kept in the log for diagnosis.
          console.log({
            action: sAction + '.skipping.job.done',
            id: jobId,
            type: jobType,
            err: getErr || ''
          });
          resolve();
          return;
        }

        // Previously logged as '.success' before anything had been attempted,
        // which duplicated the real success line and also appeared on skips.
        console.log({ action: sAction + '.job.found', job: jobInfo(job) });

        const state = job.toJSON().state;
        if (state !== 'active' && state !== 'failed') {
          console.log({ action: sAction + '.skipping', job: jobInfo(job) });
          resolve();
          return;
        }

        job.inactive((err) => {
          if (err) {
            console.error({ action: sAction + '.err', job: jobInfo(job), err: err });
            reject(err);
            return;
          }
          console.log({ action: sAction + '.success', job: jobInfo(job) });
          resolve();
        });
      });
    }, delayMS);
  });
};
// Signals that trigger a graceful worker shutdown.
const shutdownSignals = ['SIGINT', 'SIGTERM'];

/**
 * Builds the signal handler for one in-flight job.
 *
 * When invoked it (1) schedules the job to be put back to 'inactive' after
 * jobInactiveTime if it has not finished by then, and (2) pauses the worker,
 * giving it workerShutdownTime to finish, then exits the process
 * killDelayTime after the pause callback fires.
 *
 * @param {string} sAction  signal name, used as the log action prefix.
 * @param {number|string} jobId
 * @param {string} jobType
 * @param {object} ctx      kue worker context exposing `pause(timeout, cb)`.
 * @returns {Function} listener suitable for `process.once(signal, ...)`.
 */
const makeShutdownHandler = (sAction, jobId, jobType, ctx) => () => {
  setJobInactive(jobId, jobType, jobInactiveTime)
    .then(() => {
      console.error({ action: sAction + '.setJobInactive.success' });
    })
    .catch((err) => {
      console.error({ action: sAction + '.setJobInactive.err', err: err });
    });

  ctx.pause(workerShutdownTime, (err) => {
    if (err) {
      console.error({ action: sAction + '.err', err: err });
    } else {
      console.error({ action: sAction + '.gracefully.shutdown' });
    }
    setTimeout(() => {
      console.info({ action: sAction + '.process.exit' });
      process.exit();
    }, killDelayTime);
  });
};

queue.process('email', concurrency, (job, ctx, done) => {
  const oJob = job.toJSON();
  const jobId = oJob.id;
  const jobType = oJob.type;

  // process.once registers a listener that is removed the first time the
  // signal fires. It is NOT removed when the job merely finishes, so the
  // listeners are tracked here and detached explicitly on completion.
  // Otherwise every processed job would leave two listeners behind, and a
  // later signal would run a stale handler for each job already done.
  const registered = shutdownSignals.map((signal) => {
    const handler = makeShutdownHandler(signal, jobId, jobType, ctx);
    process.once(signal, handler);
    return { signal: signal, handler: handler };
  });

  const finish = (err) => {
    registered.forEach((entry) => {
      process.removeListener(entry.signal, entry.handler);
    });
    if (err) {
      done(err);
    } else {
      done();
    }
  };

  fMessageWorker(job, job.data)
    .then(() => {
      finish();
    })
    .catch((err) => {
      finish(err);
    });
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment