Last active
September 14, 2017 07:45
-
-
Save xiongjia/6867670 to your computer and use it in GitHub Desktop.
node stream tests : It's a simple protocol parser. I created it for test the NODE Stream module. #devsample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Show hidden characters
{ | |
"curly": true, | |
"eqeqeq": true, | |
"immed": true, | |
"latedef": true, | |
"newcap": true, | |
"noarg": true, | |
"sub": true, | |
"undef": true, | |
"unused": "vars", | |
"boss": true, | |
"eqnull": true, | |
"node": true, | |
"trailing": true | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* node stream tests - Client | |
*/ | |
'use strict'; | |
var logger = require('winston'), | |
_ = require('underscore'); | |
exports = module.exports = function (opts) { | |
var net = require('net'), | |
proto = require('./nsrv-protocol'), | |
Incoming = proto.Incoming, | |
Outgoing = proto.Outgoing, | |
client, | |
defaultTagetAddr = { host : 'localhost', port : 8095 }, | |
clientOpts = { | |
targetSrv : defaultTagetAddr | |
}; | |
/* update client options */ | |
logger.debug('starting client, input opts %s', JSON.stringify(opts)); | |
_.each(_.keys(clientOpts), function (opt) { | |
if (opts[opt]) { | |
clientOpts[opt] = opts[opt]; | |
} | |
}); | |
logger.info('starting client, opts %s', JSON.stringify(clientOpts)); | |
/* connect to server */ | |
client = net.connect({ | |
host : clientOpts.targetSrv.host, | |
port : clientOpts.targetSrv.port | |
}, | |
function() { | |
var req = new Outgoing(), | |
res = new Incoming(); | |
logger.info('connected to %s', JSON.stringify(clientOpts.targetSrv)); | |
client.on('close', function (hasErr) { | |
logger.info('socket closed. hasErr %s', hasErr); | |
}); | |
/* startup the request & response streams */ | |
res.on('error', function (err) { | |
/* close the socket when error occued */ | |
logger.error('Remote %s: An error occued on the response stream'); | |
client.end(null); | |
client.destroy(); | |
}); | |
req.on('error', function (err) { | |
/* close the socket when error occued */ | |
logger.error('Remote %s: An error occued on the request stream.'); | |
client.end(null); | |
client.destroy(); | |
}); | |
res.on('eval', function (cmd) { | |
if (!cmd.command) { | |
logger.error('Serv got an empty command'); | |
return; | |
} | |
else if (cmd.command === 'echo') { | |
/* echo command */ | |
if (cmd.msg) { | |
logger.info('Serv echo rep = [%s]', cmd.msg); | |
} | |
else { | |
logger.debug('Client got an error echo rep'); | |
} | |
} | |
}); | |
req.pipe(client).pipe(res); | |
/* send requests to server */ | |
req.send({ command : 'echo', msg : 'request 1'}, function () { | |
logger.info('sent request 1'); | |
}); | |
req.send({ command : 'echo', msg : 'request 2'}, function () { | |
logger.info('sent request 2'); | |
}); | |
req.send({ command : 'bye'}, function () { | |
logger.info('sent bye'); | |
}); | |
}); | |
}; | |
function main() { | |
var argv = require('optimist').argv; | |
/* startup the logger and set it to console */ | |
logger.cli(); | |
logger.default.transports.console.timestamp = true; | |
if (argv.logLevel) { | |
logger.default.transports.console.level = argv.logLevel; | |
logger.info('Set logLevel to %s', argv.logLevel); | |
} | |
/* run the serv with the argv options */ | |
exports(argv); | |
} | |
if (require.main === module) { | |
/* current module is the node main module */ | |
main(); | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* node stream tests - Protocol stream | |
* | |
* It's a simple protocol. I created it for test the NODE Stream module. | |
* | |
* Protocol: (All the protocol Buffer are UTF8 Strings') | |
* | |
* --------------------------------------- | |
* '{JSON String 1}'\n | |
* '{JSON String 2}'\n | |
* ..... | |
* '{JSON String n}'\n | |
* \n | |
* --------------------------------------- | |
* | |
* Sample Client: nclient.js | |
* Sample Server: nsrv.js | |
* | |
* TODO: | |
* 1. need a Handshake command for check the Protocol versions. | |
* 2. add unit test | |
*/ | |
'use strict'; | |
var logger = require('winston'), | |
util = require('util'), | |
_ = require('underscore'), | |
Transform = require('stream').Transform; | |
/* the construct of IncomingStream */ | |
function IncomingStream(opts) { | |
var self = this, | |
streamOpts = {}, | |
incomingOpts; | |
if (!(self instanceof IncomingStream)) { | |
logger.warn('the caller directly using IncomingStream'); | |
return new IncomingStream(opts); | |
} | |
logger.debug('creating IncomingStream'); | |
Transform.call(self, streamOpts); | |
/* startup options */ | |
incomingOpts = { | |
Eval : function (command) { | |
logger.info('New incoming command %s', | |
JSON.stringify(command)); | |
self.emit('eval', command); | |
} | |
}; | |
if (opts) { | |
/* update customer's options */ | |
_.each(_.keys(incomingOpts), function (opt) { | |
if (opts[opt]) { | |
logger.debug('Update incoming stream opt %s', opt); | |
incomingOpts[opt] = opts[opt]; | |
} | |
}); | |
} | |
self._eval = incomingOpts.Eval; | |
self._lastChunk = null; | |
} | |
/* the construct of OutgoingStream */ | |
function OutgoingStream() { | |
var self = this, | |
streamOpts = {}; | |
if (!(self instanceof OutgoingStream)) { | |
logger.warn('the caller directly using OutgoingStream'); | |
return new OutgoingStream(); | |
} | |
logger.debug('creating OutgoingStream'); | |
Transform.call(self, streamOpts); | |
} | |
/* set the base class and export the streams */ | |
util.inherits(IncomingStream, Transform); | |
util.inherits(OutgoingStream, Transform); | |
exports.Incoming = IncomingStream; | |
exports.Outgoing = OutgoingStream; | |
/* transform function for IncomingStream */ | |
IncomingStream.prototype._transform = function (chunk, encoding, callback) { | |
if (!chunk) { | |
logger.debug('incoming, get an empty chunk'); | |
callback(); | |
return; | |
} | |
logger.debug('incoming chunk size %d', chunk.length); | |
this._incomingProcess(chunk); | |
callback(); | |
}; | |
IncomingStream.prototype._incomingProcess = function (chunk) { | |
var TERMINATE = 10, /* TERMINATE flag is 10 ('\n') */ | |
self = this, | |
curChunk, i, findFirstTerminate, idxTerminate, cmdBuf, remain, cmdsStr; | |
/* parse the chunks */ | |
curChunk = this._lastChunk ? Buffer.concat([this._lastChunk, chunk]) : chunk; | |
findFirstTerminate = false; | |
idxTerminate = -1; | |
for (i = 0; i < curChunk.length; ++i) { | |
if (curChunk[i] === TERMINATE) { | |
if (!findFirstTerminate) { | |
findFirstTerminate = true; | |
} | |
else { | |
idxTerminate = i; | |
} | |
} | |
else { | |
findFirstTerminate = false; | |
} | |
} | |
if (idxTerminate === -1) { | |
/* still waitting \n\n */ | |
logger.debug('cannot find terminate flag & waitting next chunk'); | |
this._lastChunk = curChunk; | |
} | |
else if (idxTerminate === curChunk.length) { | |
logger.debug('Terminate flag is the end of chunk'); | |
cmdBuf = curChunk; | |
this._lastChunk = null; | |
} | |
else if (idxTerminate < curChunk.length) { | |
logger.debug('idxTerminate < curChunk.length'); | |
cmdBuf = curChunk.slice(0, idxTerminate); | |
remain = curChunk.slice(idxTerminate + 1); | |
this.push(remain); | |
} | |
else { | |
/* this can't be happening */ | |
logger.error('bad protocol format. idxTerminate > curChunk.length'); | |
this.emit('error', { code : 'Bad Chunk' }); | |
} | |
if (cmdBuf) { | |
cmdsStr = cmdBuf.toString('utf8'); | |
_.each(cmdsStr.split('\n'), function (cmdStr) { | |
var cmdObj; | |
if (cmdStr.length === 0) { | |
return; | |
} | |
try { | |
cmdObj = JSON.parse(cmdStr); | |
logger.debug('receive a new command %s', JSON.stringify(cmdObj)); | |
self._eval(cmdObj); | |
} | |
catch (exception) { | |
logger.error('receive a bad command string, %s', cmdStr); | |
} | |
}); | |
} | |
}; | |
/* transform function for OutgoingStream */ | |
OutgoingStream.prototype._transform = function (chunk, encoding, callback) { | |
if (!chunk) { | |
logger.debug('outgoing, get an empty chunk'); | |
callback(); | |
return; | |
} | |
logger.debug('outgoing chunk size %d', chunk.length); | |
this.push(chunk); | |
callback(); | |
}; | |
OutgoingStream.prototype.flush = function (callback) { | |
var cmdBuf = new Buffer('\n', 'utf8'); | |
this.write(cmdBuf, 'utf8', callback); | |
}; | |
OutgoingStream.prototype.send = function (command, opts, callback) { | |
var cmdBuf, | |
sendOpts = { | |
flush : true | |
}; | |
/* update send options */ | |
if (opts) { | |
_.each(_.keys(sendOpts), function (opt) { | |
if (opts[opt]) { | |
sendOpts[opt] = opts[opt]; | |
} | |
}); | |
} | |
if (sendOpts.flush) { | |
cmdBuf = new Buffer(util.format('%j\n\n', command), 'utf8'); | |
} | |
else { | |
cmdBuf = new Buffer(util.format('%j\n', command), 'utf8'); | |
} | |
this.write(cmdBuf, 'utf8', callback); | |
}; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* node stream tests - Server | |
*/ | |
'use strict'; | |
var logger = require('winston'), | |
util = require('util'), | |
_ = require('underscore'); | |
exports = module.exports = function (opts) { | |
var net = require('net'), | |
proto = require('./nsrv-protocol'), | |
Incoming = proto.Incoming, | |
Outgoing = proto.Outgoing, | |
serv, | |
startListening = false, | |
defaultSrvAddr = { host : 'localhost', port : 8095 }, | |
srvOpts = { | |
address : defaultSrvAddr | |
}; | |
/* update server options */ | |
logger.debug('starting serv, input opts %s', JSON.stringify(opts)); | |
_.each(_.keys(srvOpts), function (opt) { | |
if (opts[opt]) { | |
srvOpts[opt] = opts[opt]; | |
} | |
}); | |
logger.info('starting serv, opts %s', JSON.stringify(srvOpts)); | |
/* create server */ | |
serv = net.createServer({ allowHalfOpen: true }, function (sock) { | |
var remote = util.format('%s:%d', sock.remoteAddress, sock.remotePort), | |
req = new Incoming(), | |
res = new Outgoing(); | |
logger.info('new connection from %s', remote); | |
sock.on('close', function (hadErr) { | |
logger.info('socket closed (%s). hadErr %s', remote, hadErr); | |
}); | |
sock.on('error', function (err) { | |
logger.error('error occued on socket (%s), error %s', remote, err.code); | |
}); | |
/* startup the request & response streams */ | |
res.on('error', function (err) { | |
/* close the socket when error occued */ | |
logger.error('Remote %s: An error occued on the response stream (%s).', | |
remote, err); | |
sock.end(null); | |
}); | |
req.on('error', function (err) { | |
/* close the socket when error occued */ | |
logger.error('Remote %s: An error occued on the request stream (%s).', | |
remote, err); | |
sock.end(null); | |
}); | |
req.on('eval', function (cmd) { | |
if (!cmd.command) { | |
logger.error('Serv got an empty command'); | |
return; | |
} | |
if (cmd.command === 'bye') { | |
/* disconnect command */ | |
sock.end(); | |
} | |
else if (cmd.command === 'echo') { | |
/* echo command */ | |
if (cmd.msg) { | |
res.send({ | |
command : 'echo', | |
msg : util.format('Serv response message (%s)', cmd.msg) | |
}); | |
} | |
else { | |
logger.error('serv got an error "echo" command. Cannot parse "msg" filed'); | |
} | |
} | |
}); | |
res.pipe(sock).pipe(req); | |
}); | |
serv.on('error', function (err) { | |
logger.error('An error occured, server error code %s', err.code); | |
if (startListening) { | |
serv.close(); | |
startListening = false; | |
} | |
}); | |
serv.on('listening', function () { | |
logger.info('server started'); | |
startListening = true; | |
}); | |
/* start the server */ | |
logger.info('start listen on %s', JSON.stringify(srvOpts.address)); | |
serv.listen(srvOpts.address.port, srvOpts.address.host); | |
}; | |
function main() { | |
var argv = require('optimist').argv; | |
/* startup the logger and set it to console */ | |
logger.cli(); | |
logger.default.transports.console.timestamp = true; | |
if (argv.logLevel) { | |
logger.default.transports.console.level = argv.logLevel; | |
logger.info('Set logLevel to %s', argv.logLevel); | |
} | |
/* run the serv with the argv options */ | |
exports(argv); | |
} | |
if (require.main === module) { | |
/* current module is the node main module */ | |
main(); | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"name": "nstream-tests", | |
"version": "0.0.0", | |
"description": "Node Stream Tests", | |
"author": { | |
"name": "LeXiongJia", | |
"email": "[email protected]" | |
}, | |
"licenses": [ | |
{ | |
"type": "MIT" | |
} | |
], | |
"dependencies": { | |
"underscore": "~1.5.2", | |
"winston": "~0.7.2", | |
"optimist": "~0.6.0" | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment