Skip to content

Instantly share code, notes, and snippets.

@horatio-sans-serif
Created April 3, 2010 07:31
Show Gist options
  • Save horatio-sans-serif/354228 to your computer and use it in GitHub Desktop.
Save horatio-sans-serif/354228 to your computer and use it in GitHub Desktop.
// This is a fully interruptable, binary-safe Redis reply parser for Node.js.
// TODO remove debugging statements
var sys = require('sys');
var Buffer = require('buffer').Buffer;
var PLUS = exports.PLUS = 0x2B; // +
var MINUS = exports.MINUS = 0x2D; // -
var DOLLAR = exports.DOLLAR = 0x24; // $
var STAR = exports.STAR = 0x2A; // *
var COLON = exports.COLON = 0x3A; // :
var CR = exports.CR = 0x0D; // \r
var LF = exports.LF = 0x0A; // \n
var NONE = exports.NONE = "NONE";
var BULK = exports.BULK = "BULK";
var MULTIBULK = exports.MULTIBULK = "MULTIBULK";
var INLINE = exports.INLINE = "INLINE";
var INTEGER = exports.INTEGER = "INTEGER";
var ERROR = exports.ERROR = "ERROR";
function filter(c) {
c = c.replace(/\r\n/g, '\\r\\n');
c = c.replace(/\r/g, '\\r');
c = c.replace(/\n/g, '\\n');
return c;
}
function Parser(onReply, maxReplySize) {
this.maxReplySize = maxReplySize || (1024*8); // 8KiB max size by default
this.onReply = onReply;
this.clearState();
this.clearMultiBulkState();
}
Parser.prototype.clearState = function () {
this.type = NONE;
this.valueBufferLen = 0;
this.valueBuffer = new Buffer(this.maxReplySize);
this.bulkLengthExpected = null;
this.skip = 0;
};
Parser.prototype.clearMultiBulkState = function () {
this.multibulkReplies = null;
this.multibulkRepliesExpected = null;
};
Parser.prototype.feed = function (buf) {
sys.debug("============================================================");
sys.debug("Parsing: " + filter(buf.utf8Slice(0,buf.length))); // Assume UTF8 for debugging.
sys.debug("============================================================");
for (var i=0; i < buf.length; ++i) {
sys.debug("BYTE '" + filter(String.fromCharCode(buf[i])) + "'");
if (this.skip > 0) {
this.skip--;
sys.debug("SKIP!");
continue;
}
var typeBefore = this.type;
if (this.type == NONE) {
switch (buf[i]) {
case DOLLAR: this.type = BULK; break;
case STAR: this.type = MULTIBULK; break;
case COLON: this.type = INTEGER; break;
case PLUS: this.type = INLINE; break;
case MINUS: this.type = ERROR; break;
}
}
if (typeBefore != this.type) {
sys.debug("TYPE " + typeBefore + " => " + this.type);
continue;
}
// If the reply is a part of a multi-bulk reply. Save it. If we have
// received all the expected replies of a multi-bulk reply, then
// callback. If the reply is not part of a multi-bulk. Call back
// immediately.
var self = this;
function maybeCallbackWithReply(reply) {
if (self.multibulkReplies != null) {
sys.debug("IN A MULTIBULK");
self.multibulkReplies.push(reply);
if (--self.multibulkRepliesExpected == 0) {
self.onReply({ type:MULTIBULK, val:self.multibulkReplies });
self.clearMultiBulkState();
}
sys.debug('now, multibulkRepliesExpected: ' + self.multibulkRepliesExpected);
} else {
sys.debug("NOT IN A MULTIBULK");
self.onReply(reply);
}
self.clearState();
self.skip = 1; // Skip LF; this is only called from CR handlers
}
switch (buf[i]) {
case CR:
switch (this.type) {
case INLINE:
case ERROR:
// CR denotes end of the inline/error value.
// +OK\r\n
// ^
var slice = this.valueBuffer.slice(0, this.valueBufferLen);
maybeCallbackWithReply({ type:this.type, val:slice });
break;
case INTEGER:
// CR denotes the end of the integer value.
// :42\r\n
// ^
var n = parseInt(this.valueBuffer.asciiSlice(0, this.valueBufferLen), 10);
maybeCallbackWithReply({ type: INTEGER, val:n });
break;
case BULK:
sys.debug("CR in BULK");
if (this.bulkLengthExpected == null) {
// CR denotes end of first line of a bulk reply,
// which is the length of the bulk reply value.
// $5\r\nhello\r\n
// ^
var bulkLengthExpected =
parseInt(this.valueBuffer.asciiSlice(0, this.valueBufferLen), 10);
sys.debug("bulkLengthExpected = " + bulkLengthExpected);
if (bulkLengthExpected <= 0) {
maybeCallbackWithReply({ type:BULK, val:null });
} else {
this.clearState();
this.bulkLengthExpected = bulkLengthExpected;
this.type = BULK;
this.skip = 1; // skip LF
}
} else if (this.valueBufferLen == this.bulkLengthExpected) {
// CR denotes end of the bulk reply value.
// $5\r\nhello\r\n
// ^
var slice = this.valueBuffer.slice(0, this.valueBufferLen);
maybeCallbackWithReply({ type: BULK, val:slice });
} else {
// CR is just an embedded CR and has nothing to do
// with the reply specification.
// $11\r\nhello\rworld\r\n
// ^
this.valueBuffer[this.valueBufferLen++] = buf[i];
}
break;
case MULTIBULK:
// Parse the count which is the number of expected replies
// in the multi-bulk reply.
// *2\r\n$5\r\nhello\r\n$5\r\nworld\r\n
// ^
sys.debug("CR in MULTIBULK");
var multibulkRepliesExpected =
parseInt(this.valueBuffer.asciiSlice(0, this.valueBufferLen), 10);
sys.debug("multibulkRepliesExpected = " + multibulkRepliesExpected);
if (multibulkRepliesExpected <= 0) {
maybeCallbackWithReply({ type:MULTIBULK, val:null });
} else {
this.clearState();
this.type = NONE; // Pick up other types now for subreplies.
this.skip = 1; // skip LF
this.multibulkReplies = [];
this.multibulkRepliesExpected = multibulkRepliesExpected;
}
break;
}
break;
default:
this.valueBuffer[this.valueBufferLen++] = buf[i];
break;
}
if (this.valueBufferLen >= this.maxReplySize)
throw new Error("pending buffer overflow");
if (this.valueBufferLen > 0)
sys.debug("VALBUF \"" + filter(this.valueBuffer.utf8Slice(0, this.valueBufferLen) + "\""));
// sys.debug("LENGTH " + this.bulkLengthExpected);
// sys.debug("COUNT " + this.count);
}
};
function onReply(reply) {
sys.debug("------------------------------------------------------------");
sys.debug(reply.type + " REPLY");
// NB: we just assume that everything is UTF-8 here.
switch (reply.type) {
case INLINE:
case ERROR:
case BULK:
if (reply.val != null)
sys.debug("VALUE = " + reply.val.utf8Slice(0, reply.val.length));
else
sys.debug("NULL VALUE");
break;
case INTEGER:
sys.debug("VALUE = " + reply.val);
break;
case MULTIBULK:
sys.debug("MULTIBULK with " + reply.val.length + " replies");
for (var i=0; i<reply.val.length; ++i)
sys.debug(i + ". " + reply.val[i].val.utf8Slice(0, reply.val[i].val.length));
break;
}
sys.debug("------------------------------------------------------------");
}
function bufferFromString(str, encoding) {
var enc = encoding || 'utf8';
var buf = new Buffer(str.length);
if (enc == 'utf8') buf.utf8Write(str);
else if (enc == 'ascii') buf.asciiWrite(str);
return buf;
}
var p = new Parser(onReply);
p.feed(bufferFromString("+OK\r\n"));
p.feed(bufferFromString("-ERR you suck\r\n"));
p.feed(bufferFromString(":42\r\n"));
p.feed(bufferFromString(":-1\r\n"));
p.feed(bufferFromString("$3\r\nFOO\r\n"));
p.feed(bufferFromString("$-1\r\n"));
p.feed(bufferFromString("$0\r\n"));
p.feed(bufferFromString("*2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n"));
~/projects/redis-node-client/parserwork(nodejs_buffers_and_streams) ⚡ node testbuf2.js
DEBUG: ============================================================
DEBUG: Parsing: +OK\r\n
DEBUG: ============================================================
DEBUG: BYTE '+'
DEBUG: TYPE NONE => INLINE
DEBUG: BYTE 'O'
DEBUG: VALBUF "O"
DEBUG: BYTE 'K'
DEBUG: VALBUF "OK"
DEBUG: BYTE '\r'
DEBUG: NOT IN A MULTIBULK
DEBUG: ------------------------------------------------------------
DEBUG: INLINE REPLY
DEBUG: VALUE = OK
DEBUG: ------------------------------------------------------------
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: ============================================================
DEBUG: Parsing: -ERR you suck\r\n
DEBUG: ============================================================
DEBUG: BYTE '-'
DEBUG: TYPE NONE => ERROR
DEBUG: BYTE 'E'
DEBUG: VALBUF "E"
DEBUG: BYTE 'R'
DEBUG: VALBUF "ER"
DEBUG: BYTE 'R'
DEBUG: VALBUF "ERR"
DEBUG: BYTE ' '
DEBUG: VALBUF "ERR "
DEBUG: BYTE 'y'
DEBUG: VALBUF "ERR y"
DEBUG: BYTE 'o'
DEBUG: VALBUF "ERR yo"
DEBUG: BYTE 'u'
DEBUG: VALBUF "ERR you"
DEBUG: BYTE ' '
DEBUG: VALBUF "ERR you "
DEBUG: BYTE 's'
DEBUG: VALBUF "ERR you s"
DEBUG: BYTE 'u'
DEBUG: VALBUF "ERR you su"
DEBUG: BYTE 'c'
DEBUG: VALBUF "ERR you suc"
DEBUG: BYTE 'k'
DEBUG: VALBUF "ERR you suck"
DEBUG: BYTE '\r'
DEBUG: NOT IN A MULTIBULK
DEBUG: ------------------------------------------------------------
DEBUG: ERROR REPLY
DEBUG: VALUE = ERR you suck
DEBUG: ------------------------------------------------------------
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: ============================================================
DEBUG: Parsing: :42\r\n
DEBUG: ============================================================
DEBUG: BYTE ':'
DEBUG: TYPE NONE => INTEGER
DEBUG: BYTE '4'
DEBUG: VALBUF "4"
DEBUG: BYTE '2'
DEBUG: VALBUF "42"
DEBUG: BYTE '\r'
DEBUG: NOT IN A MULTIBULK
DEBUG: ------------------------------------------------------------
DEBUG: INTEGER REPLY
DEBUG: VALUE = 42
DEBUG: ------------------------------------------------------------
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: ============================================================
DEBUG: Parsing: :-1\r\n
DEBUG: ============================================================
DEBUG: BYTE ':'
DEBUG: TYPE NONE => INTEGER
DEBUG: BYTE '-'
DEBUG: VALBUF "-"
DEBUG: BYTE '1'
DEBUG: VALBUF "-1"
DEBUG: BYTE '\r'
DEBUG: NOT IN A MULTIBULK
DEBUG: ------------------------------------------------------------
DEBUG: INTEGER REPLY
DEBUG: VALUE = -1
DEBUG: ------------------------------------------------------------
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: ============================================================
DEBUG: Parsing: $3\r\nFOO\r\n
DEBUG: ============================================================
DEBUG: BYTE '$'
DEBUG: TYPE NONE => BULK
DEBUG: BYTE '3'
DEBUG: VALBUF "3"
DEBUG: BYTE '\r'
DEBUG: CR in BULK
DEBUG: bulkLengthExpected = 3
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: BYTE 'F'
DEBUG: VALBUF "F"
DEBUG: BYTE 'O'
DEBUG: VALBUF "FO"
DEBUG: BYTE 'O'
DEBUG: VALBUF "FOO"
DEBUG: BYTE '\r'
DEBUG: CR in BULK
DEBUG: NOT IN A MULTIBULK
DEBUG: ------------------------------------------------------------
DEBUG: BULK REPLY
DEBUG: VALUE = FOO
DEBUG: ------------------------------------------------------------
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: ============================================================
DEBUG: Parsing: $-1\r\n
DEBUG: ============================================================
DEBUG: BYTE '$'
DEBUG: TYPE NONE => BULK
DEBUG: BYTE '-'
DEBUG: VALBUF "-"
DEBUG: BYTE '1'
DEBUG: VALBUF "-1"
DEBUG: BYTE '\r'
DEBUG: CR in BULK
DEBUG: bulkLengthExpected = -1
DEBUG: NOT IN A MULTIBULK
DEBUG: ------------------------------------------------------------
DEBUG: BULK REPLY
DEBUG: NULL VALUE
DEBUG: ------------------------------------------------------------
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: ============================================================
DEBUG: Parsing: $0\r\n
DEBUG: ============================================================
DEBUG: BYTE '$'
DEBUG: TYPE NONE => BULK
DEBUG: BYTE '0'
DEBUG: VALBUF "0"
DEBUG: BYTE '\r'
DEBUG: CR in BULK
DEBUG: bulkLengthExpected = 0
DEBUG: NOT IN A MULTIBULK
DEBUG: ------------------------------------------------------------
DEBUG: BULK REPLY
DEBUG: NULL VALUE
DEBUG: ------------------------------------------------------------
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: ============================================================
DEBUG: Parsing: *2\r\n$5\r\nHello\r\n$5\r\nWorld\r\n
DEBUG: ============================================================
DEBUG: BYTE '*'
DEBUG: TYPE NONE => MULTIBULK
DEBUG: BYTE '2'
DEBUG: VALBUF "2"
DEBUG: BYTE '\r'
DEBUG: CR in MULTIBULK
DEBUG: multibulkRepliesExpected = 2
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: BYTE '$'
DEBUG: TYPE NONE => BULK
DEBUG: BYTE '5'
DEBUG: VALBUF "5"
DEBUG: BYTE '\r'
DEBUG: CR in BULK
DEBUG: bulkLengthExpected = 5
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: BYTE 'H'
DEBUG: VALBUF "H"
DEBUG: BYTE 'e'
DEBUG: VALBUF "He"
DEBUG: BYTE 'l'
DEBUG: VALBUF "Hel"
DEBUG: BYTE 'l'
DEBUG: VALBUF "Hell"
DEBUG: BYTE 'o'
DEBUG: VALBUF "Hello"
DEBUG: BYTE '\r'
DEBUG: CR in BULK
DEBUG: IN A MULTIBULK
DEBUG: now, multibulkRepliesExpected: 1
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: BYTE '$'
DEBUG: TYPE NONE => BULK
DEBUG: BYTE '5'
DEBUG: VALBUF "5"
DEBUG: BYTE '\r'
DEBUG: CR in BULK
DEBUG: bulkLengthExpected = 5
DEBUG: BYTE '\n'
DEBUG: SKIP!
DEBUG: BYTE 'W'
DEBUG: VALBUF "W"
DEBUG: BYTE 'o'
DEBUG: VALBUF "Wo"
DEBUG: BYTE 'r'
DEBUG: VALBUF "Wor"
DEBUG: BYTE 'l'
DEBUG: VALBUF "Worl"
DEBUG: BYTE 'd'
DEBUG: VALBUF "World"
DEBUG: BYTE '\r'
DEBUG: CR in BULK
DEBUG: IN A MULTIBULK
DEBUG: ------------------------------------------------------------
DEBUG: MULTIBULK REPLY
DEBUG: MULTIBULK with 2 replies
DEBUG: 0. Hello
DEBUG: 1. World
DEBUG: ------------------------------------------------------------
DEBUG: now, multibulkRepliesExpected: null
DEBUG: BYTE '\n'
DEBUG: SKIP!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment