Last active
July 15, 2016 20:25
-
-
Save krlicmuhamed/fd51b4abb05aafe110bc9edb98fc8aa3 to your computer and use it in GitHub Desktop.
actionhero socket server with fast protobuf serializer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var net = require('net'); | |
var tls = require('tls'); | |
var path = require('path'); | |
var protobuf = require('protocol-buffers'); | |
var initialize = function(api, options, next) { | |
var type = 'protobuf'; | |
var attributes = { | |
pendingShutdownWaitLimit: 5000, | |
schema: require('fs').readFileSync(path.join(__dirname, 'schema.proto')).toString() | |
}; | |
var server = new api.genericServer(type, options, attributes); | |
/**
 * Start listening for client connections.
 *
 * Creates a plain TCP server when `options.secure === false` and a TLS server
 * otherwise, then binds it to `options.bindIP`:`options.port`.
 *
 * @param {Function} next - Called exactly once: with no arguments once the
 *   server is listening, or with an Error when it could not be started.
 */
server.start = function(next) {
  // net and tls share the createServer(options, connectionListener) signature.
  var transport = options.secure === false ? net : tls;
  // The 'error' listener stays attached for the server's whole lifetime, so
  // this flag stops a later runtime error from invoking `next` a second time.
  var settled = false;

  server.server = transport.createServer(api.config.servers.protobuf.serverOptions, function(rawConnection) {
    handleConnection(rawConnection);
  });

  server.server.on('error', function(e) {
    if (settled) {
      // Start-up was already reported to the caller; only record the failure.
      server.log('socket server error: ' + e.message, 'error');
      return;
    }
    settled = true;
    return next(new Error('Cannot start socket server @ ' + options.bindIP + ':' + options.port + ' => ' + e.message));
  });

  server.server.listen(options.port, options.bindIP, function() {
    process.nextTick(function() {
      if (settled) {
        return;
      }
      settled = true;
      return next();
    });
  });
};
/**
 * Stop the server by handing control to the graceful shutdown routine.
 *
 * @param {Function} next - Forwarded unchanged to gracefulShutdown.
 */
server.stop = function(next) {
  // Only the callback is forwarded, so gracefulShutdown receives no
  // `alreadyShutdown` argument on this first pass.
  gracefulShutdown(next);
};
/**
 * Encode `message` as a protobuf `Response` and write it to the client,
 * followed by a CRLF frame terminator.
 *
 * `message` is mutated in place: error text is passed through the configured
 * socket error serializer, `msgcount` is filled in and `context` defaults to
 * 'response'.
 *
 * @param {Object} connection - Connection whose `rawConnection` is written to.
 * @param {Object} message - Response data; must match the `Response` schema.
 * @param {number} [messageCount] - Request counter echoed back as `msgcount`.
 * @param {boolean} [isFallback] - Internal: true when this call is already the
 *   error report for a failed send, so a second failure is not retried.
 */
server.sendMessage = function(connection, message, messageCount, isFallback) {
  if (message.status === 'error') {
    message.message = api.config.errors.serializers.servers.socket(message.message);
  }

  if (connection.respondingTo) {
    message.msgcount = messageCount;
    connection.respondingTo = null;
  } else if (message.context === 'response') {
    message.msgcount = messageCount ? messageCount : connection.messageCount;
  }

  if (!message.context) {
    message.context = 'response';
  }

  try {
    var response = produce({
      schema: attributes.schema,
      message: 'Response',
      data: message
    });
    // Append the terminator as bytes. Concatenating the encoded Buffer with a
    // string would decode it as UTF-8 and corrupt any byte sequence that is
    // not valid UTF-8. A single write keeps payload and terminator together.
    connection.rawConnection.write(Buffer.concat([response, Buffer.from('\r\n', 'utf8')]));
  } catch (e) {
    api.log('socket write error: ' + e.message, 'error');
    if (isFallback) {
      // The error report itself could not be sent; retrying again would
      // recurse without bound.
      return;
    }
    server.sendMessage(connection, {
      status: 'error',
      message: e.message + '[1]'
    }, undefined, true);
  }
};
/**
 * Close the client's socket on a best-effort basis.
 *
 * @param {Object} connection - Connection whose `rawConnection` is ended.
 */
server.goodbye = function(connection) {
  try {
    var socket = connection.rawConnection;
    socket.end();
  } catch (ignored) {
    // Deliberately ignored: failing to end the socket is not reported.
  }
};
/* | |
server.sendFile = function(connection, error, fileStream) { | |
if (error) { | |
server.sendMessage(connection, error, connection.messageCount); | |
} else { | |
fileStream.pipe(connection.rawConnection, { | |
end: false | |
}); | |
} | |
}; | |
*/ | |
/**
 * Wire up a freshly built connection: buffer incoming data, cut it into
 * delimiter-terminated frames and hand every frame to the request parser.
 */
server.on('connection', function(connection) {
  var socket = connection.rawConnection;

  connection.params = {};

  // Validate one frame's size, then count it and pass it on for parsing.
  var parseLine = function(line) {
    var limit = api.config.servers.protobuf.maxDataLength;
    if (limit > 0) {
      var byteCount = Buffer.byteLength(line, 'utf8');
      if (byteCount > limit) {
        var error = api.config.errors.dataLengthTooLarge(limit, byteCount);
        server.log(error, 'error');
        return server.sendMessage(connection, {
          status: 'error',
          message: error + '[2]'
        });
      }
    }

    if (line.length === 0) {
      return;
    }
    // Incremented before the request is processed so that clients can match
    // responses to requests in order; the genericServer does not do this.
    connection.messageCount++;
    parseRequest(connection, line);
  };

  // End the socket (ignoring failures) and destroy the connection.
  var teardown = function() {
    try {
      socket.end();
    } catch (ignored) {}
    connection.destroy();
  };

  socket.on('data', function(chunk) {
    socket.socketDataString += chunk.toString('utf-8');

    var delimiter = String(api.config.servers.protobuf.delimiter);
    var cut = socket.socketDataString.indexOf(delimiter);
    while (cut > -1) {
      var frame = socket.socketDataString.slice(0, cut);
      // Whatever follows the delimiter stays buffered for the next pass.
      socket.socketDataString = socket.socketDataString.slice(cut + delimiter.length);
      frame.split(delimiter).forEach(parseLine);
      cut = socket.socketDataString.indexOf(delimiter);
    }
  });

  socket.on('end', function() {
    if (connection.destroyed !== true) {
      teardown();
    }
  });

  socket.on('error', function(e) {
    if (connection.destroyed === true) {
      return;
    }
    server.log('socket error: ' + e, 'error');
    teardown();
  });
});
/**
 * Decode one request frame and dispatch the action it names.
 *
 * The frame is decoded as a `Request`; its `header.action` selects both the
 * action to run and the schema message used to decode `body` into params.
 * A decoding failure is reported to the client as an error message and the
 * action is not processed.
 *
 * @param {Object} connection - Connection the frame arrived on; its
 *   `mockHeaders`, `params`, `error` and `response` are updated.
 * @param {string} line - Raw frame text, without the delimiter.
 * @returns {Promise<undefined>} Settles once the request was dispatched or the
 *   decoding error was reported.
 */
var parseRequest = function(connection, line) {
  return new Promise(function(resolve) {
    var envelope = parse({
      schema: attributes.schema,
      message: 'Request',
      buffer: Buffer.from(line, 'utf8')
    });
    var header = envelope.header;
    // Read before touching the connection so a frame without a header fails
    // without leaving partial state behind.
    var action = header.action;

    // The decoded header (action and optional authorization) doubles as the
    // connection's mock headers.
    connection.mockHeaders = header;
    connection.params.action = action;

    var actionParams = parse({
      schema: attributes.schema,
      message: action,
      buffer: Buffer.from(envelope.body)
    });
    connection.params = Object.assign(connection.params, actionParams);
    resolve(connection);
  }).then(function(ready) {
    // Process action
    ready.error = null;
    ready.response = {};
    server.processAction(ready);
  }, function(e) {
    // Registered as the rejection branch of the same `then`, so it handles
    // decoding failures only and the success branch never runs after it.
    server.sendMessage(connection, {
      status: 'error',
      message: e.message + '[3]'
    });
  });
};
/**
 * Send the outcome of a finished action back to the client when the action
 * asked to be rendered.
 */
server.on('actionComplete', function(data) {
  if (data.toRender !== true) {
    return;
  }

  // data.response must match protobuf schema!
  var reply = data.response;
  reply.context = 'response';

  if (reply.error) {
    // Flatten the error into the status/message fields, then clear it.
    reply.status = 'error';
    reply.message = reply.error.message;
    reply.error = null;
  } else {
    reply.status = 'action';
  }

  server.sendMessage(data.connection, reply, data.messageCount);
});
/**
 * Prepare a newly accepted socket and register it with the server.
 *
 * @param {Object} rawConnection - Socket passed to the createServer listener.
 */
var handleConnection = function(rawConnection) {
  var keepAliveRequested = api.config.servers.protobuf.setKeepAlive === true;
  if (keepAliveRequested) {
    rawConnection.setKeepAlive(true);
  }

  // Per-socket buffer that the 'data' handler appends incoming text to.
  rawConnection.socketDataString = '';

  var details = {
    rawConnection: rawConnection,
    remoteAddress: rawConnection.remoteAddress,
    remotePort: rawConnection.remotePort
  };
  server.buildConnection(details); // will emit 'connection'
};
/**
 * Report whether a chunk consists solely of an interrupt sequence.
 *
 * @param {Buffer} chunk - Data chunk to inspect.
 * @returns {boolean} true when the whole chunk is CTRL + C or CTRL + D.
 */
var checkBreakChars = function(chunk) {
  var hex = chunk.toString('hex', 0, chunk.length);
  var isCtrlC = hex === 'fff4fffd06';
  var isCtrlD = hex === '04';
  return isCtrlC || isCtrlD;
};
/**
 * Close the listening server and wind down client connections.
 *
 * Idle connections are destroyed immediately. Connections with pending
 * actions get a one-off timer that destroys them after
 * `attributes.pendingShutdownWaitLimit` milliseconds, and the routine
 * re-checks every second until none are left.
 *
 * @param {Function} [next] - Called once no connection is pending.
 * @param {boolean} [alreadyShutdown] - Truthy on re-checks, so the listening
 *   server is only closed on the first pass.
 */
var gracefulShutdown = function(next, alreadyShutdown) {
  if (!alreadyShutdown) {
    server.server.close();
  }

  var pendingConnections = 0;

  server.connections().forEach(function(connection) {
    if (connection.pendingActions === 0) {
      connection.destroy();
      return;
    }

    pendingConnections += 1;
    if (connection.rawConnection.shutDownTimer) {
      // A forced-destroy timer was already armed on an earlier pass.
      return;
    }
    connection.rawConnection.shutDownTimer = setTimeout(function() {
      connection.destroy();
    }, attributes.pendingShutdownWaitLimit);
  });

  if (pendingConnections > 0) {
    server.log('waiting on shutdown, there are still ' + pendingConnections + ' connected clients waiting on a response', 'notice');
    setTimeout(function() {
      gracefulShutdown(next, true);
    }, 1000);
    return;
  }

  if (typeof next === 'function') {
    return next();
  }
};
// Compiled message sets keyed by schema source text, so a schema is compiled
// once instead of on every encode and decode.
var compiledSchemas = Object.create(null);

/**
 * Return the compiled messages for a schema, compiling it on first use.
 *
 * @param {string} schema - Protobuf schema source.
 * @returns {Object} Message encoders/decoders keyed by message name.
 */
var compileSchema = function(schema) {
  if (!compiledSchemas[schema]) {
    compiledSchemas[schema] = protobuf(schema);
  }
  return compiledSchemas[schema];
};

/**
 * Decode a buffer as the named schema message.
 *
 * @param {{schema: string, message: string, buffer: Buffer}} input
 * @returns {Object} The decoded message.
 */
var parse = function(input) {
  return compileSchema(input.schema)[input.message].decode(input.buffer);
};

/**
 * Encode data as the named schema message.
 *
 * @param {{schema: string, message: string, data: Object}} input
 * @returns {Buffer} The encoded message.
 */
var produce = function(input) {
  return compileSchema(input.schema)[input.message].encode(input.data);
};
return next(server); | |
}; | |
exports.initialize = initialize; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Internal

// Routing data sent with every request. `action` names the action to run and
// also the message type used to decode the request body.
message Header {
  required string action = 1;
  optional string authorization = 2;
}

// Envelope for every client frame; `body` holds the encoded action message.
message Request {
  required Header header = 1;
  required bytes body = 2;
}

// Envelope for every frame the server writes back to the client.
message Response {
  required string context = 1;
  optional string status = 2;
  optional int32 msgcount = 3;
  optional string message = 4;
  optional bytes body = 5;
}

// Actions

message testAction {
  required string test = 1;
}

message deviceEnroll {
  required string deviceId = 1;
  required string deviceModel = 2;
}

message deviceLocation {
  required string deviceId = 1;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var should = require('should'); | |
var uuid = require('node-uuid'); | |
var path = require('path'); | |
var protobuf = require('protocol-buffers'); | |
var actionheroPrototype = require('actionhero').actionheroPrototype; | |
var actionhero = new actionheroPrototype(); | |
var api, client; | |
var net = require('net'); | |
var protobufSchema = require('fs').readFileSync(path.join(__dirname, '../servers/generic/schema.proto')).toString(); | |
var client = new net.Socket(); | |
// End-to-end spec for the protobuf socket server: boots actionhero once,
// connects a raw TCP client and exchanges protobuf-encoded frames with it.
describe('Generic protobuf server', function() {
  before(function(done) {
    actionhero.start(function(error, a) {
      api = a;
      // Surface a failed boot here instead of as unrelated failures later on.
      done(error);
    });
  });

  after(function(done) {
    actionhero.stop(function(error) {
      // Release the client socket opened by the connect test.
      client.destroy();
      done(error);
    });
  });

  it('should have booted into the test env', function() {
    process.env.NODE_ENV.should.equal('test');
    api.env.should.equal('test');
    should.exist(api.id);
  });

  it('can retrieve server uptime via the status action', function(done) {
    api.specHelper.runAction('status', function(response) {
      should.not.exist(response.error);
      response.uptime.should.be.above(0);
      done();
    });
  });

  it('should be able to connect', function(done) {
    client.connect(api.config.servers.protobuf.port, function() {
      client.setEncoding('utf8');
      done();
    });
  });

  it('should be able to respond with protobuf error on invalid request', function(done) {
    makeSocketRequest(client, 'hello', function(response) {
      should.exist(response);
      response.should.be.an.instanceOf(Object);
      response.context.should.equal('response');
      response.status.should.equal('error');
      response.message.should.equal('Groups are not supported[3]');
      done();
    });
  });

  it('should be able to request an action', function(done) {
    // The action payload is encoded on its own and embedded as the request body.
    var body = produce({
      schema: protobufSchema,
      message: 'testAction',
      data: {
        test: 'testing123'
      }
    });
    var message = produce({
      schema: protobufSchema,
      message: 'Request',
      data: {
        header: {
          action: 'testAction'
        },
        body: body
      }
    });

    makeSocketRequest(client, message, function(response) {
      response.message.should.be.equal('Successfully processed the action!');
      done();
    });
  });
});
/**
 * Send one frame to the server and pass the decoded `Response` to `cb`.
 *
 * Incoming data is accumulated for at least 100 ms (polling up to 20 more
 * times at 10 ms intervals while nothing has arrived), then the last
 * non-empty frame is decoded.
 *
 * @param {Object} thisClient - Connected socket-like client.
 * @param {string|Buffer} message - Frame payload, without the delimiter.
 * @param {Function} [cb] - Receives the decoded response.
 * @param {string} [delimiter='\r\n'] - Frame terminator.
 * @throws {Error} From the timer callback when no response frame arrived.
 */
var makeSocketRequest = function(thisClient, message, cb, delimiter) {
  var separator = (delimiter === null || typeof delimiter === 'undefined') ? '\r\n' : delimiter;
  var received = '';
  var attempts = 0;

  // Accumulate raw text; a response may arrive spread over several chunks, so
  // splitting into frames only happens once collection is finished.
  var collect = function(d) {
    received += d;
  };

  var respond = function() {
    if (received.length === 0 && attempts < 20) {
      attempts++;
      return setTimeout(respond, 10);
    }

    // Detach first so the listener cannot leak into later requests if
    // decoding throws.
    thisClient.removeListener('data', collect);

    var frames = received.split(separator).filter(function(frame) {
      return frame.length > 0;
    });
    if (frames.length === 0) {
      throw new Error('no response received from the server');
    }

    var parsed = parse({
      schema: protobufSchema,
      message: 'Response',
      buffer: Buffer.from(frames[frames.length - 1], 'utf8')
    });
    if (typeof cb === 'function') {
      cb(parsed);
    }
  };

  setTimeout(respond, 100);
  thisClient.on('data', collect);

  if (Buffer.isBuffer(message)) {
    // Keep encoded payloads as bytes; string concatenation would re-decode
    // them as UTF-8 and corrupt anything that is not valid UTF-8.
    thisClient.write(Buffer.concat([message, Buffer.from(separator, 'utf8')]));
  } else {
    thisClient.write(message + separator);
  }
};
/**
 * Decode a buffer as the named message of the given schema.
 *
 * @param {{schema: string, message: string, buffer: Buffer}} input
 * @returns {Object} The decoded message.
 */
var parse = function(input) {
  var messages = protobuf(input.schema);
  var codec = messages[input.message];
  return codec.decode(input.buffer);
};
/**
 * Encode data as the named message of the given schema.
 *
 * @param {{schema: string, message: string, data: Object}} input
 * @returns {Buffer} The encoded message.
 */
var produce = function(input) {
  var messages = protobuf(input.schema);
  var codec = messages[input.message];
  return codec.encode(input.data);
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict';

// The action is only exported when running under the test environment.
if (process.env.NODE_ENV === "test") {
  var testAction = {
    name: 'testAction',
    description: 'testAction',
    blockedConnectionTypes: [],
    outputExample: {},
    matchExtensionMimeType: false,
    version: 1.0,
    toDocument: true,
    middleware: [],
    inputs: {}
  };

  /**
   * Write a fixed success message into the response and finish the action.
   */
  testAction.run = function(api, data, next) {
    data.response.message = 'Successfully processed the action!';
    return next();
  };

  exports.action = testAction;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment