Skip to content

Instantly share code, notes, and snippets.

@hanwencheng
Created July 24, 2015 22:01
Show Gist options
  • Save hanwencheng/be925599c7607bf06d96 to your computer and use it in GitHub Desktop.
Save hanwencheng/be925599c7607bf06d96 to your computer and use it in GitHub Desktop.
rest queue
var mongoDbQueue = require('./mongodb-queue.js');
var logger = require('./logger.js').logger;
var async = require('async');
var respond = require('../lib/response-handler.js');
var Promise = require('bluebird');
var payments = require('../api/payments.js');
// Delay in milliseconds between two runs of the background check that looks
// for pending messages in the database queue (passed to setInterval).
const queueCheckInterval = 3000;
// Concurrency handed to async.queue: the maximum number of messages the
// in-memory pool works on at the same time.
const poolSize = 1000;
// Value of the `visibility` option given to the mongodb queue.
// NOTE(review): presumably the time a fetched message stays hidden from other
// consumers; the unit is defined by ./mongodb-queue.js - confirm.
const visibilityTime = 500;
// Value of the `maxRetries` option given to the mongodb queue; messages that
// exceed it are meant to end up in the dead queue.
const maxRetries = 5;
// Sentinel passed as the "error" of the polling loop to stop it when the
// database queue has no more messages. It is not a real failure.
const emptyQueue = 'emptyQueue';
// Make RestQueue *instances* event emitters. The previous code assigned
// `RestQueue.__proto__`, which rewires the constructor function itself, so
// instances never received emit()/once(); it also relied on
// `process.EventEmitter`, which no longer exists in current Node.js releases.
// RestQueue is a hoisted function declaration, so it is available here.
Object.setPrototypeOf(RestQueue.prototype, require('events').EventEmitter.prototype);
/**
 * Persistent request queue backed by two mongodb queues ('rest-queue' and
 * 'dead-queue') plus an in-memory async.queue worker pool.
 *
 * Construction has three side effects:
 *  - it asks the queue to create its indexes,
 *  - it creates the worker pool (`this.pool`) that acknowledges messages and
 *    emits an event named after the acknowledged message id,
 *  - it starts a timer (`this.checkTimer`) that periodically drains the
 *    database queue into the pool.
 *
 * @param db handle forwarded unchanged to mongoDbQueue
 * @param callback optional, called synchronously as callback(null, queue)
 *        once the instance is wired up
 */
function RestQueue(db, callback){
    var self = this;
    // true while a drain loop started by the timer below is still running
    this.isSending = false;
    //if we tried exceed maxRetry, put it into dead-queue.
    var deadQueue = mongoDbQueue(db, 'dead-queue');
    var queue = mongoDbQueue(db, 'rest-queue', {
        deadQueue : deadQueue,
        visibility : visibilityTime,
        maxRetries : maxRetries
    });
    queue.createIndexes(function (err, indexname) {
        if (err) {
            // previously ignored; surface it so a broken setup is visible
            logger.error('[REST-Queue] Error. When creating the queue indexes', err);
            return;
        }
        // The indexes needed have been added to MongoDB.
        logger.info('==the index name is', indexname);
    });
    this.queue = queue;
    // Worker pool: handles up to poolSize messages concurrently.
    this.pool = async.queue(function (msg, done) {
        var ack = msg.ack;
        async.waterfall([
            //send the request to the router
            function (next) {
                //TODO send request (msg.payload) to payment.js
                next(null, null);
            },
            //get the response and clear the message in the queue database.
            function (routerResponse, next) {
                self.queue.ack(ack, function (err, message) {
                    if (err) {
                        return next(err);
                    }
                    // wake up whoever is waiting for this message (see register)
                    self.emit(message.id, {ok : true, msg : message.payload});
                    // the original passed an undeclared `id` here, which threw
                    next(null, message.id);
                });
            }
        ], function (err, result) {
            if (err) {
                logger.error('[REST-Queue] Error. When acknowledging the message', err)
            }
            done(err, result);
        });
    }, poolSize);
    //use this function to periodically check if database is empty.
    //The handle is kept so that the polling can be stopped with clearInterval.
    this.checkTimer = setInterval(function () {
        if (self.isSending) {
            // a previous drain loop is still busy; do not start a second one
            return;
        }
        self.isSending = true;
        //wake up the sending process until the queue is empty
        async.forever(
            function (again) {
                async.waterfall([
                    //fetch the next message from the database queue
                    function (next) {
                        self.queue.get(function (err, msg) {
                            if (err) {
                                return next(err);
                            }
                            if (!msg) {
                                //nothing left: stop the loop with the sentinel
                                return next(emptyQueue);
                            }
                            next(null, msg);
                        });
                    },
                    //send it into pool to be processed
                    //NOTE(review): the next message is only fetched after the
                    //pool reports this one as finished - confirm this is intended.
                    function (msg, next) {
                        self.pool.push(msg, function (err) {
                            next(err, msg);
                        });
                    }
                ], function (err, result) {
                    //result is the msg we saved
                    again(err, result);
                });
            },
            //execute finished, the loop sleeps until the next timer tick
            function (err) {
                if (err && err !== emptyQueue) {
                    logger.error('[REST-Queue] Error. When checking the queue', err);
                }
                self.isSending = false;
            }
        );
    }, queueCheckInterval);
    if (typeof callback === 'function') {
        callback(null, queue);
    }
}
/**
 * The function to react when a new payment request comes in; should be called
 * when the user submits the request.
 *
 * Steps: store the request in the database queue, answer immediately when the
 * request is flagged as validated (otherwise listen for the acknowledgement
 * event of the stored message), then trigger one send() run.
 *
 * @param request should at least include query, param, and body
 * @param response handed to respond.success for validated requests
 * @param next optional completion callback, called as next(err, result) once
 *        the steps above are done. An empty queue reported by send() is not
 *        treated as an error.
 */
RestQueue.prototype.register = function(request, response, next){
    var self = this;
    async.waterfall([
        //save into database first.
        function(step){
            self.queue.add(request, function(err, id){
                step(err, id);
            });
        },
        //if it is asynchronous payment, response to frontend instantly.
        //otherwise start listening for the acknowledgement of this message
        function(id, step){
            if(request.query.validated === 'true'){
                //todo response to frontend
                respond.success(response, { msg : 'success saved in our database' });
                return step(null, request);
            }
            //add event listener and wait until the message is acknowledged
            new Promise(function (resolve, reject) {
                self.once(id, function (message) {
                    if (message.ok) {
                        resolve(message.msg);
                    } else {
                        reject(new Error('[REST-Queue] message ' + id + ' was not acknowledged'));
                    }
                });
            }).then(function (msg) {
                //todo post-process and response to frontend
                logger.info('[REST-Queue] Success receive the ack : ', msg.ack)
            }).catch(function (err) {
                // Only log here: the waterfall has already moved on, and calling
                // its callback a second time (as the old code did) is an error.
                logger.error('[REST-Queue] Error. When listening to the ack of the message', err);
            });
            //send the request now; the listener above is already registered
            step(null, request);
        },
        //now send it to the pool
        function(queuedRequest, step){
            self.send(step);
        }
    ],function(err, result){
        //todo send respond to client, where handle error together
        if (err === emptyQueue) {
            // the periodic loop may already have picked the message up
            err = null;
        }
        if (err) {
            logger.error('[REST-Queue] Error. When adding message into queue', err)
        }
        // The old code called an undefined `callback` here and always threw;
        // the outer `next` is the only completion callback this method receives.
        if (typeof next === 'function') {
            next(err, result);
        }
    });
};
/**
 * Get one message from the database queue and push it (the request is wrapped
 * in it) into the worker pool.
 *
 * @param callback called as callback(err, msg) after the pool has finished
 *        the message. When the database queue has no message, err is the
 *        `emptyQueue` sentinel and nothing is pushed into the pool.
 */
RestQueue.prototype.send = function(callback){
    // `self` was never declared in this method, so every call used to throw
    var self = this;
    async.waterfall([
        //fetch the next message from the database queue
        function(next) {
            self.queue.get(function (err, msg) {
                if (err) {
                    return next(err);
                }
                if (!msg) {
                    // do not hand an undefined message to the pool worker
                    return next(emptyQueue);
                }
                next(null, msg);
            });
        },
        //send it into pool to be processed
        function(msg, next){
            self.pool.push(msg, function(err){
                next(err, msg);
            });
        }
    ],function(err, result){
        // an empty queue is an expected outcome, not worth an error log
        if(err && err !== emptyQueue) {
            logger.error('[REST-Queue] Error. When getting from queue and inserting into the pool', err)
        }
        callback(err, result);
    });
};
// Public API of this module: the RestQueue constructor, exposed as `restQueue`.
module.exports = { restQueue : RestQueue };
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment