Skip to content

Instantly share code, notes, and snippets.

@freeart
Created November 15, 2014 14:02
Show Gist options
  • Save freeart/1c96490756f4401dc8c8 to your computer and use it in GitHub Desktop.
Save freeart/1c96490756f4401dc8c8 to your computer and use it in GitHub Desktop.
parser
var cluster = require('cluster');
var os = require('os');
var async = require('async');
var strftime = require('strftime');
var argv = require('yargs').argv;
var config = require(__dirname + '/config/' + argv.conf + '.json');
/**
 * Minimal console logger shared by the master and the workers.
 *
 * `trace` and `debug` are disabled (no-ops). Every other level writes its
 * arguments to `console.log`, prefixed with the timestamp returned by
 * `strftime('%F %T', new Date())`.
 *
 * When the first argument is a string, the timestamp is merged into it so
 * that it stays the first argument handed to `console.log`; this keeps
 * printf-style placeholders (`%s`, `%d`, ...) working, e.g.
 * `logger.system('worker %d died (%s).', pid, signal)`.
 * For any other first argument the timestamp is passed as a separate
 * leading argument.
 */
var logger = (function () {
    // Disabled levels: accept any arguments and print nothing.
    function silent() {
    }

    // Shared implementation of every enabled level.
    function stamped() {
        var args = Array.prototype.slice.call(arguments);
        var stamp = strftime('%F %T', new Date());
        if (typeof args[0] === 'string') {
            // Keep the caller's format string in first position.
            args[0] = stamp + ' ' + args[0];
        } else {
            args.unshift(stamp);
        }
        console.log.apply(console, args);
    }

    return {
        trace: silent,
        debug: silent,
        log: stamped,
        info: stamped,
        system: stamped,
        warn: stamped,
        error: stamped,
        fatal: stamped
    };
}());
if (cluster.isMaster) {
// Identifier used in the pid file name: the config name, plus the reseller
// id when one was passed on the command line.
var uid = argv.conf + (argv.reseller_id ? '-' + argv.reseller_id : '');
var d = require('domain').create();
// Handler for errors raised while the master runs inside the domain:
// log the error, remove the pid file and terminate the master.
d.on('error', function (er) {
    logger.error('domain master', er.message, er.stack);
    try {
        require('fs').unlinkSync('/var/run/index.' + uid + '.pid');
    } catch (unlinkErr) {
        // The pid file may not exist (for instance when writing it was the
        // operation that failed). A throw here would mask the original
        // error and skip the exit below, so only report it.
        logger.error('domain master', 'cannot remove pid file', unlinkErr.message);
    }
    // NOTE(review): exit status 0 is kept from the original; confirm whether
    // a supervisor should rather see a non-zero status after a crash.
    process.exit(0);
});
d.run(function () {
var pidFile = '/var/run/index.' + uid + '.pid';
// Refuse to start when a pid file for this uid already exists. Only the
// file's existence is checked, so a file left behind by a master that
// died without cleaning up also blocks start-up.
if (require('fs').existsSync(pidFile)) {
    logger.system(argv.conf + ' is already running');
    process.exit(0);
}
// process.pid is a number; fs.writeFileSync rejects non-string/non-buffer
// data on current Node.js versions, so it is converted explicitly.
require('fs').writeFileSync(pidFile, String(process.pid));
// Number of workers to fork: the configured value, or one per CPU.
var threads = config.threads || os.cpus().length;
// Number of workers forked and not yet reported as exited.
var activethreads = 0;
/**
 * Starts the master-side services and then forks the workers.
 *
 * Every module named in `config.undead` (when that is an array) is loaded
 * from `./service/<name>.js` and its `create()` is called with
 * `{config, argv, logger}`. Once `async.parallel` reports completion,
 * `count` workers are forked and `activethreads` is incremented for each.
 *
 * A failure reported by a service is logged; workers are forked regardless,
 * as before.
 *
 * @param {number} count - number of workers to fork
 */
function run(count) {
    var service = {};
    var undead = config.undead instanceof Array ? config.undead : [];
    undead.forEach(function (module) {
        service[module] = function (cb) {
            require('./service/' + module + '.js').create({config: config, argv: argv, logger: logger}, cb);
        };
    });
    async.parallel(service, function (err) {
        if (err) {
            // Previously this error was dropped without a trace.
            logger.error('undead services', err);
        }
        logger.system('running ' + argv.conf + ' in ' + count + ' threads');
        for (var i = 0; i < count; i += 1) {
            activethreads++;
            cluster.fork();
        }
    });
}
// SIGHUP: send a {cmd: 'shutdown'} message to every worker currently
// listed in cluster.workers.
process.on('SIGHUP', function onHangup() {
    var workers = cluster.workers;
    Object.keys(workers).forEach(function (id) {
        workers[id].send({cmd: 'shutdown'});
    });
});
// SIGTERM: clear config.undead, so that the 'exit' handler stops
// restarting workers once the last one is gone.
process.on('SIGTERM', function onTerminate() {
    config.undead = false;
    logger.system('SIGTERM', 'undead is false');
});
cluster.on('exit', function (worker, code, signal) {
logger.system('worker %d died (%s).', worker.process.pid, signal || code);
activethreads--;
if (activethreads <= 0) {
if (config.undead) {
logger.system('restart application');
run(threads);
} else {
logger.system('exit application');
require('fs').unlinkSync('/var/run/index.' + uid + '.pid')
process.exit(0);
}
}
});
// Initial launch: start the undead services and fork `threads` workers.
run(threads);
});
} else if (cluster.isWorker) {
// Domain that wraps the worker's start-up code; an error caught by it is
// logged and the worker is then killed.
var d = require('domain').create();
d.on('error', function onWorkerDomainError(failure) {
    logger.error('domain main', failure.message, failure.stack);
    cluster.worker.kill();
});
d.run(function () {
async.auto({
argv: function (cb) {
cb(null, argv);
},
config: function (cb) {
cb(null, config);
},
cluster: function (cb) {
cb(null, cluster);
},
proxy: function (cb) {
var fs = require('fs');
var UUID = require('./helper/uuidv5.js');
var jf = require('jsonfile')
if (config.proxyFiles instanceof Array) {
async.concatSeries(config.proxyFiles, function (item, cb) {
var filename = UUID.v5(UUID.URL, item);
jf.readFile(__dirname + '/proxy/' + filename + '.json', cb)
}, cb);
} else if (config.proxyUrls instanceof Array) {
var proxyReader = require('./helper/proxyreader.js');
async.concatSeries(config.proxyUrls, function (item, cb) {
setTimeout(function () {
proxyReader.read(item, function (err, json) {
if (!err) {
var filename = UUID.v5(UUID.URL, item);
jf.writeFileSync(__dirname + '/proxy/' + filename + '.json', json)
}
cb(err, json)
});
}, 1000)
}, cb);
} else {
cb(null, []);
}
},
logger: function (cb) {
cb(null, logger)
},
cron: function (cb) {
var CronJob = require('cron').CronJob;
var jobs = [];
var cron = {
addJob: function (pattern, fn) {
var job = new CronJob(pattern, fn);
jobs.push(job);
job.start();
},
quit: function (cb) {
async.whilst(
function () {
return (jobs.length > 0)
},
function (cb) {
var job = jobs.pop();
job.stop();
setImmediate(cb, null);
},
function (err) {
cb(err)
}
);
}
}
cb(null, cron);
},
redis: function (cb) {
var redis = require("redis"),
client = redis.createClient();
cb(null, {client: client, server: redis})
},
pool: function (cb) {
var poolModule = require('./helper/pool.js');
var pools = [];
cb(null, {
quit: function (cb) {
async.whilst(
function () {
return (pools.length > 0)
},
function (cb) {
var pool = pools.pop();
pool.drain(cb);
}, cb);
},
create: function (factory) {
var pool = poolModule.Pool(factory);
pools.push(pool);
return pool;
}
});
},
poolPg: function (cb) {
var pg = require('./helper/pg_helper.js');
cb(null, pg.pool);
},
queue: function (cb) {
var amqp = require('./helper/amqp.js');
amqp.connect(cb)
},
models: ['config', function (cb, scope) {
var models = {};
scope.logger.debug('models to load', scope.config.models.length);
scope.config.models.forEach(function (module) {
scope.logger.debug('load', './models/' + module + '.js');
models[module] = function (cb) {
require('./models/' + module + '.js').create(scope, cb);
}
});
async.parallel(models, cb);
}],
// Runs after every other task. Loads every module named in config.service
// from ./service/<name>.js and calls its create(scope, cb); the results,
// keyed by module name, become `scope.service`.
// A missing or non-array config.service is treated as an empty list, the
// same way config.undead is handled in the master.
service: ['argv', 'config', 'logger', 'cron', 'redis', 'pool', 'poolPg', 'proxy', 'queue', 'models', 'cluster', function (cb, scope) {
    var names = scope.config.service instanceof Array ? scope.config.service : [];
    var service = {};
    // The message used to say 'models to load' (copied from the models task).
    scope.logger.debug('services to load', names.length);
    names.forEach(function (module) {
        scope.logger.debug('load', './service/' + module + '.js');
        service[module] = function (cb) {
            require('./service/' + module + '.js').create(scope, cb);
        };
    });
    async.parallel(service, cb);
}]
}, function (err, scope) {
if (err) {
scope.logger.fatal('main', err);
cluster.worker.kill();
} else {
scope.logger.system('waiting for jobs...')
cluster.worker.process.on('message', function (msg) {
if (msg.cmd === 'shutdown') {
scope.queue.drain();
async.series([
function (cb) {
scope.cron.quit(cb);
},
function (cb) {
scope.pool.quit(cb);
},
function (cb) {
scope.poolPg.quit(cb);
},
function (cb) {
scope.redis.client.quit(cb);
},
function (cb) {
scope.queue.quit(cb);
}
],
function (err, results) {
logger.system('ready to die', err)
cluster.worker.kill();
}
);
}
});
}
});
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment