Skip to content

Instantly share code, notes, and snippets.

@krlicmuhamed
Last active July 15, 2016 20:25
Show Gist options
  • Save krlicmuhamed/fd51b4abb05aafe110bc9edb98fc8aa3 to your computer and use it in GitHub Desktop.
Save krlicmuhamed/fd51b4abb05aafe110bc9edb98fc8aa3 to your computer and use it in GitHub Desktop.
actionhero socket server with fast protobuf serializer
var net = require('net');
var tls = require('tls');
var path = require('path');
var protobuf = require('protocol-buffers');
var initialize = function(api, options, next) {
var type = 'protobuf';
var attributes = {
pendingShutdownWaitLimit: 5000,
schema: require('fs').readFileSync(path.join(__dirname, 'schema.proto')).toString()
};
var server = new api.genericServer(type, options, attributes);
// Starts the listener: a plain TCP server when options.secure is exactly
// false, a TLS server otherwise. `next` is invoked exactly once: with no
// arguments once the socket is listening, or with an Error if the server
// fails before that point.
server.start = function(next) {
  var transport = (options.secure === false) ? net : tls;
  // Guards `next`: the 'error' listener stays attached for the lifetime of
  // the server, so without this flag a late error would re-run the startup
  // callback with a misleading "Cannot start" message.
  var settled = false;

  server.server = transport.createServer(api.config.servers.protobuf.serverOptions, function(rawConnection) {
    handleConnection(rawConnection);
  });

  server.server.on('error', function(e) {
    if (settled) {
      server.log('socket server error: ' + e.message, 'error');
      return;
    }
    settled = true;
    return next(new Error('Cannot start socket server @ ' + options.bindIP + ':' + options.port + ' => ' + e.message));
  });

  server.server.listen(options.port, options.bindIP, function() {
    process.nextTick(function() {
      if (settled) {
        return;
      }
      settled = true;
      return next();
    });
  });
};
// Shutdown hook: delegates to gracefulShutdown, which calls `next` once no
// connection is still waiting on a response.
server.stop = function(next) {
gracefulShutdown(next);
};
// Encodes `message` as a protobuf `Response` and writes it to the client,
// followed by the "\r\n" frame terminator.
//
// connection   - connection whose rawConnection receives the frame
// message      - plain object matching the Response schema; it is mutated in
//                place (message, msgcount and context may be filled in)
// messageCount - optional counter echoed back to the client as msgcount
//
// If encoding or writing fails, one fallback error frame is attempted; a
// failure of the fallback is only logged.
server.sendMessage = function(connection, message, messageCount) {
  if (message.status === 'error') {
    message.message = api.config.errors.serializers.servers.socket(message.message);
  }

  if (connection.respondingTo) {
    message.msgcount = messageCount;
    connection.respondingTo = null;
  } else if (message.context === 'response') {
    message.msgcount = messageCount ? messageCount : connection.messageCount;
  }

  if (!message.context) {
    message.context = 'response';
  }

  // Encodes one payload and writes it as a single frame. The encoded Buffer
  // is written as-is: concatenating it with a string would convert it to
  // UTF-8 text and corrupt any byte sequence that is not valid UTF-8.
  var writeFrame = function(payload) {
    var frame = produce({
      schema: attributes.schema,
      message: 'Response',
      data: payload
    });
    connection.rawConnection.write(frame);
    connection.rawConnection.write('\r\n');
  };

  try {
    writeFrame(message);
  } catch (e) {
    api.log('socket write error: ' + e.message, 'error');
    // Single fallback attempt instead of recursing into sendMessage, which
    // would loop forever if the error frame itself cannot be sent.
    try {
      writeFrame({
        status: 'error',
        message: api.config.errors.serializers.servers.socket(e.message + '[1]'),
        context: 'response'
      });
    } catch (fallbackError) {
      api.log('socket write error: ' + fallbackError.message, 'error');
    }
  }
};
// Ends the client's socket. Anything thrown while doing so is deliberately
// ignored, so calling this on an unusable connection is harmless.
server.goodbye = function(connection) {
try {
connection.rawConnection.end();
} catch (e) {}
};
/*
server.sendFile = function(connection, error, fileStream) {
if (error) {
server.sendMessage(connection, error, connection.messageCount);
} else {
fileStream.pipe(connection.rawConnection, {
end: false
});
}
};
*/
// Wires up a newly built connection: incoming socket data is appended to a
// per-socket string buffer, cut into delimiter-terminated frames, and every
// non-empty frame is handed to parseRequest. Socket 'end' and 'error' events
// tear the connection down unless it has already been destroyed.
//
// NOTE(review): the buffer is only size-checked once a complete frame has been
// cut out of it, so a peer that never sends the delimiter can make it grow
// without limit.
server.on('connection', function(connection) {
  var socket = connection.rawConnection;
  connection.params = {};

  // Validates a single frame against maxDataLength and dispatches it.
  var parseLine = function(line) {
    var limit = api.config.servers.protobuf.maxDataLength;
    if (limit > 0) {
      var byteCount = Buffer.byteLength(line, 'utf8');
      if (byteCount > limit) {
        var error = api.config.errors.dataLengthTooLarge(limit, byteCount);
        server.log(error, 'error');
        return server.sendMessage(connection, {
          status: 'error',
          message: error + '[2]'
        });
      }
    }
    if (line.length === 0) {
      return;
    }
    // Counted before dispatch so the client can match responses to requests
    // in order; the genericServer does not do this for us.
    connection.messageCount++;
    parseRequest(connection, line);
  };

  // Best-effort socket close followed by destroying the connection.
  var closeAndDestroy = function() {
    try {
      socket.end();
    } catch (ignored) {}
    connection.destroy();
  };

  socket.on('data', function(chunk) {
    socket.socketDataString += chunk.toString('utf-8');
    var delimiter = String(api.config.servers.protobuf.delimiter);
    var cut = socket.socketDataString.indexOf(delimiter);
    while (cut > -1) {
      var frame = socket.socketDataString.slice(0, cut);
      socket.socketDataString = socket.socketDataString.slice(cut + delimiter.length);
      // `frame` ends before the first delimiter, so it is always one line.
      parseLine(frame);
      cut = socket.socketDataString.indexOf(delimiter);
    }
  });

  socket.on('end', function() {
    if (connection.destroyed === true) {
      return;
    }
    closeAndDestroy();
  });

  socket.on('error', function(socketError) {
    if (connection.destroyed === true) {
      return;
    }
    server.log('socket error: ' + socketError, 'error');
    closeAndDestroy();
  });
});
// Decodes one frame and runs the action it names.
//
// The frame is first decoded as a `Request`; its header names the action,
// which is also the protobuf message type used to decode the body into the
// action's params. On success the connection is updated (mockHeaders, params)
// and handed to server.processAction. On any decoding failure the client gets
// an error response tagged "[3]" and no action is run.
//
// connection - connection the frame arrived on; only mutated when the whole
//              frame decodes successfully
// line       - the raw frame, as a string
//
// Returns a Promise that settles after the frame has been dispatched or
// rejected; it never rejects.
var parseRequest = function(connection, line) {
  return new Promise(function(resolve) {
    var envelope = parse({
      schema: attributes.schema,
      message: 'Request',
      buffer: new Buffer(line, 'utf8')
    });
    var header = envelope.header;
    if (!header || !header.action) {
      throw new Error('Request header is missing an action');
    }
    var actionParams = parse({
      schema: attributes.schema,
      message: header.action,
      buffer: new Buffer(envelope.body, 'utf8')
    });
    // The header object already carries `authorization` when it was sent.
    connection.mockHeaders = header;
    connection.params.action = header.action;
    connection.params = Object.assign(connection.params, actionParams);
    resolve(connection);
  }).then(function(ready) {
    ready.error = null;
    ready.response = {};
    server.processAction(ready);
  }, function(e) {
    // Passed as the rejection handler of the same then(), so a handled
    // decoding error never falls through into the action-processing step.
    server.sendMessage(connection, {
      status: 'error',
      message: e.message + '[3]'
    });
  }).catch(function(e) {
    server.log('error while dispatching request: ' + e.message, 'error');
  });
};
// After an action finishes, shape its response for the Response schema and
// send it. Nothing is sent unless data.toRender is exactly true.
server.on('actionComplete', function(data) {
  if (data.toRender !== true) {
    return;
  }
  // data.response must match the protobuf schema!
  var response = data.response;
  response.context = 'response';
  if (response.error) {
    // The schema carries errors as status + message, not as an error object.
    response.status = 'error';
    response.message = response.error.message;
    response.error = null;
  } else {
    response.status = 'action';
  }
  server.sendMessage(data.connection, response, data.messageCount);
});
// Prepares a freshly accepted socket and registers it with the server.
// Keep-alive is switched on only when the config flag is exactly true; the
// socket also gets an empty string buffer for incoming data.
var handleConnection = function(rawConnection) {
  var wantsKeepAlive = api.config.servers.protobuf.setKeepAlive === true;
  if (wantsKeepAlive) {
    rawConnection.setKeepAlive(true);
  }

  rawConnection.socketDataString = '';

  var details = {
    rawConnection: rawConnection,
    remoteAddress: rawConnection.remoteAddress,
    remotePort: rawConnection.remotePort
  };
  server.buildConnection(details); // will emit 'connection'
};
// Reports whether a chunk is exactly one of the two break sequences:
// hex fff4fffd06 (CTRL + C) or hex 04 (CTRL + D).
var checkBreakChars = function(chunk) {
  var hex = chunk.toString('hex', 0, chunk.length);
  var isCtrlC = hex === 'fff4fffd06';
  var isCtrlD = hex === '04';
  return isCtrlC || isCtrlD;
};
// Stops accepting connections and winds down the existing ones.
//
// Idle connections (no pending actions) are destroyed immediately. Busy ones
// get a one-off timer that destroys them after
// attributes.pendingShutdownWaitLimit ms, and the whole check is repeated
// every second until none are left. `next`, when it is a function, is called
// once nothing is pending.
//
// alreadyShutdown - truthy on the repeated passes, so the listening server is
//                   only closed on the first one
var gracefulShutdown = function(next, alreadyShutdown) {
  if (!alreadyShutdown) {
    server.server.close();
  }

  var stillWaiting = 0;
  server.connections().forEach(function(connection) {
    if (connection.pendingActions === 0) {
      connection.destroy();
      return;
    }
    stillWaiting += 1;
    if (connection.rawConnection.shutDownTimer) {
      return;
    }
    connection.rawConnection.shutDownTimer = setTimeout(function() {
      connection.destroy();
    }, attributes.pendingShutdownWaitLimit);
  });

  if (stillWaiting > 0) {
    server.log('waiting on shutdown, there are still ' + stillWaiting + ' connected clients waiting on a response', 'notice');
    setTimeout(function() {
      gracefulShutdown(next, true);
    }, 1000);
    return;
  }

  if (typeof next === 'function') {
    return next();
  }
};
// Compiled schemas keyed by their source text, so the schema is compiled once
// instead of on every encode and decode.
var compiledSchemas = new Map();

// Returns the encoder/decoder for input.message from input.schema. The
// message name can come straight from a client's request header, so an
// unknown name is reported with an explicit error.
var lookupMessage = function(input) {
  if (!compiledSchemas.has(input.schema)) {
    compiledSchemas.set(input.schema, protobuf(input.schema));
  }
  var codec = compiledSchemas.get(input.schema)[input.message];
  if (!codec || typeof codec.decode !== 'function' || typeof codec.encode !== 'function') {
    throw new Error('Unknown protobuf message type: ' + input.message);
  }
  return codec;
};

// Decodes input.buffer as the message type input.message of input.schema.
var parse = function(input) {
  return lookupMessage(input).decode(input.buffer);
};

// Encodes input.data as the message type input.message of input.schema.
var produce = function(input) {
  return lookupMessage(input).encode(input.data);
};
return next(server);
};
exports.initialize = initialize;
// Internal
// Metadata carried by every Request.
message Header {
// Name of the action to run. The server also uses it as the message type for
// decoding Request.body.
required string action = 1;
// Optional credential; the server exposes the header on the connection as
// mock headers.
optional string authorization = 2;
}
// Envelope for one client request.
message Request {
required Header header = 1;
// Encoded params message whose type is named by header.action.
required bytes body = 2;
}
// Envelope for everything the server sends to a client.
message Response {
// The server sets this to "response" when the sender left it empty.
required string context = 1;
// "action" for a successful action result, "error" otherwise.
optional string status = 2;
// Counter that lets the client match a response to its request.
optional int32 msgcount = 3;
optional string message = 4;
optional bytes body = 5;
}
// Actions
// Params for the `testAction` action; the spec sends it with
// test = "testing123".
message testAction {
required string test = 1;
}
// Params message named `deviceEnroll`; presumably matches an action of the
// same name, which is not part of this file (to be confirmed).
message deviceEnroll {
required string deviceId = 1;
required string deviceModel = 2;
}
// Params message named `deviceLocation`; presumably matches an action of the
// same name, which is not part of this file (to be confirmed).
message deviceLocation {
required string deviceId = 1;
}
var should = require('should');
var uuid = require('node-uuid');
var path = require('path');
var protobuf = require('protocol-buffers');
var actionheroPrototype = require('actionhero').actionheroPrototype;
var actionhero = new actionheroPrototype();
var api, client;
var net = require('net');
var protobufSchema = require('fs').readFileSync(path.join(__dirname, '../servers/generic/schema.proto')).toString();
var client = new net.Socket();
// End-to-end spec for the protobuf socket server: boots actionhero, connects
// a raw TCP client and exchanges protobuf-encoded frames with it.
describe('Generic protobuf server', function() {
  before(function(done) {
    actionhero.start(function(error, a) {
      // Surface a failed boot here instead of letting every test fail later
      // on an undefined `api`.
      if (error) {
        return done(error);
      }
      api = a;
      done();
    });
  });

  after(function(done) {
    // Close our side of the socket so the server is not left holding it.
    client.destroy();
    actionhero.stop(function(error) {
      done(error);
    });
  });

  it('should have booted into the test env', function() {
    process.env.NODE_ENV.should.equal('test');
    api.env.should.equal('test');
    should.exist(api.id);
  });

  it('can retrieve server uptime via the status action', function(done) {
    api.specHelper.runAction('status', function(response) {
      should.not.exist(response.error);
      response.uptime.should.be.above(0);
      done();
    });
  });

  it('should be able to connect', function(done) {
    client.connect(api.config.servers.protobuf.port, function() {
      client.setEncoding('utf8');
      done();
    });
  });

  it('should be able to respond with protobuf error on invalid request', function(done) {
    makeSocketRequest(client, 'hello', function(response) {
      should.exist(response);
      response.should.be.an.instanceOf(Object);
      response.context.should.equal('response');
      response.status.should.equal('error');
      response.message.should.equal('Groups are not supported[3]');
      done();
    });
  });

  it('should be able to request an action', function(done) {
    var body = produce({
      schema: protobufSchema,
      message: 'testAction',
      data: {
        test: 'testing123'
      }
    });
    var message = produce({
      schema: protobufSchema,
      message: 'Request',
      data: {
        header: {
          action: 'testAction'
        },
        body: body
      }
    });
    makeSocketRequest(client, message, function(response) {
      response.message.should.be.equal('Successfully processed the action!');
      done();
    });
  });
});
// Sends one frame over `thisClient` and hands the decoded reply to `cb`.
//
// thisClient - connected socket; incoming data is expected as text
// message    - request frame, either a Buffer or a string
// cb         - called with the last non-empty response line decoded as a
//              `Response` message
// delimiter  - frame terminator, "\r\n" when omitted
//
// The reply is awaited for 100 ms plus up to twenty 10 ms retries. If nothing
// has arrived by then an Error is thrown from the timer.
var makeSocketRequest = function(thisClient, message, cb, delimiter) {
  if (delimiter === null || typeof delimiter === 'undefined') {
    delimiter = '\r\n';
  }
  var lines = [];
  var attempts = 0;

  var collect = function(d) {
    String(d).split(delimiter).forEach(function(l) {
      lines.push(l);
    });
  };

  var respond = function() {
    if (lines.length === 0 && attempts < 20) {
      attempts++;
      return setTimeout(respond, 10);
    }
    // Detach first, so the listener is removed even when decoding fails.
    thisClient.removeListener('data', collect);

    var lastLine = lines[lines.length - 1];
    if (lastLine === '') {
      lastLine = lines[lines.length - 2];
    }
    if (typeof lastLine !== 'string' || lastLine.length === 0) {
      throw new Error('No response received from the socket server');
    }

    var parsed = parse({
      schema: protobufSchema,
      message: 'Response',
      buffer: new Buffer(lastLine, 'utf8')
    });
    if (typeof cb === 'function') {
      cb(parsed);
    }
  };

  setTimeout(respond, 100);
  thisClient.on('data', collect);

  if (Buffer.isBuffer(message)) {
    // Write encoded bytes untouched; string concatenation would re-encode
    // them as UTF-8 text and corrupt non-ASCII bytes.
    thisClient.write(message);
    thisClient.write(delimiter);
  } else {
    thisClient.write(message + delimiter);
  }
};
// Decodes input.buffer as the message type input.message, using the schema
// text in input.schema.
var parse = function(input) {
  var messages = protobuf(input.schema);
  var codec = messages[input.message];
  return codec.decode(input.buffer);
};

// Encodes input.data as the message type input.message, using the schema text
// in input.schema.
var produce = function(input) {
  var messages = protobuf(input.schema);
  var codec = messages[input.message];
  return codec.encode(input.data);
};
'use strict';

// Fixture action used by the spec: it takes no inputs and always answers with
// a fixed success message. It is exported only when NODE_ENV is "test".
var testAction = {
  name: 'testAction',
  description: 'testAction',
  blockedConnectionTypes: [],
  outputExample: {},
  matchExtensionMimeType: false,
  version: 1.0,
  toDocument: true,
  middleware: [],
  inputs: {},
  run: function(api, data, next) {
    data.response.message = 'Successfully processed the action!';
    return next();
  }
};

if (process.env.NODE_ENV === 'test') {
  exports.action = testAction;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment