Skip to content

Instantly share code, notes, and snippets.

@ry
Created March 22, 2011 00:16
Show Gist options
  • Save ry/880514 to your computer and use it in GitHub Desktop.
Save ry/880514 to your computer and use it in GitHub Desktop.
/*
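A simple newline-delimited JSON protocol with upgrades.
Each message is one JSON value terminated by "\n" or "\r\n". A line of the
form "upgrade: <json>" ends the JSON framing: the parser emits 'upgrade' with
the parsed value and the remaining bytes of the chunk.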
Receiving Usage:
protocol = require('./frame-protocol');
// parsing data
parser = protocol.Parser();
parser.on('message', function (msg) {
// handle message
});
parser.on('upgrade', function (msg, firstChunk) {
// handle upgrade
});
socket.on('data', function (d) {
parser.execute(d);
});
Sending Usage:
protocol = require('./frame-protocol');
socket.write(protocol.serialize({"hello": "world"}));
socket.write(protocol.upgrade({"hello": "world"}));
*/
var StringDecoder = require("string_decoder").StringDecoder;
var events = require('events');
var util = require('util');
/**
 * Streaming parser for the newline-delimited JSON framing.
 *
 * May be called with or without `new`. A fresh instance starts in the
 * 'JSON_START' state with an empty carry-over buffer and its own UTF-8
 * string decoder.
 */
function Parser () {
  var calledAsConstructor = this instanceof Parser;
  if (!calledAsConstructor) {
    // Invoked as a plain function: build the instance for the caller.
    return new Parser();
  }

  // Run the EventEmitter initialisation on this instance.
  events.EventEmitter.call(this);

  // Turns incoming byte slices into text.
  this.decoder = new StringDecoder('utf8');
  // Name of the state the byte-by-byte state machine is currently in.
  this.state = 'JSON_START';
  // Decoded text of a frame whose terminating newline has not arrived yet.
  this.stringBuffer = '';
}
// Make Parser inherit from EventEmitter (so instances have on()/emit()),
// then expose the constructor as this module's `Parser` export.
util.inherits(Parser, events.EventEmitter);
exports.Parser = Parser;
/**
 * Return the numeric code of the first character of `c`.
 * The state machine compares this value against individual bytes of a chunk.
 */
function char (c) {
  var code = c.charCodeAt(0);
  return code;
}
/**
 * Finish the JSON frame that ends at index `i` of chunk `d`.
 *
 * The frame text is the carried-over `stringBuffer` plus the decoded bytes
 * `d[this._start .. i)`. A non-empty frame is parsed and emitted as a
 * 'message' event; an empty one (blank line) is skipped silently. On a JSON
 * syntax error the parser reports it through `_emitError` and returns without
 * resetting, leaving the parser in the 'ERROR' state.
 *
 * @param d  chunk currently being processed
 * @param i  index in `d` of the newline that terminated the frame
 */
Parser.prototype._emitMessage = function (d, i) {
  var carried = this.stringBuffer;
  var decoded = this.decoder.write(d.slice(this._start, i));

  // CR/LF bytes are framing only; strip them before parsing.
  var line = (carried + decoded).replace(/[\r\n]/g, '');

  if (line.length > 0) {
    var parsed;
    try {
      parsed = JSON.parse(line);
    } catch (e) {
      this._emitError(d, i, 'problem parsing json ' + util.inspect(line));
      return;
    }
    // Emitted outside the try block so listener exceptions are not mistaken
    // for JSON errors.
    this.emit('message', parsed);
  }

  // Ready for the next frame.
  this.stringBuffer = '';
  this.state = 'JSON_START';
};
/**
 * Finish the "upgrade:" line that ends at index `i` of chunk `d` and hand the
 * rest of the chunk to the application.
 *
 * The payload text is the carried-over `stringBuffer` plus the decoded bytes
 * `d[this._start .. i)`. An empty payload, or one made only of spaces/tabs
 * (e.g. "upgrade: \r\n"), yields `{}`; anything else must be valid JSON,
 * otherwise the failure is reported through `_emitError` and no 'upgrade'
 * event is emitted.
 *
 * Emits 'upgrade' with (payload, rest) where `rest` is `d.slice(i + 1)`, i.e.
 * the bytes of this chunk that follow the terminating newline.
 *
 * @param d  chunk currently being processed
 * @param i  index in `d` of the newline that terminated the upgrade line
 */
Parser.prototype._emitUpgrade = function (d, i) {
  var text = this.stringBuffer +
             this.decoder.write(d.slice(this._start, i));
  // Remove any newline characters.
  text = text.replace(/[\r\n]/g, '');

  var msg;
  // After CR/LF removal the only JSON whitespace left is space and tab; a
  // payload consisting solely of those carries no value, so treat it as empty
  // rather than letting JSON.parse reject it.
  if (/^[ \t]*$/.test(text)) {
    msg = {};
  } else {
    try {
      msg = JSON.parse(text);
    } catch (e) {
      this._emitError(d, i, 'problem parsing json ' + util.inspect(text));
      return;
    }
  }

  var rest = d.slice(i + 1);
  this.emit('upgrade', msg, rest);
  // The JSON framing is over: if execute() is called again, emit error.
  this.state = 'ERROR';
};
/**
 * Report a parse failure and put the parser into the 'ERROR' state.
 *
 * Emits an 'error' event carrying an Error whose message contains the
 * optional description `msg` and an inspected dump of the whole chunk `d`.
 *
 * @param d    chunk being processed when the failure was detected
 * @param i    position in `d`; accepted but not used here
 * @param msg  optional human-readable description of the failure
 */
Parser.prototype._emitError = function (d, i, msg) {
  var description = msg || '';
  var chunkDump = util.inspect(d.toString());
  var failure = new Error('parse error ' + description + ' <' + chunkDump + '>');

  this.emit('error', failure);
  this.state = 'ERROR';
};
/**
 * Feed one chunk of incoming bytes to the parser.
 *
 * The chunk is scanned byte by byte through a state machine:
 *   - JSON_START / JSON / JSON_LF   collect one JSON line terminated by "\n"
 *     or "\r\n" and pass it to _emitMessage;
 *   - UP .. UPGRADE_COLON           recognise the literal prefix "upgrade:"
 *     at the start of a line (any mismatch falls back to JSON);
 *   - UPGRADE_TYPE_START / UPGRADE_TYPE / UPGRADE_LF
 *                                   collect the upgrade payload and pass it to
 *     _emitUpgrade, after which this call returns immediately;
 *   - ERROR                         terminal: every further call emits 'error'.
 *
 * A frame that is still unterminated when the chunk ends is decoded and kept
 * in `stringBuffer` until a later chunk completes it.
 *
 * Processing stops at the first error, so one chunk produces at most one
 * 'error' event.
 *
 * @param d  chunk of bytes to parse; indexed numerically and sliced, i.e.
 *           expected to be a Buffer
 * @throws Error if `this.state` holds a value the state machine does not know
 */
Parser.prototype.execute = function (d) {
  this._start = 0;

  // Refuse to keep growing an unterminated frame. The limit is measured in
  // characters of decoded text (65 * 1024 * 1024), not in bytes.
  if (this.stringBuffer.length >= 65*1024*1024) {
    this._emitError(d, 0);
    return;
  }

  for (var i = 0; i < d.length; i++) {
    switch (this.state) {
      case 'JSON_START':
        // First byte of a new line: remember where it starts and drop any
        // leftover text. A leading 'u' may be the start of "upgrade:".
        this._start = i;
        this.stringBuffer = '';
        this.state = d[i] === char('u') ? 'UP' : 'JSON';
        break;

      case 'JSON':
        if (d[i] === char('\r')) {
          this.state = 'JSON_LF';
        } else if (d[i] === char('\n')) {
          this._emitMessage(d, i);
        }
        break;

      case 'JSON_LF':
        // A carriage return must be followed by a line feed.
        if (d[i] !== char('\n')) {
          this._emitError(d, i, "expected \\n got " + String.fromCharCode(d[i]));
        } else {
          this._emitMessage(d, i);
        }
        break;

      // One state per already-matched letter of "upgrade:".
      case 'UP':
        this.state = d[i] === char('p') ? 'UPG' : 'JSON';
        break;

      case 'UPG':
        this.state = d[i] === char('g') ? 'UPGR' : 'JSON';
        break;

      case 'UPGR':
        this.state = d[i] === char('r') ? 'UPGRA' : 'JSON';
        break;

      case 'UPGRA':
        this.state = d[i] === char('a') ? 'UPGRAD' : 'JSON';
        break;

      case 'UPGRAD':
        this.state = d[i] === char('d') ? 'UPGRADE' : 'JSON';
        break;

      case 'UPGRADE':
        this.state = d[i] === char('e') ? 'UPGRADE_COLON' : 'JSON';
        break;

      case 'UPGRADE_COLON':
        this.state = d[i] === char(':') ? 'UPGRADE_TYPE_START' : 'JSON';
        break;

      case 'UPGRADE_TYPE_START':
        // The payload starts here; the "upgrade:" prefix is discarded.
        this._start = i;
        this.stringBuffer = '';
        if (d[i] === char('\n')) {
          // "upgrade:\n": the line is already complete with an empty payload.
          // Without this check the newline would be swallowed and the upgrade
          // would not fire until some later newline in the raw stream.
          this._emitUpgrade(d, i);
          return;
        }
        this.state = 'UPGRADE_TYPE';
        break;

      case 'UPGRADE_TYPE':
        if (d[i] === char('\r')) {
          this.state = 'UPGRADE_LF';
        } else if (d[i] === char('\n')) {
          this._emitUpgrade(d, i);
          return;
        }
        break;

      case 'UPGRADE_LF':
        if (d[i] !== char('\n')) {
          this._emitError(d, i);
        } else {
          this._emitUpgrade(d, i);
          return;
        }
        break;

      case 'ERROR':
        // Called again after an error or after an upgrade.
        this._emitError(d, i, "error state");
        return;

      default:
        throw new Error("Unknown state '" + this.state + "'");
    }

    // An error was reported for this byte. Stop here; continuing would hit
    // the 'ERROR' case on the next byte and emit a second 'error' event for
    // the same chunk.
    if (this.state === 'ERROR') {
      return;
    }
  }

  // Carry over only a frame that is still open. When the chunk ended exactly
  // on a message boundary (state is back to JSON_START) there is nothing to
  // keep: buffering here would decode the last message a second time and leave
  // stale text counting against the size limit above.
  if (this.state !== 'JSON_START' && this._start !== i) {
    this.stringBuffer += this.decoder.write(d.slice(this._start, i));
  }
};
exports.serialize = function (message) {
return JSON.stringify(message) + "\r\n";
};
exports.upgrade = function (message) {
return 'upgrade: ' + JSON.stringify(message) + "\r\n";
};
var assert = require('assert');
var util = require('util')
var Parser = require('../lib/frame-protocol').Parser;
/**
 * Two CRLF-terminated messages delivered in a single chunk must produce
 * exactly two 'message' events, in order.
 */
function test1 () {
  console.error("test 1");

  var parser = new Parser();
  var received = 0;

  function onMessage (m) {
    console.error("Got message: %j", m);
    received += 1;
    switch (received) {
      case 1:
        assert.equal("world", m["hello"]);
        break;
      case 2:
        assert.equal("bar", m["foo"]);
        break;
    }
  }
  parser.on('message', onMessage);

  var wire = '{"hello": "world"}\r\n{"foo": "bar"}\r\n';
  parser.execute(Buffer(wire));

  assert.equal(2, received);
}
/**
 * Chunk-boundary test for plain messages.
 *
 * Each input holds two messages. It is cut in two at every possible offset
 * and the halves are fed one after the other to the SAME parser instance;
 * every cut must still yield exactly two 'message' events. The inputs mix
 * "\r\n" and "\n" terminators and include a leading blank line.
 */
function test2 () {
  console.error("test 2");

  var parser = new Parser();
  var received = 0;

  parser.on('message', function (m) {
    received += 1;
    if (received == 1) {
      assert.equal("world", m["hello"]);
      assert.equal("y", m["x"]);
    } else if (received == 2) {
      assert.equal("bar", m["foo"]);
    }
  });

  // Feed `s` as two chunks, once for every cut position.
  function feedEverySplit (s) {
    for (var cut = 0; cut < s.length; cut++) {
      var head = s.slice(0, cut);
      var tail = s.slice(cut);

      console.error('i=%d j=%d', cut, s.length - cut);
      console.error(util.inspect(head));
      console.error(util.inspect(tail));

      received = 0;
      parser.execute(Buffer(head));
      parser.execute(Buffer(tail));
      assert.equal(2, received);
    }
  }

  var inputs = [
    '{"hello": "world", "x": "y"}\r\n{"foo": "bar"}\r\n',
    '{"hello": "world", "x": "y"}\n{"foo": "bar"}\r\n',
    '\r\n{"hello": "world", "x": "y"}\r\n{"foo": "bar"}\n'
  ];
  for (var k = 0; k < inputs.length; k++) {
    feedEverySplit(inputs[k]);
  }
}
/**
 * Two messages followed by an upgrade line and trailing raw data, all in one
 * chunk: expects two 'message' events and one 'upgrade' event whose payload
 * is {a: 'b'} and whose remainder is 'hello'.
 */
function test3 () {
  console.error('\n\ntest 3');

  var parser = new Parser();
  var received = 0;
  var upgraded = false;

  function onMessage (m) {
    console.error("Got message: %j", m);
    received += 1;
    switch (received) {
      case 1:
        assert.equal("world", m["hello"]);
        break;
      case 2:
        assert.equal("bar", m["foo"]);
        break;
    }
  }

  function onUpgrade (msg, rest) {
    console.error("upgrade '%j' '%s'", msg, rest);
    upgraded = true;
    assert.deepEqual({a: 'b'}, msg);
    assert.equal('hello', rest.toString());
  }

  parser.on('message', onMessage);
  parser.on('upgrade', onUpgrade);

  var wire = '{"hello": "world"}\r\n{"foo": "bar"}\r\nupgrade: { "a": "b" }\r\nhello';
  parser.execute(Buffer(wire));

  assert.equal(2, received);
  assert.ok(upgraded);
}
/**
 * Chunk-boundary test for upgrades.
 *
 * Each input holds two messages, an upgrade line and the raw text 'hello'.
 * It is cut in two at every possible offset and fed to a FRESH parser per
 * cut; every cut must yield two 'message' events and one 'upgrade' event
 * with payload {a: 'b'}. The remainder handed to the upgrade listener must
 * be a prefix of 'hello' (only the part contained in the upgrading chunk).
 */
function test4 () {
  console.error("test4");

  var received = 0;
  var upgraded = false;

  function onMessage (m) {
    received += 1;
    if (received == 1) {
      assert.equal("world", m["hello"]);
      assert.equal("y", m["x"]);
    } else if (received == 2) {
      assert.equal("bar", m["foo"]);
    }
  }

  function onUpgrade (msg, rest) {
    upgraded = true;
    assert.deepEqual({a: 'b'}, msg);
    assert.equal('hello'.slice(0, rest.length), rest.toString());
  }

  // Feed `s` as two chunks, once for every cut position.
  function feedEverySplit (s) {
    for (var cut = 0; cut < s.length; cut++) {
      console.error('i=%d j=%d', cut, s.length - cut);

      received = 0;
      upgraded = false;

      var parser = new Parser();
      parser.on('message', onMessage);
      parser.on('upgrade', onUpgrade);

      var head = s.slice(0, cut);
      var tail = s.slice(cut);

      parser.execute(Buffer(head));
      // Once the upgrade has fired the parser rejects further input, so the
      // second half is only fed when the first half did not reach it.
      if (!upgraded) {
        parser.execute(Buffer(tail));
      }

      assert.equal(2, received);
      assert.ok(upgraded);
    }
  }

  var inputs = [
    '{"hello": "world", "x": "y"}\r\n{"foo": "bar"}\r\nupgrade: {"a": "b"}\r\nhello',
    '{"hello": "world", "x": "y"}\n{"foo": "bar"}\r\nupgrade: {"a": "b"}\nhello',
    '\r\n{"hello": "world", "x": "y"}\n{"foo": "bar"}\nupgrade: {"a": "b"}\nhello'
  ];
  for (var k = 0; k < inputs.length; k++) {
    feedEverySplit(inputs[k]);
  }
}
// Run the scenarios in order. A failed assertion throws, so "DONE" is only
// printed when every test passed.
test1();
test2();
test3();
test4();
console.error("DONE");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment