Created
November 14, 2011 15:56
-
-
Save AndreasMadsen/1364256 to your computer and use it in GitHub Desktop.
SIGTERM issue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright Joyent, Inc. and other Node contributors. | |
// | |
// Permission is hereby granted, free of charge, to any person obtaining a | |
// copy of this software and associated documentation files (the | |
// "Software"), to deal in the Software without restriction, including | |
// without limitation the rights to use, copy, modify, merge, publish, | |
// distribute, sublicense, and/or sell copies of the Software, and to permit | |
// persons to whom the Software is furnished to do so, subject to the | |
// following conditions: | |
// | |
// The above copyright notice and this permission notice shall be included | |
// in all copies or substantial portions of the Software. | |
// | |
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | |
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | |
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | |
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | |
// USE OR OTHER DEALINGS IN THE SOFTWARE. | |
//TODO: emit a finalized event when all workers are listening on all ports | |
var assert = require('assert'); | |
var os = require('os'); | |
var fork = require('child_process').fork; | |
var net = require('net'); | |
var EventEmitter = require('events').EventEmitter; | |
var util = require('util'); | |
var cluster = module.exports = new EventEmitter(); | |
// Events threre exists: | |
// death, disconnect, fork, spawn, online, listening; | |
var debug; | |
if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) { | |
debug = function(x) { | |
var prefix = process.pid + ',' + | |
(process.env.NODE_WORKER_ID ? 'Worker' : 'Master'); | |
console.error(prefix, x); | |
}; | |
} else { | |
debug = function() { }; | |
} | |
// Used in the master: | |
var masterStarted = false; | |
var ids = 0; | |
var serverHandlers = {}; | |
var workerFilename; | |
var workerArgs; | |
var workerTotal; | |
// Used in the worker: | |
var serverLisenters = {}; | |
var queryIds = 0; | |
var queryCallbacks = {}; | |
cluster.isWorker = 'NODE_WORKER_ID' in process.env; | |
cluster.isMaster = ! cluster.isWorker; | |
//The worker object is only used in a worker | |
cluster.worker = cluster.isWorker ? {} : null; | |
//The workers array is oly used in the naster | |
cluster.workers = cluster.isMaster ? [] : null; | |
//Simple function there call a function on each worker | |
cluster.eachWorker = function(cb) { | |
// This can only be called from the master. | |
assert(cluster.isMaster); | |
// Go througe all workers | |
for (var id in cluster.workers) { | |
if (cluster.workers[id]) { | |
cb(cluster.workers[id]); | |
} | |
} | |
}; | |
// Call this from the master process. It will start child workers. | |
cluster.setupMaster = function(options) { | |
// This can only be called from the master. | |
assert(cluster.isMaster); | |
//Don't allow this function to run more that once | |
if (masterStarted) return; | |
masterStarted = true; | |
//Get filename and arguments | |
options = options || {}; | |
workerFilename = options.exec || process.argv[1]; | |
workerArgs = options.args || process.argv.slice(2); | |
workerTotal = options.workers || os.cpus().length; | |
// Signal handling | |
process.on('SIGINT', function () { | |
console.log('master: SIGINT'); | |
cluster.destroy(function () { | |
process.exit(0); | |
}); | |
}); | |
process.on('SIGTERM', function () { | |
console.log('master: SIGTERM'); | |
cluster.destroy(function () { | |
process.exit(0); | |
}); | |
}); | |
process.on('SIGQUIT', function () { | |
console.log('master: SIGQUIT'); | |
cluster.disconnect(function () { | |
process.exit(0); | |
}); | |
}); | |
process.on('SIGUSR2', cluster.restart.bind(cluster)); | |
process.on('SIGCHLD', cluster.autoFork.bind(cluster)); | |
/* | |
//This is really bad | |
process.on('uncaughtException', function(e) { | |
// Quickly try to kill all the workers. | |
// TODO: be session leader - will cause auto SIGHUP to the children. | |
cluster.eachWorker(function(worker) { | |
debug('kill worker ' + worker.pid); | |
worker.kill(); | |
}); | |
console.error('Exception in cluster master process: ' + | |
e.message + '\n' + e.stack); | |
console.error('Please report this bug.'); | |
process.exit(1); | |
}); | |
*/ | |
}; | |
//Return the number of alive workers | |
cluster.workersOnline = function () { | |
var online = 0; | |
//Kill all workers | |
cluster.eachWorker(function(worker) { | |
if ({'online': 1, 'listening': 1}[worker.state]) { | |
online += 1; | |
} | |
}); | |
return online; | |
}; | |
//Destroy all workers and itself | |
cluster.destroy = function(callback) { | |
// This can only be called from the master. | |
assert(cluster.isMaster); | |
//Kill all workers | |
cluster.eachWorker(function(worker) { | |
console.log('kill worker'); | |
worker.kill(); | |
}); | |
//Call callback | |
if(callback) callback(); | |
}; | |
//disconnect all workers | |
cluster.disconnect = function(callback) { | |
// This can only be called from the master. | |
assert(cluster.isMaster); | |
//How many workers are online | |
var total = cluster.workersOnline(); | |
var disconnected = 0; | |
cluster.eachWorker(function(worker) { | |
//When all workers are disconnected call callback | |
worker.once('disconnect', function () { | |
disconnected += 1; | |
if (disconnected === total) { | |
if(callback) callback(); | |
} | |
}); | |
//Disconnect all workers | |
worker.disconnect(); | |
}); | |
}; | |
cluster.restart = function() { | |
// This can only be called from the master. | |
assert(cluster.isMaster); | |
//First disconnect then fork | |
cluster.disconnect(function () { | |
cluster.autoFork(); | |
}); | |
}; | |
// Check if a message is internal only | |
function isInternalMessage(message) { | |
return (message !== null && | |
typeof message === 'object' && | |
message._internal === true); | |
} | |
// Check if a message require echo | |
function requireEcho(message) { | |
return (message !== null && | |
typeof message === 'object' && | |
message.hasOwnProperty('_requestEcho')); | |
} | |
// Checi if a echo is recived | |
function receiveEcho(message) { | |
return (message !== null && | |
typeof message === 'object' && | |
message.hasOwnProperty('_queryEcho')); | |
} | |
// Create a internal message object | |
function internalMessage(sending, receiving) { | |
sending = sending || {}; | |
receiving = receiving || {}; | |
sending._internal = true; | |
if (requireEcho(receiving)) { | |
sending._queryEcho = receiving._requestEcho; | |
} | |
return sending; | |
} | |
// Handle messages from both master and workers | |
var messageHandingObject = {}; | |
function handleMessage(message, handle, worker) { | |
//Run handler if it exist | |
if (message.cmd && typeof messageHandingObject[message.cmd] === 'function') { | |
messageHandingObject[message.cmd](message, worker); | |
} | |
//Else check for callback request | |
else { | |
if (requireEcho(message)) { | |
worker.send(internalMessage({}, message)); | |
} | |
if (receiveEcho(message)) { | |
queryCallbacks[message._queryEcho](message, handle); | |
delete queryCallbacks[message._queryEcho]; | |
} | |
} | |
} | |
//Messages to the master will be handled using this methods | |
if (cluster.isMaster) { | |
//Handle online messages from workers | |
messageHandingObject.online = function(message, worker) { | |
worker.state = 'online'; | |
debug('Worker ' + worker.process.pid + ' online'); | |
cluster.emit('online', worker); | |
//Send echo if requested | |
if (requireEcho(message)) { | |
worker.send(internalMessage({}, message)); | |
} | |
}; | |
//Handle queryServer messages form workers | |
messageHandingObject.queryServer = function(message, worker) { | |
//This sequence of infomation is unique to the connection but not the worker | |
var args = [message.address, message.port, message.addressType]; | |
var key = args.join(':'); | |
var handler; | |
if (serverHandlers.hasOwnProperty(key)) { | |
handler = serverHandlers[key]; | |
} else { | |
handler = serverHandlers[key] = net._createServerHandle.apply(net, args); | |
} | |
//echo callback id, with the fd handler associated with it | |
worker.send(internalMessage({}, message), handler); | |
}; | |
//Handle listening messages from workers | |
messageHandingObject.listening = function(message, worker) { | |
worker.state = 'listening'; | |
//Emit a listining now that we know the worker is listning | |
cluster.emit('listening', worker, { | |
address: message.address, | |
port: message.port, | |
addressType: message.addressType | |
}); | |
//Send echo if requested | |
if (requireEcho(message)) { | |
worker.send(internalMessage({}, message)); | |
} | |
}; | |
//Handle suicide messages from workers | |
messageHandingObject.suicide = function(message, worker) { | |
worker.suicide = true; | |
//Send echo if requested | |
if (requireEcho(message)) { | |
worker.send(internalMessage({}, message)); | |
} | |
}; | |
//Handle disconnect messages from workers | |
messageHandingObject.disconnect = function(message, worker) { | |
if (message.state === 'setState') { | |
worker.state = 'disconnect'; | |
worker.suicide = true; | |
//Send echo if requested | |
if (requireEcho(message)) { | |
worker.send(internalMessage({}, message)); | |
} | |
} | |
else if (message.state === 'done') { | |
//Send echo if requested before closing channel | |
if (requireEcho(message)) { | |
worker.send(internalMessage({}, message)); | |
} | |
worker.process._channel.close(); | |
worker.emit('disconnect', worker); | |
cluster.emit('disconnect', worker); | |
} | |
}; | |
} | |
//Messages to a worker will be handled using this methods | |
else if (cluster.isWorker) { | |
//Handle disconnect messages from master | |
messageHandingObject.disconnect = function(message, worker) { | |
//Run disconnect | |
worker.disconnect(); | |
//Send echo if requested | |
if (requireEcho(message)) { | |
worker.send(internalMessage({}, message)); | |
} | |
}; | |
} | |
// Create a worker call there works both for master and worker | |
function Worker(env) { | |
if (!(this instanceof Worker)) return new Worker(); | |
var self = this; | |
//Assign an id and state | |
Object.defineProperty(this, 'workerID', { | |
value: (cluster.isMaster ? ++ids : parseInt(process.env.NODE_WORKER_ID, 10)) | |
}); | |
this.state = 'none'; | |
//Create or get process | |
if (cluster.isMaster) { | |
var envCopy = env || {}; | |
for (var x in process.env) { | |
envCopy[x] = process.env[x]; | |
} | |
envCopy['NODE_WORKER_ID'] = this.workerID; | |
self.process = fork(workerFilename, workerArgs, { env: envCopy }); | |
} else { | |
self.process = process; | |
} | |
//Handle message | |
this.process.on('message', function(message, handle) { | |
debug('recived: ', message); | |
//If this is an internal message handle it and ignore the rest | |
if (isInternalMessage(message)) { | |
handleMessage(message, handle, self); | |
return undefined; | |
} | |
//Check if a echo is required | |
if (requireEcho(message)) { | |
self.send({ _queryEcho: message._requestEcho }); | |
} | |
//Check if a echo is recived | |
if (queryEcho(message)) { | |
queryCallbacks[msg._queryEcho](msg, handle); | |
delete queryCallbacks[msg._queryEcho]; | |
} | |
//Emit message | |
self.emit('message', message, self); | |
}); | |
//Handle exit | |
self.process.on('exit', function() { | |
debug('worker id=' + self.workerID + ' died'); | |
//set state to dead so the user can check for old worker objects | |
self.state = 'dead'; | |
//Make suicide a boolean | |
self.suicide = !!self.suicide; | |
//Remove from workers in the master | |
if (cluster.isMaster) { | |
delete cluster.workers[self.workerID]; | |
} | |
//Emit exit and death | |
self.emit('exit', self); | |
cluster.emit('death', self); | |
}); | |
//Handle disconnect | |
self.on('disconnect', function() { | |
//set state to disconnect | |
self.disconnect = 'disconnect'; | |
//Make suicide a boolean | |
self.suicide = !!self.suicide; | |
//Remove from workers in the master | |
if (cluster.isMaster) { | |
delete cluster.workers[self.workerID]; | |
} | |
//Emit exit and death | |
self.emit('exit', self); | |
cluster.emit('death', self); | |
}); | |
} | |
util.inherits(Worker, EventEmitter); | |
//Send message to worker or master | |
Worker.prototype.send = function(/*message, handler, callback*/) { | |
debug('send ' + JSON.stringify(message)); | |
//Exist callback | |
var callback = arguments[arguments.length - 1]; | |
if (typeof callback !== 'function') { | |
callback = undefined; | |
} | |
//Get message and handler as array | |
var slice = Array.prototype.slice; | |
var sliceTo = callback ? arguments.length - 1 : arguments.length; | |
var message = slice.call(arguments, 0, sliceTo); | |
// Store callback for later. | |
if (callback) { | |
message[0] = message[0] || {}; | |
// Grab some random requestEcho string | |
message[0]._requestEcho = this.workerID + ':' + (++queryIds); | |
queryCallbacks[message[0]._requestEcho] = callback; | |
} | |
// Send message | |
this.process.send.apply(this.process, message); | |
}; | |
// Kill the worker without restarting | |
Worker.prototype.kill = function() { | |
var self = this; | |
this.suicide = true; | |
if (cluster.isMaster) { | |
this.process.kill(); | |
} else { | |
//Inform master that is is suicide and then kill | |
this.send(internalMessage({cmd: 'suicide'}), function() { | |
self.process.kill(); | |
}); | |
} | |
}; | |
// Kill the worker without restarting | |
Worker.prototype.disconnect = function() { | |
var self = this; | |
if (cluster.isMaster) { | |
//Inform worker that is should disconnect from the master | |
this.send(internalMessage({cmd: 'disconnect'})); | |
} else { | |
//Inform master that about state and suicide and make it emit disconnect | |
this.send(internalMessage({ | |
cmd: 'disconnect', state: 'setState' | |
}), function() { | |
var item; | |
//Predefine closeState | |
var closeState = {master: false}; | |
for (item in serverLisenters) { | |
if (serverLisenters.hasOwnProperty(item)) { | |
closeState[item] = false; | |
} | |
} | |
//Check closeState | |
var setState = function(name) { | |
return function() { | |
//Set State | |
closeState[name] = true; | |
//Check all closeStates | |
var state; | |
for (state in closeState) { | |
if (closeState.hasOwnProperty(state) && closeState === false) { | |
return undefined; | |
} | |
} | |
//Emit a disconnect if all closeState are true | |
self.emit('disconnect', self); | |
}; | |
}; | |
//Close TCP connections | |
for (item in serverLisenters) { | |
if (serverLisenters.hasOwnProperty(item)) { | |
//Close TCP connection and set closeState when done | |
serverLisenters[item].once('end', setState(item)); | |
serverLisenters[item].close(); | |
} | |
} | |
//Make master emit a disconnect event | |
self.send(internalMessage({ | |
cmd: 'disconnect', state: 'done' | |
}), function() { | |
//Close connection to the master | |
setState('master')(); | |
}); | |
}); | |
} | |
}; | |
// Fork a new worker | |
cluster.fork = function(env) { | |
// This can only be called from the master. | |
assert(cluster.isMaster); | |
// Make sure that the master has been initalized | |
cluster.setupMaster(); | |
// Create and store worker | |
var worker = new Worker(env); | |
cluster.workers[worker.workerID] = worker; | |
//Emit a fork event | |
cluster.emit('fork', worker); | |
return worker; | |
}; | |
// Spawn all necessary workers | |
cluster.autoFork = function() { | |
// This can only be called from the master. | |
assert(cluster.isMaster); | |
//Make sure that the master is inialized | |
cluster.setupMaster(); | |
//Make sure that there hasn't been spawned any workers | |
if (cluster.workersOnline() >= workerTotal) { | |
return; | |
} | |
//Spawn workers | |
var i = workerTotal; | |
while (i--) { | |
cluster.fork(); | |
} | |
//Restart workers when they die | |
cluster.on('death', function(worker) { | |
if (worker.suicide === false) { | |
debug('worker ' + worker.process.pid + ' died. restart...'); | |
cluster.fork(); | |
} | |
}); | |
}; | |
// Internal function. Called from src/node.js when worker process starts. | |
cluster._setupWorker = function() { | |
// This can only be called from a worker. | |
assert(cluster.isWorker); | |
// Get worker class | |
var worker = cluster.worker = new Worker(); | |
// signal handlers | |
process.on('SIGINT', function () { | |
console.log("worker: SIGINT"); | |
worker.kill.bind(worker); | |
}); | |
process.on('SIGTERM', function () { | |
console.log("worker: SIGTERM"); | |
worker.kill.bind(worker); | |
}); | |
process.on('SIGQUIT', worker.disconnect.bind(worker)); | |
//Tell master that the worker is online | |
worker.state = 'online'; | |
worker.send(internalMessage({ cmd: 'online' })); | |
}; | |
// Internal function. Called by lib/net.js when attempting to bind a server. | |
cluster._getServer = function(tcpSelf, address, port, addressType, cb) { | |
// This can only be called from a worker. | |
assert(cluster.isWorker); | |
//Store tcp instance for later use | |
var key = [address, port, addressType].join(':'); | |
serverLisenters[key] = tcpSelf; | |
//Send a listening message to the master | |
tcpSelf.once('listening', function() { | |
cluster.worker.state = 'listening'; | |
cluster.worker.send(internalMessage({ | |
cmd: 'listening', | |
address: address, | |
port: port, | |
addressType: addressType | |
})); | |
}); | |
//Request the fd handler from the master process | |
var message = internalMessage({ | |
cmd: 'queryServer', | |
address: address, | |
port: port, | |
addressType: addressType | |
}); | |
// The callback will be stored until the master has responed | |
cluster.worker.send(message, function(msg, handle) { | |
cb(handle); | |
}); | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment