Created
July 24, 2015 22:01
-
-
Save hanwencheng/be925599c7607bf06d96 to your computer and use it in GitHub Desktop.
rest queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var util = require('util');
var EventEmitter = require('events').EventEmitter;

var async = require('async');
var Promise = require('bluebird');

var payments = require('../api/payments.js');
var respond = require('../lib/response-handler.js');
var logger = require('./logger.js').logger;
var mongoDbQueue = require('./mongodb-queue.js');
// Delay, in milliseconds, between two polls of the persistent queue
// (this value is handed to setInterval).
const queueCheckInterval = 3000;
// Concurrency limit given to the in-memory async.queue worker pool.
const poolSize = 1000;
// Value of the `visibility` option passed to mongoDbQueue for 'rest-queue'.
// NOTE(review): the unit (seconds vs. milliseconds) is defined by
// ./mongodb-queue.js, which is not visible here - confirm.
const visibilityTime = 500;
// Value of the `maxRetries` option passed to mongoDbQueue for 'rest-queue'.
const maxRetries = 5;
// Sentinel used in place of an error object to signal "no message available";
// it stops the processing loop and is deliberately not logged as a failure.
const emptyQueue = 'emptyQueue';
// Instances must be event emitters: the pool worker below emits one event per
// acknowledged message and `register` subscribes to it with `once`.
// (The function declaration is hoisted, so it is safe to wire this up first.)
util.inherits(RestQueue, EventEmitter);

/**
 * Persistent queue of REST requests backed by `mongoDbQueue`, combined with an
 * in-memory worker pool that acknowledges messages and announces the
 * acknowledgement as an event named after the message id.
 *
 * Side effects of construction: creates the queue indexes, and starts an
 * interval timer that drains the persistent queue into the pool.
 *
 * @param db handle forwarded untouched to `mongoDbQueue`
 *        (presumably a MongoDB connection - confirm against ./mongodb-queue.js)
 * @param callback optional; invoked synchronously as `callback(null, queue)`
 *        once the queue object exists
 */
function RestQueue(db, callback) {
    EventEmitter.call(this);

    var self = this;
    this.isSending = false;

    // Messages that were retried more than maxRetries times end up here.
    var deadQueue = mongoDbQueue(db, 'dead-queue');
    var queue = mongoDbQueue(db, 'rest-queue', {
        deadQueue: deadQueue,
        visibility: visibilityTime,
        maxRetries: maxRetries
    });

    queue.createIndexes(function (err, indexName) {
        if (err) {
            logger.error('[REST-Queue] Error. When creating the queue indexes', err);
            return;
        }
        // The indexes needed have been added to MongoDB.
        logger.info('==the index name is', indexName);
    });

    this.queue = queue;

    // Worker pool: each task is one message previously fetched from the queue.
    this.pool = async.queue(function (msg, done) {
        async.waterfall([
            // send the request to the router
            function (next) {
                // TODO send request to payment.js
                next(null, null);
            },
            // acknowledge the message so it is cleared from the queue database,
            // then notify whoever is waiting on this message id
            function (routerResponse, next) {
                self.queue.ack(msg.ack, function (err, message) {
                    if (err) {
                        return next(err);
                    }
                    // NOTE(review): assumes ack yields an object with `id` and
                    // `payload` - confirm against ./mongodb-queue.js
                    self.emit(message.id, { ok: true, msg: message.payload });
                    next(null, message.id);
                });
            }
        ], function (err, result) {
            if (err) {
                logger.error('[REST-Queue] Error. When acknowledging the message', err);
            }
            done(err, result);
        });
    }, poolSize);

    // Periodically wake up and drain the persistent queue. `isSending` keeps
    // a second drain loop from starting while one is still running. The timer
    // handle is kept so the owner can stop polling with clearInterval.
    this.pollTimer = setInterval(function () {
        if (self.isSending) {
            return;
        }
        self.isSending = true;

        async.forever(
            function (loop) {
                async.waterfall([
                    // fetch the next message; an empty queue ends the loop
                    function (next) {
                        self.queue.get(function (err, msg) {
                            if (err) {
                                return next(err);
                            }
                            if (!msg) {
                                return next(emptyQueue);
                            }
                            next(null, msg);
                        });
                    },
                    // hand it to the pool and wait until it has been processed
                    function (msg, next) {
                        self.pool.push(msg, function (err) {
                            next(err, msg);
                        });
                    }
                ], function (err, result) {
                    loop(err, result);
                });
            },
            // the loop stopped: either the queue is empty or something failed
            function (err) {
                if (err && err !== emptyQueue) {
                    logger.error('[REST-Queue] Error. When checking the queue', err);
                }
                self.isSending = false;
            }
        );
    }, queueCheckInterval);

    if (typeof callback === 'function') {
        callback(null, queue);
    }
}
/**
 * Reacts to a new payment request; should be called when the user submits
 * the request.
 *
 * Steps: persist the request in the queue, answer the client right away when
 * the request is flagged `query.validated === 'true'` (otherwise subscribe to
 * the acknowledgement event for the stored message id), then trigger `send`.
 *
 * @param request should at least include query, param, and body
 * @param response response object handed to `respond.success`
 * @param next continuation invoked exactly once as `next(err, result)`
 */
RestQueue.prototype.register = function (request, response, next) {
    var self = this;
    var isValidated = Boolean(
        request && request.query && request.query.validated === 'true'
    );

    async.waterfall([
        // save into database first.
        function (step) {
            self.queue.add(request, function (err, id) {
                step(err, id);
            });
        },
        // if it is asynchronous payment, respond to the frontend instantly;
        // otherwise listen for the acknowledgement of this message id.
        function (id, step) {
            if (isValidated) {
                // todo response to frontend
                respond.success(response, { msg: 'success saved in our database' });
            } else {
                // The listener only reports the outcome. It must never call
                // `step` again: the waterfall has already moved on by the
                // time the acknowledgement arrives.
                self.once(id, function (message) {
                    if (message && message.ok) {
                        // todo post-process and response to frontend
                        logger.info('[REST-Queue] Success receive the ack : ', message.msg);
                    } else {
                        logger.error('[REST-Queue] Error. When listening to the ack of the message', message);
                    }
                });
            }
            step(null, request);
        },
        // now send it to the pool
        function (queuedRequest, step) {
            self.send(step);
        }
    ], function (err, result) {
        // `emptyQueue` from `send` is not a failure: the interval poller may
        // already have claimed the message that was just stored.
        var failure = err === emptyQueue ? null : err;
        if (failure) {
            //todo send respond to client, where handle error together
            logger.error('[REST-Queue] Error. When adding message into queue', failure);
        }
        if (typeof next === 'function') {
            next(failure, result);
        }
    });
};
/**
 * Gets one message from the database queue (the request is wrapped inside it)
 * and pushes it into the worker pool.
 *
 * @param callback invoked as `callback(err, msg)` once the pool has finished
 *        with the message. When the queue holds no message, `err` is the
 *        `emptyQueue` sentinel and nothing is pushed into the pool.
 */
RestQueue.prototype.send = function (callback) {
    var self = this;

    async.waterfall([
        // fetch the next message from the persistent queue
        function (next) {
            self.queue.get(function (err, msg) {
                if (err) {
                    return next(err);
                }
                if (!msg) {
                    // Nothing to process: stop here instead of handing an
                    // empty task to the pool worker.
                    return next(emptyQueue);
                }
                next(null, msg);
            });
        },
        // send it into pool to be processed
        function (msg, next) {
            self.pool.push(msg, function (err) {
                next(err, msg);
            });
        }
    ], function (err, result) {
        if (err && err !== emptyQueue) {
            logger.error('[REST-Queue] Error. When getting from queue and inserting into the pool', err);
        }
        callback(err, result);
    });
};
module.exports = { | |
restQueue : RestQueue | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment