Skip to content

Instantly share code, notes, and snippets.

@xiongjia
Last active September 14, 2017 07:45
Show Gist options
  • Save xiongjia/6867670 to your computer and use it in GitHub Desktop.
Save xiongjia/6867670 to your computer and use it in GitHub Desktop.
node stream tests : It's a simple protocol parser. I created it for test the NODE Stream module. #devsample
{
"curly": true,
"eqeqeq": true,
"immed": true,
"latedef": true,
"newcap": true,
"noarg": true,
"sub": true,
"undef": true,
"unused": "vars",
"boss": true,
"eqnull": true,
"node": true,
"trailing": true
}
/*
* node stream tests - Client
*/
'use strict';
var logger = require('winston'),
_ = require('underscore');
exports = module.exports = function (opts) {
var net = require('net'),
proto = require('./nsrv-protocol'),
Incoming = proto.Incoming,
Outgoing = proto.Outgoing,
client,
defaultTagetAddr = { host : 'localhost', port : 8095 },
clientOpts = {
targetSrv : defaultTagetAddr
};
/* update client options */
logger.debug('starting client, input opts %s', JSON.stringify(opts));
_.each(_.keys(clientOpts), function (opt) {
if (opts[opt]) {
clientOpts[opt] = opts[opt];
}
});
logger.info('starting client, opts %s', JSON.stringify(clientOpts));
/* connect to server */
client = net.connect({
host : clientOpts.targetSrv.host,
port : clientOpts.targetSrv.port
},
function() {
var req = new Outgoing(),
res = new Incoming();
logger.info('connected to %s', JSON.stringify(clientOpts.targetSrv));
client.on('close', function (hasErr) {
logger.info('socket closed. hasErr %s', hasErr);
});
/* startup the request & response streams */
res.on('error', function (err) {
/* close the socket when error occued */
logger.error('Remote %s: An error occued on the response stream');
client.end(null);
client.destroy();
});
req.on('error', function (err) {
/* close the socket when error occued */
logger.error('Remote %s: An error occued on the request stream.');
client.end(null);
client.destroy();
});
res.on('eval', function (cmd) {
if (!cmd.command) {
logger.error('Serv got an empty command');
return;
}
else if (cmd.command === 'echo') {
/* echo command */
if (cmd.msg) {
logger.info('Serv echo rep = [%s]', cmd.msg);
}
else {
logger.debug('Client got an error echo rep');
}
}
});
req.pipe(client).pipe(res);
/* send requests to server */
req.send({ command : 'echo', msg : 'request 1'}, function () {
logger.info('sent request 1');
});
req.send({ command : 'echo', msg : 'request 2'}, function () {
logger.info('sent request 2');
});
req.send({ command : 'bye'}, function () {
logger.info('sent bye');
});
});
};
function main() {
var argv = require('optimist').argv;
/* startup the logger and set it to console */
logger.cli();
logger.default.transports.console.timestamp = true;
if (argv.logLevel) {
logger.default.transports.console.level = argv.logLevel;
logger.info('Set logLevel to %s', argv.logLevel);
}
/* run the serv with the argv options */
exports(argv);
}
if (require.main === module) {
/* current module is the node main module */
main();
}
/*
* node stream tests - Protocol stream
*
* It's a simple protocol. I created it for test the NODE Stream module.
*
* Protocol: (All the protocol Buffer are UTF8 Strings')
*
* ---------------------------------------
* '{JSON String 1}'\n
* '{JSON String 2}'\n
* .....
* '{JSON String n}'\n
* \n
* ---------------------------------------
*
* Sample Client: nclient.js
* Sample Server: nsrv.js
*
* TODO:
* 1. need a Handshake command for check the Protocol versions.
* 2. add unit test
*/
'use strict';
var logger = require('winston'),
util = require('util'),
_ = require('underscore'),
Transform = require('stream').Transform;
/* the construct of IncomingStream */
function IncomingStream(opts) {
var self = this,
streamOpts = {},
incomingOpts;
if (!(self instanceof IncomingStream)) {
logger.warn('the caller directly using IncomingStream');
return new IncomingStream(opts);
}
logger.debug('creating IncomingStream');
Transform.call(self, streamOpts);
/* startup options */
incomingOpts = {
Eval : function (command) {
logger.info('New incoming command %s',
JSON.stringify(command));
self.emit('eval', command);
}
};
if (opts) {
/* update customer's options */
_.each(_.keys(incomingOpts), function (opt) {
if (opts[opt]) {
logger.debug('Update incoming stream opt %s', opt);
incomingOpts[opt] = opts[opt];
}
});
}
self._eval = incomingOpts.Eval;
self._lastChunk = null;
}
/* the construct of OutgoingStream */
function OutgoingStream() {
var self = this,
streamOpts = {};
if (!(self instanceof OutgoingStream)) {
logger.warn('the caller directly using OutgoingStream');
return new OutgoingStream();
}
logger.debug('creating OutgoingStream');
Transform.call(self, streamOpts);
}
/* set the base class and export the streams */
util.inherits(IncomingStream, Transform);
util.inherits(OutgoingStream, Transform);
exports.Incoming = IncomingStream;
exports.Outgoing = OutgoingStream;
/* transform function for IncomingStream */
IncomingStream.prototype._transform = function (chunk, encoding, callback) {
if (!chunk) {
logger.debug('incoming, get an empty chunk');
callback();
return;
}
logger.debug('incoming chunk size %d', chunk.length);
this._incomingProcess(chunk);
callback();
};
IncomingStream.prototype._incomingProcess = function (chunk) {
var TERMINATE = 10, /* TERMINATE flag is 10 ('\n') */
self = this,
curChunk, i, findFirstTerminate, idxTerminate, cmdBuf, remain, cmdsStr;
/* parse the chunks */
curChunk = this._lastChunk ? Buffer.concat([this._lastChunk, chunk]) : chunk;
findFirstTerminate = false;
idxTerminate = -1;
for (i = 0; i < curChunk.length; ++i) {
if (curChunk[i] === TERMINATE) {
if (!findFirstTerminate) {
findFirstTerminate = true;
}
else {
idxTerminate = i;
}
}
else {
findFirstTerminate = false;
}
}
if (idxTerminate === -1) {
/* still waitting \n\n */
logger.debug('cannot find terminate flag & waitting next chunk');
this._lastChunk = curChunk;
}
else if (idxTerminate === curChunk.length) {
logger.debug('Terminate flag is the end of chunk');
cmdBuf = curChunk;
this._lastChunk = null;
}
else if (idxTerminate < curChunk.length) {
logger.debug('idxTerminate < curChunk.length');
cmdBuf = curChunk.slice(0, idxTerminate);
remain = curChunk.slice(idxTerminate + 1);
this.push(remain);
}
else {
/* this can't be happening */
logger.error('bad protocol format. idxTerminate > curChunk.length');
this.emit('error', { code : 'Bad Chunk' });
}
if (cmdBuf) {
cmdsStr = cmdBuf.toString('utf8');
_.each(cmdsStr.split('\n'), function (cmdStr) {
var cmdObj;
if (cmdStr.length === 0) {
return;
}
try {
cmdObj = JSON.parse(cmdStr);
logger.debug('receive a new command %s', JSON.stringify(cmdObj));
self._eval(cmdObj);
}
catch (exception) {
logger.error('receive a bad command string, %s', cmdStr);
}
});
}
};
/* transform function for OutgoingStream */
OutgoingStream.prototype._transform = function (chunk, encoding, callback) {
if (!chunk) {
logger.debug('outgoing, get an empty chunk');
callback();
return;
}
logger.debug('outgoing chunk size %d', chunk.length);
this.push(chunk);
callback();
};
OutgoingStream.prototype.flush = function (callback) {
var cmdBuf = new Buffer('\n', 'utf8');
this.write(cmdBuf, 'utf8', callback);
};
OutgoingStream.prototype.send = function (command, opts, callback) {
var cmdBuf,
sendOpts = {
flush : true
};
/* update send options */
if (opts) {
_.each(_.keys(sendOpts), function (opt) {
if (opts[opt]) {
sendOpts[opt] = opts[opt];
}
});
}
if (sendOpts.flush) {
cmdBuf = new Buffer(util.format('%j\n\n', command), 'utf8');
}
else {
cmdBuf = new Buffer(util.format('%j\n', command), 'utf8');
}
this.write(cmdBuf, 'utf8', callback);
};
/*
* node stream tests - Server
*/
'use strict';
var logger = require('winston'),
util = require('util'),
_ = require('underscore');
exports = module.exports = function (opts) {
var net = require('net'),
proto = require('./nsrv-protocol'),
Incoming = proto.Incoming,
Outgoing = proto.Outgoing,
serv,
startListening = false,
defaultSrvAddr = { host : 'localhost', port : 8095 },
srvOpts = {
address : defaultSrvAddr
};
/* update server options */
logger.debug('starting serv, input opts %s', JSON.stringify(opts));
_.each(_.keys(srvOpts), function (opt) {
if (opts[opt]) {
srvOpts[opt] = opts[opt];
}
});
logger.info('starting serv, opts %s', JSON.stringify(srvOpts));
/* create server */
serv = net.createServer({ allowHalfOpen: true }, function (sock) {
var remote = util.format('%s:%d', sock.remoteAddress, sock.remotePort),
req = new Incoming(),
res = new Outgoing();
logger.info('new connection from %s', remote);
sock.on('close', function (hadErr) {
logger.info('socket closed (%s). hadErr %s', remote, hadErr);
});
sock.on('error', function (err) {
logger.error('error occued on socket (%s), error %s', remote, err.code);
});
/* startup the request & response streams */
res.on('error', function (err) {
/* close the socket when error occued */
logger.error('Remote %s: An error occued on the response stream (%s).',
remote, err);
sock.end(null);
});
req.on('error', function (err) {
/* close the socket when error occued */
logger.error('Remote %s: An error occued on the request stream (%s).',
remote, err);
sock.end(null);
});
req.on('eval', function (cmd) {
if (!cmd.command) {
logger.error('Serv got an empty command');
return;
}
if (cmd.command === 'bye') {
/* disconnect command */
sock.end();
}
else if (cmd.command === 'echo') {
/* echo command */
if (cmd.msg) {
res.send({
command : 'echo',
msg : util.format('Serv response message (%s)', cmd.msg)
});
}
else {
logger.error('serv got an error "echo" command. Cannot parse "msg" filed');
}
}
});
res.pipe(sock).pipe(req);
});
serv.on('error', function (err) {
logger.error('An error occured, server error code %s', err.code);
if (startListening) {
serv.close();
startListening = false;
}
});
serv.on('listening', function () {
logger.info('server started');
startListening = true;
});
/* start the server */
logger.info('start listen on %s', JSON.stringify(srvOpts.address));
serv.listen(srvOpts.address.port, srvOpts.address.host);
};
function main() {
var argv = require('optimist').argv;
/* startup the logger and set it to console */
logger.cli();
logger.default.transports.console.timestamp = true;
if (argv.logLevel) {
logger.default.transports.console.level = argv.logLevel;
logger.info('Set logLevel to %s', argv.logLevel);
}
/* run the serv with the argv options */
exports(argv);
}
if (require.main === module) {
/* current module is the node main module */
main();
}
{
"name": "nstream-tests",
"version": "0.0.0",
"description": "Node Stream Tests",
"author": {
"name": "LeXiongJia",
"email": "[email protected]"
},
"licenses": [
{
"type": "MIT"
}
],
"dependencies": {
"underscore": "~1.5.2",
"winston": "~0.7.2",
"optimist": "~0.6.0"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment