Created
April 3, 2010 07:31
-
-
Save horatio-sans-serif/354228 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is a fully interruptible, binary-safe Redis reply parser for Node.js.
// TODO remove debugging statements | |
// NOTE(review): `sys` is the legacy name of Node's `util` module and
// `sys.debug` may not exist on current Node releases -- confirm the target
// runtime before relying on the debug output below.
var sys = require('sys');
var Buffer = require('buffer').Buffer;

// Byte values the parser compares incoming bytes against: the five
// reply-type markers and the line-terminator bytes.
var PLUS = exports.PLUS = 0x2B; // +
var MINUS = exports.MINUS = 0x2D; // -
var DOLLAR = exports.DOLLAR = 0x24; // $
var STAR = exports.STAR = 0x2A; // *
var COLON = exports.COLON = 0x3A; // :
var CR = exports.CR = 0x0D; // \r
var LF = exports.LF = 0x0A; // \n

// Reply kinds. NONE is the parser state between replies; the others are
// also used as the `type` field of the reply objects handed to callbacks.
var NONE = exports.NONE = "NONE";
var BULK = exports.BULK = "BULK";
var MULTIBULK = exports.MULTIBULK = "MULTIBULK";
var INLINE = exports.INLINE = "INLINE";
var INTEGER = exports.INTEGER = "INTEGER";
var ERROR = exports.ERROR = "ERROR";
/**
 * Makes line terminators visible for debug output by replacing every CR
 * with the two characters `\r` and every LF with the two characters `\n`.
 *
 * @param {string} c Text to escape.
 * @returns {string} The text with CR and LF replaced by their escapes.
 */
function filter(c) {
  // Escaping CR and LF independently also covers CRLF pairs, so no separate
  // pass for "\r\n" is needed.
  return c.replace(/\r/g, '\\r').replace(/\n/g, '\\n');
}
/**
 * Incremental, binary-safe parser for Redis replies.
 *
 * Bytes are pushed in with feed(); parsing state is kept on the instance, so
 * a reply may be split across any number of feed() calls. Each completed
 * reply is passed to `onReply` as `{ type, val }`:
 *   - INLINE / ERROR: `val` is a Buffer holding the line (without CRLF).
 *   - INTEGER:        `val` is a number.
 *   - BULK:           `val` is a Buffer, or null when the length is negative.
 *   - MULTIBULK:      `val` is an array of the element replies, or null when
 *                     the count is negative.
 *
 * Nested multi-bulk replies are not tracked: a `*N` header received while a
 * multi-bulk is pending replaces the pending list.
 *
 * @param {function(Object)} onReply Called (with the parser as `this`) for
 *     every completed top-level reply.
 * @param {number} [maxReplySize] Capacity in bytes of the buffer holding a
 *     single line or bulk payload; defaults to 8KiB.
 */
function Parser(onReply, maxReplySize) {
  this.maxReplySize = maxReplySize || (1024 * 8); // 8KiB max size by default
  this.onReply = onReply;
  this.clearState();
  this.clearMultiBulkState();
}

/**
 * Resets the per-reply state. A fresh value buffer is allocated every time
 * because slices of the previous one have been handed to the reply callback
 * and must not be overwritten.
 */
Parser.prototype.clearState = function () {
  this.type = NONE;
  this.valueBufferLen = 0;
  this.valueBuffer = Buffer.alloc(this.maxReplySize);
  this.bulkLengthExpected = null;
  this.skip = 0;
};

/** Forgets any multi-bulk reply that is being collected. */
Parser.prototype.clearMultiBulkState = function () {
  this.multibulkReplies = null;
  this.multibulkRepliesExpected = null;
};

/**
 * Appends one byte to the value buffer.
 *
 * @param {number} byte Byte value to store.
 * @throws {Error} "pending buffer overflow" when the buffer is already full.
 */
Parser.prototype.appendByte = function (byte) {
  // Checked before the write so that a value of exactly maxReplySize bytes
  // still fits.
  if (this.valueBufferLen >= this.maxReplySize)
    throw new Error("pending buffer overflow");
  this.valueBuffer[this.valueBufferLen++] = byte;
};

/**
 * Parses the bytes buffered so far as a base-10 integer.
 *
 * @returns {number} The parsed number, or NaN when the bytes are not numeric.
 */
Parser.prototype.parseBufferedInt = function () {
  return parseInt(this.valueBuffer.toString('ascii', 0, this.valueBufferLen), 10);
};

/**
 * Delivers a completed reply. If a multi-bulk reply is being collected the
 * reply is stored as one of its elements and the callback fires only once
 * all expected elements have arrived; otherwise the callback fires
 * immediately. Afterwards the per-reply state is reset.
 *
 * @param {Object} reply The completed `{ type, val }` reply.
 */
Parser.prototype.maybeCallbackWithReply = function (reply) {
  if (this.multibulkReplies != null) {
    sys.debug("IN A MULTIBULK");
    this.multibulkReplies.push(reply);
    if (--this.multibulkRepliesExpected == 0) {
      this.onReply({ type: MULTIBULK, val: this.multibulkReplies });
      this.clearMultiBulkState();
    }
    sys.debug('now, multibulkRepliesExpected: ' + this.multibulkRepliesExpected);
  } else {
    sys.debug("NOT IN A MULTIBULK");
    this.onReply(reply);
  }
  this.clearState();
  this.skip = 1; // Skip LF; this is only called from CR handlers
};

/**
 * Handles a CR byte received while a reply is in progress.
 *
 * @throws {Error} When a bulk length or multi-bulk count is not a number, or
 *     when an embedded CR does not fit into the value buffer.
 */
Parser.prototype.handleCR = function () {
  var count;

  switch (this.type) {
    case INLINE:
    case ERROR:
      // CR denotes end of the inline/error value.
      //   +OK\r\n
      //      ^
      this.maybeCallbackWithReply({
        type: this.type,
        val: this.valueBuffer.slice(0, this.valueBufferLen)
      });
      break;

    case INTEGER:
      // CR denotes the end of the integer value.
      //   :42\r\n
      //      ^
      this.maybeCallbackWithReply({ type: INTEGER, val: this.parseBufferedInt() });
      break;

    case BULK:
      sys.debug("CR in BULK");
      if (this.bulkLengthExpected == null) {
        // CR denotes end of first line of a bulk reply,
        // which is the length of the bulk reply value.
        //   $5\r\nhello\r\n
        //     ^
        count = this.parseBufferedInt();
        sys.debug("bulkLengthExpected = " + count);
        if (Number.isNaN(count))
          throw new Error("invalid bulk length");
        if (count < 0) {
          // Negative length: the null bulk reply.
          this.maybeCallbackWithReply({ type: BULK, val: null });
        } else if (count === 0) {
          // Zero length: an empty (non-null) value. It is reported right
          // away; the CRLF that terminates the empty payload arrives while
          // no reply is in progress and is dropped by feed().
          this.maybeCallbackWithReply({ type: BULK, val: this.valueBuffer.slice(0, 0) });
        } else {
          this.clearState();
          this.bulkLengthExpected = count;
          this.type = BULK;
          this.skip = 1; // skip LF
        }
      } else if (this.valueBufferLen == this.bulkLengthExpected) {
        // CR denotes end of the bulk reply value.
        //   $5\r\nhello\r\n
        //              ^
        this.maybeCallbackWithReply({
          type: BULK,
          val: this.valueBuffer.slice(0, this.valueBufferLen)
        });
      } else {
        // CR is just an embedded CR and has nothing to do
        // with the reply specification.
        //   $11\r\nhello\rworld\r\n
        //               ^
        this.appendByte(CR);
      }
      break;

    case MULTIBULK:
      // Parse the count which is the number of expected replies
      // in the multi-bulk reply.
      //   *2\r\n$5\r\nhello\r\n$5\r\nworld\r\n
      //     ^
      sys.debug("CR in MULTIBULK");
      count = this.parseBufferedInt();
      sys.debug("multibulkRepliesExpected = " + count);
      if (Number.isNaN(count))
        throw new Error("invalid multi-bulk count");
      if (count <= 0) {
        // Negative count is the null multi-bulk; zero is an empty list.
        this.maybeCallbackWithReply({ type: MULTIBULK, val: count < 0 ? null : [] });
      } else {
        this.clearState(); // type becomes NONE: pick up subreply types next
        this.skip = 1; // skip LF
        this.multibulkReplies = [];
        this.multibulkRepliesExpected = count;
      }
      break;
  }
};

/**
 * Consumes a chunk of bytes, invoking the reply callback for every reply
 * completed by it. Incomplete trailing data is remembered for the next call.
 *
 * @param {Buffer} buf Bytes received from the server.
 * @throws {Error} "pending buffer overflow" when a line or bulk payload
 *     exceeds maxReplySize, or an "invalid ..." error for a malformed
 *     length/count line. The parser state is not reset when this happens.
 */
Parser.prototype.feed = function (buf) {
  sys.debug("============================================================");
  sys.debug("Parsing: " + filter(buf.toString('utf8', 0, buf.length))); // Assume UTF8 for debugging.
  sys.debug("============================================================");

  for (var i = 0; i < buf.length; ++i) {
    var byte = buf[i];
    sys.debug("BYTE '" + filter(String.fromCharCode(byte)) + "'");

    if (this.skip > 0) {
      this.skip--;
      sys.debug("SKIP!");
      continue;
    }

    if (this.type === NONE) {
      switch (byte) {
        case DOLLAR: this.type = BULK; break;
        case STAR: this.type = MULTIBULK; break;
        case COLON: this.type = INTEGER; break;
        case PLUS: this.type = INLINE; break;
        case MINUS: this.type = ERROR; break;
      }
      if (this.type !== NONE)
        sys.debug("TYPE " + NONE + " => " + this.type);
      // Any other byte seen between replies (e.g. the CRLF that terminates
      // an empty bulk payload) is dropped, so it cannot leak into the value
      // of the following reply.
      continue;
    }

    if (byte === CR)
      this.handleCR();
    else
      this.appendByte(byte);

    if (this.valueBufferLen > 0)
      sys.debug("VALBUF \"" + filter(this.valueBuffer.toString('utf8', 0, this.valueBufferLen) + "\""));
  }
};
/**
 * Demo reply callback: writes a description of the reply to the debug log.
 *
 * Handles null bulk and null multi-bulk values, and multi-bulk elements
 * whose value is null or not a Buffer (e.g. integer elements).
 *
 * @param {Object} reply A `{ type, val }` reply produced by the parser.
 */
function onReply(reply) {
  sys.debug("------------------------------------------------------------");
  sys.debug(reply.type + " REPLY");

  // NB: we just assume that everything is UTF-8 here.
  switch (reply.type) {
    case INLINE:
    case ERROR:
    case BULK:
      if (reply.val != null)
        sys.debug("VALUE = " + reply.val.toString('utf8', 0, reply.val.length));
      else
        sys.debug("NULL VALUE");
      break;

    case INTEGER:
      sys.debug("VALUE = " + reply.val);
      break;

    case MULTIBULK:
      if (reply.val == null) {
        sys.debug("NULL VALUE");
        break;
      }
      sys.debug("MULTIBULK with " + reply.val.length + " replies");
      for (var i = 0; i < reply.val.length; ++i) {
        var element = reply.val[i].val;
        var text;
        if (element == null)
          text = "NULL VALUE";
        else if (Buffer.isBuffer(element))
          text = element.toString('utf8', 0, element.length);
        else
          text = String(element); // e.g. an integer element
        sys.debug(i + ". " + text);
      }
      break;
  }

  sys.debug("------------------------------------------------------------");
}
/**
 * Encodes a string into a new Buffer.
 *
 * The buffer is sized by the encoded byte length rather than by the number
 * of UTF-16 code units in the string, so multi-byte UTF-8 text is not
 * truncated.
 *
 * @param {string} str Text to encode.
 * @param {string} [encoding] Encoding name; defaults to 'utf8'.
 * @returns {Buffer} A buffer holding exactly the encoded bytes of `str`.
 * @throws {TypeError} When the encoding is not one Buffer supports.
 */
function bufferFromString(str, encoding) {
  return Buffer.from(str, encoding || 'utf8');
}
// Demo: feed one reply of each kind through a parser that logs via onReply.
var p = new Parser(onReply);
p.feed(bufferFromString("+OK\r\n"));           // inline (status) reply
p.feed(bufferFromString("-ERR you suck\r\n")); // error reply
p.feed(bufferFromString(":42\r\n"));           // integer reply
p.feed(bufferFromString(":-1\r\n"));           // negative integer reply
p.feed(bufferFromString("$3\r\nFOO\r\n"));     // bulk reply
p.feed(bufferFromString("$-1\r\n"));           // bulk reply with negative length
// NOTE(review): only the length line of a zero-length bulk is fed here; a
// real server presumably follows it with the empty payload's CRLF -- confirm
// against the protocol specification.
p.feed(bufferFromString("$0\r\n"));
p.feed(bufferFromString("*2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n")); // multi-bulk reply
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
~/projects/redis-node-client/parserwork(nodejs_buffers_and_streams) ⚡ node testbuf2.js | |
DEBUG: ============================================================ | |
DEBUG: Parsing: +OK\r\n | |
DEBUG: ============================================================ | |
DEBUG: BYTE '+' | |
DEBUG: TYPE NONE => INLINE | |
DEBUG: BYTE 'O' | |
DEBUG: VALBUF "O" | |
DEBUG: BYTE 'K' | |
DEBUG: VALBUF "OK" | |
DEBUG: BYTE '\r' | |
DEBUG: NOT IN A MULTIBULK | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: INLINE REPLY | |
DEBUG: VALUE = OK | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: ============================================================ | |
DEBUG: Parsing: -ERR you suck\r\n | |
DEBUG: ============================================================ | |
DEBUG: BYTE '-' | |
DEBUG: TYPE NONE => ERROR | |
DEBUG: BYTE 'E' | |
DEBUG: VALBUF "E" | |
DEBUG: BYTE 'R' | |
DEBUG: VALBUF "ER" | |
DEBUG: BYTE 'R' | |
DEBUG: VALBUF "ERR" | |
DEBUG: BYTE ' ' | |
DEBUG: VALBUF "ERR " | |
DEBUG: BYTE 'y' | |
DEBUG: VALBUF "ERR y" | |
DEBUG: BYTE 'o' | |
DEBUG: VALBUF "ERR yo" | |
DEBUG: BYTE 'u' | |
DEBUG: VALBUF "ERR you" | |
DEBUG: BYTE ' ' | |
DEBUG: VALBUF "ERR you " | |
DEBUG: BYTE 's' | |
DEBUG: VALBUF "ERR you s" | |
DEBUG: BYTE 'u' | |
DEBUG: VALBUF "ERR you su" | |
DEBUG: BYTE 'c' | |
DEBUG: VALBUF "ERR you suc" | |
DEBUG: BYTE 'k' | |
DEBUG: VALBUF "ERR you suck" | |
DEBUG: BYTE '\r' | |
DEBUG: NOT IN A MULTIBULK | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: ERROR REPLY | |
DEBUG: VALUE = ERR you suck | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: ============================================================ | |
DEBUG: Parsing: :42\r\n | |
DEBUG: ============================================================ | |
DEBUG: BYTE ':' | |
DEBUG: TYPE NONE => INTEGER | |
DEBUG: BYTE '4' | |
DEBUG: VALBUF "4" | |
DEBUG: BYTE '2' | |
DEBUG: VALBUF "42" | |
DEBUG: BYTE '\r' | |
DEBUG: NOT IN A MULTIBULK | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: INTEGER REPLY | |
DEBUG: VALUE = 42 | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: ============================================================ | |
DEBUG: Parsing: :-1\r\n | |
DEBUG: ============================================================ | |
DEBUG: BYTE ':' | |
DEBUG: TYPE NONE => INTEGER | |
DEBUG: BYTE '-' | |
DEBUG: VALBUF "-" | |
DEBUG: BYTE '1' | |
DEBUG: VALBUF "-1" | |
DEBUG: BYTE '\r' | |
DEBUG: NOT IN A MULTIBULK | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: INTEGER REPLY | |
DEBUG: VALUE = -1 | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: ============================================================ | |
DEBUG: Parsing: $3\r\nFOO\r\n | |
DEBUG: ============================================================ | |
DEBUG: BYTE '$' | |
DEBUG: TYPE NONE => BULK | |
DEBUG: BYTE '3' | |
DEBUG: VALBUF "3" | |
DEBUG: BYTE '\r' | |
DEBUG: CR in BULK | |
DEBUG: bulkLengthExpected = 3 | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: BYTE 'F' | |
DEBUG: VALBUF "F" | |
DEBUG: BYTE 'O' | |
DEBUG: VALBUF "FO" | |
DEBUG: BYTE 'O' | |
DEBUG: VALBUF "FOO" | |
DEBUG: BYTE '\r' | |
DEBUG: CR in BULK | |
DEBUG: NOT IN A MULTIBULK | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: BULK REPLY | |
DEBUG: VALUE = FOO | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: ============================================================ | |
DEBUG: Parsing: $-1\r\n | |
DEBUG: ============================================================ | |
DEBUG: BYTE '$' | |
DEBUG: TYPE NONE => BULK | |
DEBUG: BYTE '-' | |
DEBUG: VALBUF "-" | |
DEBUG: BYTE '1' | |
DEBUG: VALBUF "-1" | |
DEBUG: BYTE '\r' | |
DEBUG: CR in BULK | |
DEBUG: bulkLengthExpected = -1 | |
DEBUG: NOT IN A MULTIBULK | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: BULK REPLY | |
DEBUG: NULL VALUE | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: ============================================================ | |
DEBUG: Parsing: $0\r\n | |
DEBUG: ============================================================ | |
DEBUG: BYTE '$' | |
DEBUG: TYPE NONE => BULK | |
DEBUG: BYTE '0' | |
DEBUG: VALBUF "0" | |
DEBUG: BYTE '\r' | |
DEBUG: CR in BULK | |
DEBUG: bulkLengthExpected = 0 | |
DEBUG: NOT IN A MULTIBULK | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: BULK REPLY | |
DEBUG: NULL VALUE | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: ============================================================ | |
DEBUG: Parsing: *2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n | |
DEBUG: ============================================================ | |
DEBUG: BYTE '*' | |
DEBUG: TYPE NONE => MULTIBULK | |
DEBUG: BYTE '2' | |
DEBUG: VALBUF "2" | |
DEBUG: BYTE '\r' | |
DEBUG: CR in MULTIBULK | |
DEBUG: multibulkRepliesExpected = 2 | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: BYTE '$' | |
DEBUG: TYPE NONE => BULK | |
DEBUG: BYTE '5' | |
DEBUG: VALBUF "5" | |
DEBUG: BYTE '\r' | |
DEBUG: CR in BULK | |
DEBUG: bulkLengthExpected = 5 | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: BYTE 'H' | |
DEBUG: VALBUF "H" | |
DEBUG: BYTE 'e' | |
DEBUG: VALBUF "He" | |
DEBUG: BYTE 'l' | |
DEBUG: VALBUF "Hel" | |
DEBUG: BYTE 'l' | |
DEBUG: VALBUF "Hell" | |
DEBUG: BYTE 'o' | |
DEBUG: VALBUF "Hello" | |
DEBUG: BYTE '\r' | |
DEBUG: CR in BULK | |
DEBUG: IN A MULTIBULK | |
DEBUG: now, multibulkRepliesExpected: 1 | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: BYTE '$' | |
DEBUG: TYPE NONE => BULK | |
DEBUG: BYTE '5' | |
DEBUG: VALBUF "5" | |
DEBUG: BYTE '\r' | |
DEBUG: CR in BULK | |
DEBUG: bulkLengthExpected = 5 | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! | |
DEBUG: BYTE 'W' | |
DEBUG: VALBUF "W" | |
DEBUG: BYTE 'o' | |
DEBUG: VALBUF "Wo" | |
DEBUG: BYTE 'r' | |
DEBUG: VALBUF "Wor" | |
DEBUG: BYTE 'l' | |
DEBUG: VALBUF "Worl" | |
DEBUG: BYTE 'd' | |
DEBUG: VALBUF "World" | |
DEBUG: BYTE '\r' | |
DEBUG: CR in BULK | |
DEBUG: IN A MULTIBULK | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: MULTIBULK REPLY | |
DEBUG: MULTIBULK with 2 replies | |
DEBUG: 0. Hello | |
DEBUG: 1. World | |
DEBUG: ------------------------------------------------------------ | |
DEBUG: now, multibulkRepliesExpected: null | |
DEBUG: BYTE '\n' | |
DEBUG: SKIP! |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment