Skip to content

Instantly share code, notes, and snippets.

@mauritslamers
Last active October 8, 2017 09:46
Show Gist options
  • Save mauritslamers/1e5e24207e389dd00373b57b1f0ce359 to your computer and use it in GitHub Desktop.
Save mauritslamers/1e5e24207e389dd00373b57b1f0ce359 to your computer and use it in GitHub Desktop.
Changes to socketcluster to accommodate loading the worker as an ES6 module
// One change to the package entry file (presumably socketcluster's index.js — confirm):
// expose SCWorker by adding the following line as the last line, so that a worker
// module can call socketcluster.SCWorker.setup(...).
module.exports.SCWorker = require('./lib/scworker');
/*
CHANGES
- run the worker by calling this.run() in SCWorker.prototype._startServer. There is no need to pass the worker
along as it is available as this in the run function.
- Added a setup function which sets up the listeners for the worker, and saves the user options (such as the run function) in
the message handler through a closure.
- currently the run function is set as this.run in SCWorker.prototype._init, but it would be more consistent with the
current implementation to keep it as part of the options (as it is passed in) and call it as this.options.run(this) from
SCWorker.prototype._startServer.
*/
var socketClusterServer = require('socketcluster-server');
var EventEmitter = require('events').EventEmitter;
var crypto = require('crypto');
var uuid = require('uuid');
var http = require('http');
var https = require('https');
var fs = require('fs');
var base64id = require('base64id');
var async = require('async');
var AuthEngine = require('sc-auth').AuthEngine;
var scErrors = require('sc-errors');
var InvalidActionError = scErrors.InvalidActionError;
var ResourceLimitError = scErrors.ResourceLimitError;
var BrokerError = scErrors.BrokerError;
var HTTPServerError = scErrors.HTTPServerError;
var TimeoutError = scErrors.TimeoutError;
var processTermTimeout = 10000;
/**
 * Reports a worker error to the parent process over IPC.
 *
 * @param {boolean} isFatal - When truthy, the process exits with code 1 once
 *   the error message has been handed to the IPC channel.
 * @param {*} err - The error to report; it is dehydrated with
 *   scErrors.dehydrateError before being sent.
 */
var handleError = function (isFatal, err) {
  var errorPacket = {
    type: 'error',
    data: {
      error: scErrors.dehydrateError(err, true),
      workerPid: process.pid
    }
  };
  var afterSend = function () {
    if (!isFatal) {
      return;
    }
    process.exit(1);
  };
  process.send(errorPacket, null, afterSend);
};
/**
 * Forwards a worker warning to the parent process over IPC.
 *
 * @param {*} warning - The warning to report; it is dehydrated with
 *   scErrors.dehydrateError before being sent.
 */
var handleWarning = function (warning) {
  var dehydrated = scErrors.dehydrateError(warning, true);
  var warningPacket = {
    type: 'warning',
    data: {
      error: dehydrated,
      workerPid: process.pid
    }
  };
  process.send(warningPacket);
};
/**
 * Tells the parent process that this worker is ready.
 */
function handleReady() {
  var readyPacket = {type: 'ready'};
  process.send(readyPacket);
}

/**
 * Ends the current process; registered as the worker's 'exit' listener.
 */
function handleExit() {
  process.exit();
}
// The SCWorker instance of this process; created when the 'init' message arrives.
var worker;

/**
 * Builds the listener for IPC messages sent to this worker process.
 *
 * Handled message types:
 *  - 'init': decodes binary options, merges m.data with userOptions (user
 *    options win), creates the SCWorker and wires up error/warning/exit
 *    propagation and the 'ready' notification.
 *  - 'emit': re-emits a master event on the worker.
 *  - 'masterMessage' / 'masterResponse': delegated to the worker.
 *  - 'terminate': closes the worker and exits; exits at once when no worker
 *    exists yet or when m.data.immediate is set.
 *
 * @param {Object} [userOptions] - Options supplied by the worker module (for
 *   example the run function); kept in the closure until 'init' arrives.
 * @returns {Function} The message listener, taking the message object m.
 */
var createMessageHandler = function (userOptions) {
  return function (m) {
    if (m.type === 'init') {
      if (m.data.processTermTimeout) {
        processTermTimeout = m.data.processTermTimeout;
      }
      // Buffer.from replaces the deprecated (and unsafe) new Buffer() constructor.
      if (m.data.protocolOptions && m.data.protocolOptions.pfx) {
        m.data.protocolOptions.pfx = Buffer.from(m.data.protocolOptions.pfx, 'base64');
      }
      // A value shaped like {type: 'Buffer', data: ...} is turned back into a Buffer.
      if (typeof m.data.authKey === 'object' && m.data.authKey !== null && m.data.authKey.type === 'Buffer') {
        m.data.authKey = Buffer.from(m.data.authKey.data, 'base64');
      }

      // Merge m.data + userOptions; user options take precedence.
      var workerOptions = Object.assign({}, m.data, (userOptions || {}));
      global.worker = worker = new SCWorker(workerOptions);

      if (m.data.propagateErrors) {
        worker.on('error', handleError.bind(null, worker.options.crashWorkerOnError));
        if (m.data.propagateWarnings) {
          worker.on('warning', handleWarning);
        }
        worker.on('exit', handleExit);
      }

      worker.on('ready', function () {
        worker.start();
        handleReady();
      });
    } else if (m.type === 'emit') {
      if (m.data) {
        worker.handleMasterEvent(m.event, m.data);
      } else {
        worker.handleMasterEvent(m.event);
      }
    } else if (m.type === 'masterMessage') {
      worker.handleMasterMessage(m);
    } else if (m.type === 'masterResponse') {
      worker.handleMasterResponse(m);
    } else if (m.type === 'terminate') {
      // A terminate message without a data payload means "not immediate";
      // previously this threw a TypeError once a worker existed.
      var immediate = Boolean(m.data && m.data.immediate);
      if (worker && !immediate) {
        worker.close(function () {
          process.exit();
        });
        // Safety net in case close() never calls back.
        setTimeout(function () {
          process.exit();
        }, processTermTimeout);
      } else {
        process.exit();
      }
    }
  };
};
/**
 * Worker object for a single worker process.
 *
 * Sets the event and middleware name constants, stores an optional run
 * function taken from options.run, and hands the options to _init.
 *
 * @param {Object} options - Worker options; options.run, when truthy, becomes
 *   this.run.
 */
var SCWorker = function (options) {
  Object.assign(this, {
    EVENT_ERROR: 'error',
    EVENT_WARNING: 'warning',
    EVENT_EXIT: 'exit',
    EVENT_READY: 'ready',
    EVENT_CONNECTION: 'connection',
    MIDDLEWARE_START: 'start',
    type: 'worker',
    _pendingResponseHandlers: {}
  });

  if (options.run) {
    this.run = options.run;
  }

  this._init(options);
};

// SCWorker instances are event emitters.
SCWorker.prototype = Object.create(EventEmitter.prototype);
/**
 * Installs an auth engine on the worker, its HTTP server and its socket server.
 *
 * @param {Object} authEngine - The auth engine to use.
 */
SCWorker.prototype.setAuthEngine = function (authEngine) {
  var httpServer = this.httpServer;
  this.auth = authEngine;
  httpServer.auth = authEngine;
  this.scServer.setAuthEngine(authEngine);
};
/**
 * Installs a codec engine on the worker and its socket server.
 *
 * @param {Object} codecEngine - The codec engine to use.
 */
SCWorker.prototype.setCodecEngine = function (codecEngine) {
  var socketServer = this.scServer;
  this.codec = codecEngine;
  socketServer.setCodecEngine(codecEngine);
};
/**
 * Initializes the worker: copies the options, creates the broker engine
 * client, the HTTP(S) server and the socketcluster server, and wires their
 * events to this worker.
 *
 * @param {Object} options - Worker options. Besides the individual settings
 *   read below, options.paths and options.brokerEngine are required by this
 *   function.
 * @throws {InvalidActionError} When options.downgradeToUser is set and
 *   process.setuid fails.
 */
SCWorker.prototype._init = function (options) {
var self = this;
// Shallow copy of the caller's own enumerable options.
this.options = {};
for (var i in options) {
if (options.hasOwnProperty(i)) {
this.options[i] = options[i];
}
}
this.id = this.options.id;
// Loose comparison: the worker whose id equals 0 is the leader.
this.isLeader = this.id == 0;
// Middleware lists keyed by type; only the 'start' type is created here.
this._middleware = {};
this._middleware[this.MIDDLEWARE_START] = [];
// Optionally switch the process to another user (only where setuid exists).
if (this.options.downgradeToUser && process.setuid) {
try {
process.setuid(this.options.downgradeToUser);
} catch (err) {
throw new InvalidActionError('Could not downgrade to user "' + this.options.downgradeToUser +
'" - Either this user does not exist or the current process does not have the permission' +
' to switch to it');
}
}
// The broker engine is loaded by module name/path taken from the options.
this.brokerEngine = require(this.options.brokerEngine);
this._paths = options.paths;
// Request counters and the per-minute rates derived from them.
this._httpRequestCount = 0;
this._wsRequestCount = 0;
this._httpRPM = 0;
this._wsRPM = 0;
this.brokerEngineClient = new this.brokerEngine.Client({
brokers: this.options.brokers,
secretKey: this.options.secretKey,
pubSubBatchDuration: this.options.pubSubBatchDuration,
connectRetryErrorThreshold: this.options.brokerConnectRetryErrorThreshold
});
// Broker client errors are re-emitted on the worker; plain strings are
// wrapped in a BrokerError first.
this.brokerEngineClient.on('error', function (err) {
var error;
if (typeof err == 'string') {
error = new BrokerError(err);
} else {
error = err;
}
self.emitError(error);
});
this.brokerEngineClient.on('warning', function (warning) {
self.emitWarning(warning);
});
// `global` is an alias of `exchange`.
this.exchange = this.global = this.brokerEngineClient.exchange();
// HTTP server selection: a custom server module, https, or plain http.
if (this.options.httpServerModule) {
var httpServerFactory = require(this.options.httpServerModule);
this.httpServer = httpServerFactory.createServer(this.options.protocolOptions);
} else {
if (this.options.protocol == 'https') {
this.httpServer = https.createServer(this.options.protocolOptions);
} else {
this.httpServer = http.createServer();
}
}
// Both plain requests and upgrade requests go through _httpRequestHandler.
this.httpServer.on('request', this._httpRequestHandler.bind(this));
this.httpServer.on('upgrade', this._httpRequestHandler.bind(this));
this.httpServer.exchange = this.httpServer.global = this.exchange;
// HTTP server errors are re-emitted on the worker; plain strings are
// wrapped in an HTTPServerError first.
this.httpServer.on('error', function (err) {
var error;
if (typeof err == 'string') {
error = new HTTPServerError(err);
} else {
error = err;
}
self.emitError(error);
});
// NOTE(review): `secure` is computed but not used anywhere below.
var secure = this.options.protocol == 'https' ? 1 : 0;
// Attach the socketcluster server to the HTTP server, passing the relevant options through.
this.scServer = socketClusterServer.attach(this.httpServer, {
brokerEngine: this.brokerEngineClient,
wsEngine: this._paths.wsEnginePath,
allowClientPublish: this.options.allowClientPublish,
handshakeTimeout: this.options.handshakeTimeout,
ackTimeout: this.options.ackTimeout,
pingTimeout: this.options.pingTimeout,
pingInterval: this.options.pingInterval,
origins: this.options.origins,
appName: this.options.appName,
path: this.options.path,
authKey: this.options.authKey,
authPrivateKey: this.options.authPrivateKey,
authPublicKey: this.options.authPublicKey,
authAlgorithm: this.options.authAlgorithm,
authSignAsync: this.options.authSignAsync,
authVerifyAsync: this.options.authVerifyAsync,
authDefaultExpiry: this.options.authDefaultExpiry,
middlewareEmitWarnings: this.options.middlewareEmitWarnings,
socketChannelLimit: this.options.socketChannelLimit,
pubSubBatchDuration: this.options.pubSubBatchDuration,
perMessageDeflate: this.options.perMessageDeflate
});
// Some broker clients want a reference to the socket server; the hook is optional.
if (this.brokerEngineClient.setSCServer) {
this.brokerEngineClient.setSCServer(this.scServer);
}
// Default authentication engine
this.setAuthEngine(new AuthEngine());
this.codec = this.scServer.codec;
this._socketPath = this.scServer.getPath();
// NOTE(review): the path is inserted into the pattern without escaping.
this._socketPathRegex = new RegExp('^' + this._socketPath);
this.scServer.on('_connection', function (socket) {
// The connection event counts as a WS request
self._wsRequestCount++;
socket.on('message', function () {
self._wsRequestCount++;
});
self.emit(self.EVENT_CONNECTION, socket);
});
// Socket server warnings, errors and readiness are re-emitted on the worker.
this.scServer.on('warning', function (warning) {
self.emitWarning(warning);
});
this.scServer.on('error', function (error) {
self.emitError(error);
});
this.scServer.on('ready', function () {
self.emit(self.EVENT_READY);
});
};
/**
 * Opens the worker by starting its server via _startServer.
 */
SCWorker.prototype.open = function () {
  var worker = this;
  worker._startServer();
};
/**
 * Closes the socket server and then the HTTP server.
 *
 * @param {Function} [callback] - Passed to httpServer.close.
 */
SCWorker.prototype.close = function (callback) {
  var socketServer = this.scServer;
  var httpServer = this.httpServer;
  socketServer.close();
  httpServer.close(callback);
};
/**
 * Returns the path on which the socket server listens.
 *
 * @returns {*} The value stored in this._socketPath.
 */
SCWorker.prototype.getSocketPath = function () {
  var socketPath = this._socketPath;
  return socketPath;
};
// getSocketURL is deprecated; it is the very same function as getSocketPath.
SCWorker.prototype.getSocketURL = SCWorker.prototype.getSocketPath;
/**
 * Appends a middleware function to the list registered for the given type.
 *
 * @param {string} type - Middleware type; its list must already exist.
 * @param {Function} middleware - The middleware to add.
 */
SCWorker.prototype.addMiddleware = function (type, middleware) {
  var chain = this._middleware[type];
  chain.push(middleware);
};
/**
 * Removes every occurrence of a middleware function from the list registered
 * for the given type. The list is replaced by a new array.
 *
 * @param {string} type - Middleware type; its list must already exist.
 * @param {Function} middleware - The middleware to remove.
 */
SCWorker.prototype.removeMiddleware = function (type, middleware) {
  var remaining = [];
  this._middleware[type].forEach(function (candidate) {
    if (candidate != middleware) {
      remaining.push(candidate);
    }
  });
  this._middleware[type] = remaining;
};
/**
 * Invokes the user's run function (with the worker as `this`), then runs the
 * 'start' middleware and makes the HTTP server listen.
 *
 * The listen arguments depend on the options: port, host and backlog when
 * tcpSynBacklog is set; port and host when only host is set; otherwise just
 * the port. A middleware error is thrown from the middleware callback, and a
 * second invocation of that callback only emits a warning.
 */
SCWorker.prototype._startServer = function () {
  var worker = this;

  if (!worker.run) {
    console.log('no run function found');
  } else {
    worker.run();
  }

  var opts = worker.options;

  function listen() {
    var server = worker.httpServer;
    if (opts.tcpSynBacklog != null) {
      server.listen(opts.sourcePort, opts.host, opts.tcpSynBacklog);
      return;
    }
    if (opts.host != null) {
      server.listen(opts.sourcePort, opts.host);
      return;
    }
    server.listen(opts.sourcePort);
  }

  var startMiddleware = worker._middleware[worker.MIDDLEWARE_START];
  if (!startMiddleware.length) {
    listen();
    return;
  }

  var alreadyCalledBack = false;
  async.applyEachSeries(startMiddleware, opts, function (err) {
    if (alreadyCalledBack) {
      worker.emit('warning', new InvalidActionError('Callback for ' + worker.MIDDLEWARE_START + ' middleware was already invoked'));
      return;
    }
    alreadyCalledBack = true;
    if (err) {
      throw err;
    }
    listen();
  });
};
/**
 * Starts the worker: resets the request statistics, (re)starts the periodic
 * status calculation, runs the optional init controller and starts the server.
 */
SCWorker.prototype.start = function () {
  var worker = this;

  ['_httpRequestCount', '_wsRequestCount', '_httpRPM', '_wsRPM'].forEach(function (counter) {
    worker[counter] = 0;
  });

  // Make sure only one status timer is ever running.
  var previousTimer = worker._statusInterval;
  if (previousTimer != null) {
    clearInterval(previousTimer);
  }
  var tick = worker._calculateStatus.bind(worker);
  worker._statusInterval = setInterval(tick, worker.options.workerStatusInterval);

  // Run the initController to initialize the global context
  var initControllerPath = worker._paths.appInitControllerPath;
  if (initControllerPath != null) {
    worker._initController = require(initControllerPath);
    worker._initController.run(worker);
  }

  worker._startServer();
};
/**
 * Listener for the HTTP server's 'request' and 'upgrade' events: counts the
 * request and decorates it with the exchange and client address details.
 *
 * @param {Object} req - Incoming request. Receives `exchange`/`global`,
 *   `forwardedForAddress` (when an x-forwarded-for header is present) and
 *   `remoteAddress`/`remoteFamily`/`remotePort` copied from the connection.
 * @param {Object} res - Response (or socket on upgrade); not used here.
 */
SCWorker.prototype._httpRequestHandler = function (req, res) {
  this._httpRequestCount++;
  req.exchange = req.global = this.exchange;

  var forwardedFor = req.headers['x-forwarded-for'];
  if (forwardedFor) {
    // The header is a comma separated list; the first entry is used. Entries
    // are commonly separated by ", ", so surrounding whitespace is trimmed.
    req.forwardedForAddress = forwardedFor.split(',')[0].trim();
  }

  // Prefer req.connection and fall back to req.socket.
  var peer = req.connection || req.socket;
  if (peer) {
    req.remoteAddress = peer.remoteAddress;
    req.remoteFamily = peer.remoteFamily;
    req.remotePort = peer.remotePort;
  }
};
/**
 * @returns {Object} The socket server stored in this.scServer.
 */
SCWorker.prototype.getSCServer = function () {
  var socketServer = this.scServer;
  return socketServer;
};
/**
 * @returns {Object} The HTTP server stored in this.httpServer.
 */
SCWorker.prototype.getHTTPServer = function () {
  var server = this.httpServer;
  return server;
};
/**
 * Periodic status tick: converts the request counters gathered since the last
 * tick into requests-per-minute figures, resets the counters, and exits the
 * process (after emitting a warning) when heap usage exceeds
 * options.killWorkerMemoryThreshold.
 */
SCWorker.prototype._calculateStatus = function () {
  var opts = this.options;
  var perMinuteFactor = 60000 / opts.workerStatusInterval;

  this._httpRPM = this._httpRequestCount * perMinuteFactor;
  this._wsRPM = this._wsRequestCount * perMinuteFactor;
  this._httpRequestCount = 0;
  this._wsRequestCount = 0;

  var memThreshold = opts.killWorkerMemoryThreshold;
  if (memThreshold == null) {
    return;
  }

  var heapUsed = process.memoryUsage().heapUsed;
  if (!(heapUsed > memThreshold)) {
    return;
  }

  var message = 'Worker killed itself because its memory ' +
    'usage of ' + heapUsed + ' exceeded ' +
    'the killWorkerMemoryThreshold of ' + memThreshold;
  this.emitWarning(new ResourceLimitError(message));
  process.exit();
};
/**
 * Returns a snapshot of the worker's load figures.
 *
 * @returns {{clientCount: *, httpRPM: number, wsRPM: number}} The socket
 *   server's clientsCount and the last calculated requests-per-minute values.
 */
SCWorker.prototype.getStatus = function () {
  var status = {};
  status.clientCount = this.scServer.clientsCount;
  status.httpRPM = this._httpRPM;
  status.wsRPM = this._wsRPM;
  return status;
};
/**
 * Registers a callback that waits for an IPC response from the master.
 *
 * The callback is stored under a fresh uuid together with a timer; if the
 * timer fires after options.ipcAckTimeout milliseconds, the entry is removed
 * and the callback receives a TimeoutError.
 *
 * @param {Function} callback - Called with the response or the timeout error.
 * @returns {string} The id under which the handler was registered.
 */
SCWorker.prototype._createIPCResponseHandler = function (callback) {
  var worker = this;
  var cid = uuid.v4();

  var onTimeout = function () {
    var expired = worker._pendingResponseHandlers[cid];
    delete worker._pendingResponseHandlers[cid];
    expired.callback(new TimeoutError('IPC response timed out'));
  };

  var entry = {
    callback: callback,
    timeout: setTimeout(onTimeout, this.options.ipcAckTimeout)
  };
  this._pendingResponseHandlers[cid] = entry;

  return cid;
};
/**
 * Delivers a response from the master to the callback that is waiting for it.
 * Responses without a pending handler (unknown or already timed out) are
 * ignored.
 *
 * @param {Object} message - Response packet; message.rid identifies the
 *   pending handler, message.error and message.data are passed on.
 */
SCWorker.prototype.handleMasterResponse = function (message) {
  var pending = this._pendingResponseHandlers;
  var handler = pending[message.rid];
  if (!handler) {
    return;
  }
  clearTimeout(handler.timeout);
  delete pending[message.rid];
  var hydratedError = scErrors.hydrateError(message.error, true);
  handler.callback(hydratedError, message.data);
};
/**
 * Sends a message from this worker to the master process.
 *
 * @param {*} data - Payload of the message.
 * @param {Function} [callback] - When given, a response handler is registered
 *   and its id is sent along as `cid`.
 */
SCWorker.prototype.sendToMaster = function (data, callback) {
  var packet = {type: 'workerMessage', data: data, workerId: this.id};
  if (callback) {
    packet.cid = this._createIPCResponseHandler(callback);
  }
  process.send(packet);
};
/**
 * Sends the response to an earlier master message back to the master process.
 *
 * @param {*} err - Error to report; dehydrated before sending.
 * @param {*} data - Response payload.
 * @param {*} rid - Id of the message being answered.
 */
SCWorker.prototype.respondToMaster = function (err, data, rid) {
  var dehydratedError = scErrors.dehydrateError(err, true);
  var responsePacket = {
    type: 'workerResponse',
    error: dehydratedError,
    data: data,
    workerId: this.id,
    rid: rid
  };
  process.send(responsePacket);
};
/**
 * Re-emits an event received from the master on this worker; all arguments
 * (event name first) are passed to emit unchanged.
 */
SCWorker.prototype.handleMasterEvent = function () {
  var emitArgs = Array.prototype.slice.call(arguments);
  this.emit.apply(this, emitArgs);
};
/**
 * Emits 'masterMessage' for a message sent by the master. Listeners get the
 * message data and a respond function; the response is only sent back when
 * the message carries a `cid`.
 *
 * @param {Object} message - Packet from the master (`data`, optional `cid`).
 */
SCWorker.prototype.handleMasterMessage = function (message) {
  var worker = this;
  var respond = function (err, data) {
    if (!message.cid) {
      return;
    }
    worker.respondToMaster(err, data, message.cid);
  };
  worker.emit('masterMessage', message.data, respond);
};
/**
 * Emits the given error on this worker under the EVENT_ERROR event name.
 *
 * @param {*} err - The error to emit.
 */
SCWorker.prototype.emitError = function (err) {
  var eventName = this.EVENT_ERROR;
  this.emit(eventName, err);
};
/**
 * Emits the given warning on this worker under the EVENT_WARNING event name.
 *
 * @param {*} warning - The warning to emit.
 */
SCWorker.prototype.emitWarning = function (warning) {
  var eventName = this.EVENT_WARNING;
  this.emit(eventName, warning);
};
/**
 * Prepares the current process to act as a worker: uncaught exceptions are
 * reported as fatal errors, and IPC messages are routed to a message handler
 * that keeps the user options (such as the run function) in its closure until
 * the 'init' message arrives.
 *
 * @param {Object} [options] - User options to merge into the worker options.
 */
SCWorker.setup = function (options) {
  // The options travel through the closure created by createMessageHandler;
  // the former `userOptions = options` assignment created an implicit global
  // (a ReferenceError in strict mode and in ES modules) and was never read.
  process.on('uncaughtException', function (err) {
    handleError(true, err);
  });
  process.on('message', createMessageHandler(options));
};
module.exports = SCWorker;
/*
CHANGES:
- the else clause of the top-level if (cluster.isMaster) is replaced by a logged warning
- adding a worker.on('online') to feed the init options with a setTimeout with a 2000ms hardcoded delay.
This is prone to breaking of course, so this needs revision.
- use cluster.setupMaster to set the worker.mjs file path
*/
var cluster = require('cluster');
var scErrors = require('sc-errors');
var InvalidActionError = scErrors.InvalidActionError;
var processTermTimeout = 10000;
// Master side of the worker cluster: forks the SCWorker processes, hands them
// their init options and relays messages between them and the parent process.
if (cluster.isMaster) {
  process.on('disconnect', function () {
    console.log('exiting workercluster on disconnect');
    process.exit();
  });

  // Forked workers, indexed by worker id; assigned when 'init' arrives.
  var workers;
  // False once termination of the whole cluster has been requested.
  var alive = true;
  var hasExited = false;
  var terminatedCount = 0;

  /**
   * Reports an error to the parent process.
   * @param {*} err - The error; dehydrated before sending.
   */
  var sendErrorToMaster = function (err) {
    var error = scErrors.dehydrateError(err, true);
    process.send({
      type: 'error',
      data: {
        pid: process.pid,
        error: error
      }
    });
  };

  /**
   * Marks the cluster as terminating and forces an exit after
   * processTermTimeout milliseconds in case the workers do not exit in time.
   */
  var terminate = function () {
    alive = false;
    setTimeout(function () {
      if (!hasExited) {
        hasExited = true;
        process.exit();
      }
    }, processTermTimeout);
  };

  /**
   * Builds the init packet for one worker.
   *
   * The packet and its `data` object are copied per worker. The packet is
   * sent after a delay, so sharing one `data` object between workers would
   * let every worker receive the id of whichever worker came online last.
   *
   * @param {Object} initMessage - The 'init' message received from the parent.
   * @param {number} workerId - Id of the worker the packet is meant for.
   * @returns {Object} A copy of initMessage whose data carries `id`.
   */
  var buildWorkerInitOptions = function (initMessage, workerId) {
    var workerInitOptions = Object.assign({}, initMessage);
    workerInitOptions.data = Object.assign({}, initMessage.data, {id: workerId});
    return workerInitOptions;
  };

  process.on('message', function (m) {
    if (m.type == 'init') {
      if (m.data.schedulingPolicy != null) {
        cluster.schedulingPolicy = m.data.schedulingPolicy;
      }
      if (m.data.processTermTimeout) {
        processTermTimeout = m.data.processTermTimeout;
      }

      // Run an initController to initialize anything for the master process of all SCWorkers
      if (m.data.paths.appWorkerClusterControllerPath != null) {
        var initWorkerClusterController = require(m.data.paths.appWorkerClusterControllerPath);
        initWorkerClusterController.run(process);
      }

      var workerCount = m.data.workerCount;
      var readyCount = 0;
      var isReady = false;
      workers = [];

      /**
       * Forks worker number i and wires up its lifecycle events.
       * @param {number} i - Worker id (index into `workers`).
       * @param {boolean} [respawn] - True when replacing a crashed worker.
       */
      var launchWorker = function (i, respawn) {
        var worker = cluster.fork();
        workers[i] = worker;

        worker.on('online', function () {
          var workerInitOptions = buildWorkerInitOptions(m, i);
          // The init packet is sent after a hardcoded 2000 ms delay, giving
          // the worker module time to register its message listener.
          // This is fragile and needs revision.
          setTimeout(function () { worker.send(workerInitOptions); }, 2000);
        });

        worker.on('error', sendErrorToMaster);

        worker.on('message', function (workerMessage) {
          if (workerMessage.type == 'ready') {
            process.send({
              type: 'workerStart',
              data: {
                id: i,
                pid: worker.process.pid,
                respawn: respawn || false
              }
            });

            // Report the cluster as ready once, after every worker is ready.
            if (!isReady && ++readyCount >= workerCount) {
              isReady = true;
              process.send({
                type: 'ready'
              });
            }
          } else {
            // Everything else is relayed to the parent unchanged.
            process.send(workerMessage);
          }
        });

        worker.on('exit', function (code, signal) {
          if (alive) {
            process.send({
              type: 'workerExit',
              data: {
                id: i,
                pid: worker.process.pid,
                code: code,
                signal: signal
              }
            });

            if (m.data.rebootWorkerOnCrash) {
              launchWorker(i, true);
            }
          } else if (++terminatedCount >= workers.length) {
            // Terminating: exit once the last worker is gone.
            if (!hasExited) {
              hasExited = true;
              process.exit();
            }
          }
        });
      };

      // Workers are forked from the worker controller file (the worker module).
      cluster.setupMaster({
        exec: m.data.paths.appWorkerControllerPath,
      });

      for (var i = 0; i < workerCount; i++) {
        launchWorker(i);
      }
    } else if (m.type == 'masterMessage') {
      var targetWorker = workers[m.workerId];
      if (targetWorker) {
        targetWorker.send(m);
      } else {
        var errorMessage = 'Cannot send message to worker with id ' + m.workerId +
          ' because the worker does not exist';
        var notFoundError = new InvalidActionError(errorMessage);
        sendErrorToMaster(notFoundError);

        // If the sender expects a response, answer with the error.
        if (m.cid) {
          process.send({
            type: 'workerClusterResponse',
            error: scErrors.dehydrateError(notFoundError, true),
            data: null,
            workerId: m.workerId,
            rid: m.cid
          });
        }
      }
    } else {
      if (m.type == 'terminate' && m.data.killClusterMaster) {
        terminate();
      }
      // Any other message is broadcast to all workers.
      for (var i in workers) {
        if (workers.hasOwnProperty(i)) {
          workers[i].send(m);
        }
      }
    }
  });
}
else {
  console.log('weird... workercluster is called as client...');
}
/*
The file with the most changes. First of all, everything is require()'d through import.
It imports the SCWorker prototype through socketcluster, and uses the setup() function to pass
in the run function as part of the options. The setup function also handles setting up the
process listeners.
*/
import socketcluster from 'socketcluster';
import express from 'express';
import morgan from 'morgan';
import fs from 'fs';
import serveStatic from 'serve-static';
import path from 'path';
import healthChecker from 'sc-framework-health-check';
import cluster from 'cluster';
import scErrors from 'sc-errors';
// Registers this module as a socketcluster worker. The run function is called
// with the SCWorker instance as `this` when the worker's server is started.
socketcluster.SCWorker.setup({
  run: function () {
    console.log(' >> Worker PID:', process.pid);

    const worker = this;
    const environment = worker.options.environment;
    const app = express();
    const httpServer = worker.httpServer;
    const scServer = worker.scServer;

    if (environment == 'dev') {
      // Log every HTTP request. See https://github.com/expressjs/morgan for other
      // available formats.
      app.use(morgan('dev'));
    }
    // Static file serving is currently disabled:
    //app.use(serveStatic(path.resolve(__dirname, 'public')));

    // Add GET /health-check express route
    healthChecker.attach(worker, app);

    httpServer.on('request', app);

    // Number of sampleClientEvent events handled by this worker.
    let count = 0;

    /*
      In here we handle our incoming realtime connections and listen for events.
    */
    scServer.on('connection', (socket) => {
      // Some sample logic to show how to handle client events,
      // replace this with your own logic
      socket.on('sampleClientEvent', (data) => {
        count++;
        console.log('Handled sampleClientEvent', data);
        scServer.exchange.publish('sample', count);
      });

      // Push a random number (0-4) to the client every second until it disconnects.
      const interval = setInterval(() => {
        socket.emit('rand', {
          rand: Math.floor(Math.random() * 5)
        });
      }, 1000);

      socket.on('disconnect', () => {
        clearInterval(interval);
      });
    });
  }
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment