Created
August 3, 2018 15:16
-
-
Save cjus/719d77dd9816115d94083234c293c18b to your computer and use it in GitHub Desktop.
Sample job queue code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const redis = require('redis'); | |
const moment = require('moment'); | |
class JobQueue { | |
constructor() { | |
this.config = null; | |
this.redisdb = null; | |
this.redisKey; | |
} | |
/** | |
* @name init | |
* @summary initialize with config object | |
* @param {object} config - configurtion object containing specific keys/values | |
* @return {promise} promise - resolves once database is connected or failure | |
*/ | |
init(config) { | |
this.config = config; | |
this.redisKey = this.config.jobQueueBaseName; | |
this.jobProcessorName = this.config.jobProcessorName; | |
return this._configAccessToRedis(); | |
} | |
/** | |
* @name close | |
* @summary close the redis connection | |
*/ | |
close() { | |
this.redisdb.quit(); | |
} | |
/** | |
* @name _configAccessToRedis | |
* @summary Configure access to redis and monitor emitted events | |
* @private | |
* @return {promise} promise - resolves once database is connected or failure | |
*/ | |
_configAccessToRedis() { | |
// from the node redis documentation site: https://github.com/NodeRedis/node_redis | |
// TODO: validate that these values are reasonable. | |
return new Promise((resolve, reject) => { | |
let config = this.config; | |
try { | |
let redisOptions = { | |
retry_strategy: (options) => { | |
if (options.error.code === 'ECONNREFUSED') { | |
// End reconnecting on a specific error and flush all commands with a individual error | |
return new Error('The server refused the connection'); | |
} | |
if (options.total_retry_time > 1000 * 60 * 60) { | |
// End reconnecting after a specific timeout and flush all commands with a individual error | |
return new Error('Retry time exhausted'); | |
} | |
if (options.times_connected > 10) { | |
// End reconnecting with built in error | |
return undefined; | |
} | |
// reconnect after | |
return Math.max(options.attempt * 100, 3000); | |
} | |
}; | |
this.redisdb = redis.createClient(config.redis.port, config.redis.url, redisOptions); | |
this.redisdb | |
.on('connect', () => { | |
this.redisdb.select(config.redis.db, (err, data) => { | |
resolve(); | |
}); | |
}); | |
} catch (e) { | |
reject(); | |
} | |
}); | |
} | |
/** | |
* @name _safeJSONStringify | |
* @summary Safe JSON stringify | |
* @private | |
* @param {object} obj - object to stringify | |
* @return {string} string - stringified object. | |
* Returns undefined if the object can't be stringified | |
*/ | |
_safeJSONStringify(obj) { | |
let data; | |
try { | |
data = JSON.stringify(obj); | |
} catch (e) { | |
} | |
return data; | |
} | |
/** | |
* @name _safeJSONParse | |
* @summary Safe JSON parse | |
* @private | |
* @param {string} str - string which will be parsed | |
* @return {object} obj - parsed object | |
* Returns undefined if string can't be parsed into an object | |
*/ | |
_safeJSONParse(str) { | |
let data; | |
try { | |
data = JSON.parse(str); | |
} catch (e) { | |
} | |
return data; | |
} | |
/** | |
* @name clearJobQueue | |
* @summary clears the queue in prep for new job processing | |
*/ | |
clearJobQueue() { | |
this.redisdb.del(`${this.redisKey}:queued`); | |
} | |
/** | |
* @name clearProcessingQueue | |
* @summary clears the processing queue in prep for new job processing | |
*/ | |
clearProcessingQueue() { | |
this.redisdb.del(`${this.redisKey}:${this.jobProcessorName}:processing`); | |
} | |
/** | |
* @name enqueue | |
* @summary Push a job into a job queue | |
* @param {object} jobObj - job object which will be queued | |
* @return {promise} promise - returns a promise | |
*/ | |
enqueue(jobObj) { | |
return new Promise((resolve, reject) => { | |
let js = Object.assign({}, jobObj, { | |
_ts: moment().unix() | |
}); | |
js = this._safeJSONStringify(js); | |
if (!js) { | |
reject(new Error('unable to stringify object')); | |
return; | |
} | |
this.redisdb.rpush(`${this.redisKey}:queued`, js, (err, data) => { | |
if (err) { | |
reject(err); | |
} else { | |
resolve(data); | |
} | |
}); | |
}); | |
} | |
/** | |
* @name dequeue | |
* @summary Removes a job from the job queue and moves it into the in-processing queue | |
* @return {promise} promise - returns a promise resolving to the dequeued job. | |
*/ | |
dequeue() { | |
return new Promise((resolve, reject) => { | |
this.redisdb.rpoplpush(`${this.redisKey}:queued`, `${this.redisKey}:${this.jobProcessorName}:processing`, (err, data) => { | |
if (err) { | |
reject(err); | |
} else { | |
let job = this._safeJSONParse(data); | |
resolve(job); | |
} | |
}); | |
}); | |
} | |
/** | |
* @name completed | |
* @summary mark a job as completed, by removing it from the in-processing queue | |
* @param {object} job - job which will be marked as completed | |
* @return {promise} promise - returns a promise resolving to the completed job. | |
*/ | |
completed(job) { | |
return new Promise((resolve, reject) => { | |
this.redisdb.lrem(`${this.redisKey}:${this.jobProcessorName}:processing`, -1, this._safeJSONStringify(job), (err, data) => { | |
if (err) { | |
reject(err); | |
} else { | |
let job = this._safeJSONParse(data); | |
resolve(job); | |
} | |
}); | |
}); | |
} | |
/** | |
* @name _recover | |
* @summary Called by the recover function | |
* @param {function} resolve - called to resolve a promise | |
* @param {function} reject - called to rejecx a promise | |
* @private | |
*/ | |
_doRecover(resolve, reject) { | |
this.redisdb.rpoplpush(`${this.redisKey}:${this.jobProcessorName}:processing`, `${this.redisKey}:queued`, (err, data) => { | |
if (err) { | |
reject(err); | |
} else { | |
if (!data) { | |
resolve(); | |
} else { | |
this._doRecover(resolve, reject); | |
} | |
} | |
}); | |
} | |
/** | |
* @name recover | |
* @summary Used when client processor has stopped (crashed?) and is restarting. | |
* @description On restart recover() is called to move all of the clients jobs | |
* from its queue back to the main queue. The thinking here is that | |
* in a recovery suituation the client could use help resolving the jobs | |
* when there are other clients able to help. In either case, the client | |
* can resume dequeing jobs after this call. | |
* @return {promise} promise - resolves when there are are no more jobs in the clients queue. | |
*/ | |
recover() { | |
return new Promise((resolve, reject) => { | |
this._doRecover(resolve, reject); | |
}); | |
} | |
} | |
module.exports.createClient = function() { | |
let jobQueue = new JobQueue(); | |
return jobQueue; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment