Created
October 26, 2010 02:10
-
-
Save mranney/646200 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*global Buffer require exports console setTimeout */ | |
var net = require("net"), | |
sys = require("sys"), | |
events = require("events"), | |
default_port = 6379, | |
default_host = "127.0.0.1", | |
commands; | |
exports.debug_mode = false; | |
function RedisReplyParser() { | |
this.reset(); | |
events.EventEmitter.call(this); | |
} | |
sys.inherits(RedisReplyParser, events.EventEmitter); | |
// Buffer.toString() is quite slow for small strings | |
function small_toString(buf) { | |
var tmp = ""; | |
for (var i = 0, il = buf.end; i < il; i++) { | |
tmp += String.fromCharCode(buf[i]); | |
} | |
return tmp; | |
} | |
function to_array(args) { | |
var len = args.length, | |
arr = new Array(len), i; | |
for (i = 0; i < len; i += 1) { | |
arr[i] = args[i]; | |
} | |
return arr; | |
} | |
// Reset parser to it's original state. | |
RedisReplyParser.prototype.reset = function() { | |
this.state = "type"; | |
this.return_buffer = new Buffer(16384); // for holding replies, might grow | |
this.tmp_buffer = new Buffer(128); // for holding size fields | |
this.multi_bulk_length = 0; | |
this.multi_bulk_replies = null; | |
this.multi_bulk_nested_length = 0; | |
this.multi_bulk_nested_replies = null; | |
} | |
RedisReplyParser.prototype.execute = function (incoming_buf) { | |
var pos = 0, bd_tmp, bd_str, i; | |
//, state_times = {}, start_execute = new Date(), start_switch, end_switch, old_state; | |
//start_switch = new Date(); | |
while (pos < incoming_buf.length) { | |
// old_state = this.state; | |
// console.log("execute: " + this.state + ", " + pos + "/" + incoming_buf.length + ", " + String.fromCharCode(incoming_buf[pos])); | |
switch (this.state) { | |
case "type": | |
this.type = incoming_buf[pos]; | |
pos += 1; | |
switch (this.type) { | |
case 43: // + | |
this.state = "single line"; | |
this.return_buffer.end = 0; | |
break; | |
case 42: // * | |
this.state = "multi bulk count"; | |
this.tmp_buffer.end = 0; | |
break; | |
case 58: // : | |
this.state = "integer line"; | |
this.return_buffer.end = 0; | |
break; | |
case 36: // $ | |
this.state = "bulk length"; | |
this.tmp_buffer.end = 0; | |
break; | |
case 45: // - | |
this.state = "error line"; | |
this.return_buffer.end = 0; | |
break; | |
default: | |
this.state = "unknown type"; | |
} | |
break; | |
case "integer line": | |
if (incoming_buf[pos] === 13) { | |
this.send_reply(+small_toString(this.return_buffer)); | |
this.state = "final lf"; | |
} else { | |
this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; | |
this.return_buffer.end += 1; | |
} | |
pos += 1; | |
break; | |
case "error line": | |
if (incoming_buf[pos] === 13) { | |
this.send_error(this.return_buffer.toString("ascii", 0, this.return_buffer.end)); | |
this.state = "final lf"; | |
} else { | |
this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; | |
this.return_buffer.end += 1; | |
} | |
pos += 1; | |
break; | |
case "single line": | |
if (incoming_buf[pos] === 13) { | |
if (this.return_buffer.end > 10) { | |
bd_str = this.return_buffer.toString("utf8", 0, this.return_buffer.end); | |
} else { | |
bd_str = small_toString(this.return_buffer); | |
} | |
this.send_reply(bd_str); | |
this.state = "final lf"; | |
} else { | |
this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; | |
this.return_buffer.end += 1; | |
// TODO - check for return_buffer overflow and then grow, copy, continue, and drink. | |
} | |
pos += 1; | |
break; | |
case "multi bulk count": | |
if (incoming_buf[pos] === 13) { // \r | |
this.state = "multi bulk count lf"; | |
} else { | |
this.tmp_buffer[this.tmp_buffer.end] = incoming_buf[pos]; | |
this.tmp_buffer.end += 1; | |
} | |
pos += 1; | |
break; | |
case "multi bulk count lf": | |
if (incoming_buf[pos] === 10) { // \n | |
if (this.multi_bulk_length) { // nested multi-bulk | |
this.multi_bulk_nested_length = this.multi_bulk_length; | |
this.multi_bulk_nested_replies = this.multi_bulk_replies; | |
} | |
this.multi_bulk_length = +small_toString(this.tmp_buffer); | |
this.multi_bulk_replies = []; | |
this.state = "type"; | |
if (0 === this.multi_bulk_length) { | |
this.send_reply(null); | |
} | |
} else { | |
this.emit("error", new Error("didn't see LF after NL reading multi bulk count")); | |
this.reset(); | |
return; | |
} | |
pos += 1; | |
break; | |
case "bulk length": | |
if (incoming_buf[pos] === 13) { // \r | |
this.state = "bulk lf"; | |
} else { | |
this.tmp_buffer[this.tmp_buffer.end] = incoming_buf[pos]; | |
this.tmp_buffer.end += 1; | |
} | |
pos += 1; | |
break; | |
case "bulk lf": | |
if (incoming_buf[pos] === 10) { // \n | |
this.bulk_length = +small_toString(this.tmp_buffer); | |
if (this.bulk_length === -1) { | |
this.send_reply(null); | |
this.state = "type"; | |
} else if (this.bulk_length === 0) { | |
this.send_reply(new Buffer("")); | |
this.state = "final cr"; | |
} else { | |
this.state = "bulk data"; | |
if (this.bulk_length > this.return_buffer.length) { | |
if (exports.debug_mode) { | |
console.log("Growing return_buffer from " + this.return_buffer.length + " to " + this.bulk_length); | |
} | |
this.return_buffer = new Buffer(this.bulk_length); | |
// home the old one gets cleaned up somehow | |
} | |
this.return_buffer.end = 0; | |
} | |
} else { | |
this.emit("error", new Error("didn't see LF after NL while reading bulk length")); | |
this.reset(); | |
return; | |
} | |
pos += 1; | |
break; | |
case "bulk data": | |
this.return_buffer[this.return_buffer.end] = incoming_buf[pos]; | |
this.return_buffer.end += 1; | |
pos += 1; | |
// TODO - should be faster to use Bufer.copy() here, especially if the response is large. | |
// However, when the response is small, Buffer.copy() seems a lot slower. Computers are hard. | |
if (this.return_buffer.end === this.bulk_length) { | |
bd_tmp = new Buffer(this.bulk_length); | |
if (this.bulk_length > 10) { | |
this.return_buffer.copy(bd_tmp, 0, 0, this.bulk_length); | |
} else { | |
for (var i = 0, il = this.bulk_length; i < il; i++) { | |
bd_tmp[i] = this.return_buffer[i]; | |
} | |
} | |
this.send_reply(bd_tmp); | |
this.state = "final cr"; | |
} | |
break; | |
case "final cr": | |
if (incoming_buf[pos] === 13) { // \r | |
this.state = "final lf"; | |
pos += 1; | |
} else { | |
this.emit("error", new Error("saw " + incoming_buf[pos] + " when expecting final CR")); | |
this.reset(); | |
return; | |
} | |
break; | |
case "final lf": | |
if (incoming_buf[pos] === 10) { // \n | |
this.state = "type"; | |
pos += 1; | |
} else { | |
this.emit("error", new Error("saw " + incoming_buf[pos] + " when expecting final LF")); | |
this.reset(); | |
return; | |
} | |
break; | |
default: | |
throw new Error("invalid state " + this.state); | |
} | |
// end_switch = new Date(); | |
// if (state_times[old_state] === undefined) { | |
// state_times[old_state] = 0; | |
// } | |
// state_times[old_state] += (end_switch - start_switch); | |
// start_switch = end_switch; | |
} | |
// console.log("execute ran for " + (Date.now() - start_execute) + " ms, on " + incoming_buf.length + " Bytes. "); | |
// Object.keys(state_times).forEach(function (state) { | |
// console.log(" " + state + ": " + state_times[state]); | |
// }); | |
}; | |
RedisReplyParser.prototype.send_error = function (reply) { | |
if (this.multi_bulk_length > 0 || this.multi_bulk_nested_length > 0) { | |
// TODO - can this happen? Seems like maybe not. | |
this.add_multi_bulk_reply(reply); | |
} else { | |
this.emit("reply error", reply); | |
} | |
}; | |
RedisReplyParser.prototype.send_reply = function (reply) { | |
if (this.multi_bulk_length > 0 || this.multi_bulk_nested_length > 0) { | |
this.add_multi_bulk_reply(reply); | |
} else { | |
this.emit("reply", reply); | |
} | |
}; | |
RedisReplyParser.prototype.add_multi_bulk_reply = function (reply) { | |
if (this.multi_bulk_replies) { | |
this.multi_bulk_replies.push(reply); | |
// use "less than" here because a nil mb reply claims "0 length", but we need 1 slot to hold it | |
if (this.multi_bulk_replies.length < this.multi_bulk_length) { | |
return; | |
} | |
} else { | |
this.multi_bulk_replies = reply; | |
} | |
if (this.multi_bulk_nested_length) { | |
this.multi_bulk_nested_replies.push(this.multi_bulk_replies); | |
this.multi_bulk_length = 0; | |
delete this.multi_bulk_replies; | |
if (this.multi_bulk_nested_length === this.multi_bulk_nested_replies.length) { | |
this.emit("reply", this.multi_bulk_nested_replies); | |
this.multi_bulk_nested_length = 0; | |
this.multi_bulk_nested_replies = null; | |
} | |
} else { | |
this.emit("reply", this.multi_bulk_replies); | |
this.multi_bulk_length = 0; | |
this.multi_bulk_replies = null; | |
} | |
}; | |
// Queue class adapted from Tim Caswell's pattern library | |
// http://github.com/creationix/pattern/blob/master/lib/pattern/queue.js | |
var Queue = function () { | |
this.tail = []; | |
this.head = to_array(arguments); | |
this.offset = 0; | |
}; | |
Queue.prototype.shift = function () { | |
if (this.offset === this.head.length) { | |
var tmp = this.head; | |
tmp.length = 0; | |
this.head = this.tail; | |
this.tail = tmp; | |
this.offset = 0; | |
if (this.head.length === 0) { | |
return; | |
} | |
} | |
return this.head[this.offset++]; | |
}; | |
Queue.prototype.push = function (item) { | |
return this.tail.push(item); | |
}; | |
Queue.prototype.forEach = function (fn, thisv) { | |
var array = this.head.slice(this.offset); | |
array.push.apply(array, this.tail); | |
if (thisv) { | |
for (var i = 0, il = array.length; i < il; i++) { | |
fn.call(thisv, array[i], i, array); | |
} | |
} else { | |
for (var i = 0, il = array.length; i < il; i++) { | |
fn(array[i], i, array); | |
} | |
} | |
return array; | |
}; | |
Object.defineProperty(Queue.prototype, 'length', { | |
get: function () { | |
return this.head.length - this.offset + this.tail.length; | |
} | |
}); | |
function RedisClient(stream) { | |
events.EventEmitter.call(this); | |
this.stream = stream; | |
this.connected = false; | |
this.connections = 0; | |
this.attempts = 1; | |
this.command_queue = new Queue(); // holds sent commands to de-pipeline them | |
this.offline_queue = new Queue(); // holds commands issued but not able to be sent | |
this.commands_sent = 0; | |
this.retry_delay = 250; | |
this.retry_backoff = 1.7; | |
this.subscriptions = false; | |
this.closing = false; | |
var self = this; | |
this.stream.on("connect", function () { | |
if (exports.debug_mode) { | |
console.log("Stream connected"); | |
} | |
self.connected = true; | |
self.connections += 1; | |
self.command_queue = new Queue(); | |
self.reply_parser = new RedisReplyParser(); | |
// "reply error" is an error sent back by redis | |
self.reply_parser.on("reply error", function (reply) { | |
self.return_error(reply); | |
}); | |
self.reply_parser.on("reply", function (reply) { | |
self.return_reply(reply); | |
}); | |
// "error" is bad. Somehow the parser got confused. It'll try to reset and continue. | |
self.reply_parser.on("error", function (err) { | |
self.emit("error", new Error("Redis reply parser error: " + err.stack)); | |
}); | |
self.retry_timer = null; | |
self.retry_delay = 250; | |
self.stream.setNoDelay(); | |
self.stream.setTimeout(0); | |
// give connect listeners a chance to run first in case they need to auth | |
self.emit("connect"); | |
var command_obj; | |
while (self.offline_queue.length > 0) { | |
command_obj = self.offline_queue.shift(); | |
if (exports.debug_mode) { | |
console.log("Sending offline command: " + command_obj.command); | |
} | |
self.send_command(command_obj.command, command_obj.args, command_obj.callback); | |
} | |
}); | |
this.stream.on("data", function (buffer_from_socket) { | |
self.on_data(buffer_from_socket); | |
}); | |
this.stream.on("error", function (msg) { | |
if (this.closing) { | |
return; | |
} | |
if (exports.debug_mode) { | |
console.warn("Connecting to redis server: " + msg); | |
} | |
self.offline_queue.forEach(function (args) { | |
if (typeof args[2] === "function") { | |
args[2]("Server connection could not be established"); | |
} | |
}); | |
self.connected = false; | |
self.emit("error", msg); | |
}); | |
this.stream.on("close", function () { | |
self.connection_gone("close"); | |
}); | |
this.stream.on("end", function () { | |
self.connection_gone("end"); | |
}); | |
events.EventEmitter.call(this); | |
} | |
sys.inherits(RedisClient, events.EventEmitter); | |
exports.RedisClient = RedisClient; | |
RedisClient.prototype.connection_gone = function (why) { | |
var self = this; | |
// If a retry is already in progress, just let that happen | |
if (self.retry_timer) { | |
return; | |
} | |
// Note that this may trigger another "close" or "end" event | |
self.stream.destroy(); | |
if (exports.debug_mode) { | |
console.warn("Redis connection is gone from " + why + " event."); | |
} | |
self.connected = false; | |
self.subscriptions = false; | |
self.emit("end"); | |
self.command_queue.forEach(function (args) { | |
if (typeof args[2] === "function") { | |
args[2]("Server connection closed"); | |
} | |
}); | |
// If this is a requested shutdown, then don't retry | |
if (self.closing) { | |
self.retry_timer = null; | |
return; | |
} | |
if (exports.debug_mode) { | |
console.log("Retry conneciton in " + self.retry_delay + " ms"); | |
} | |
self.attempts += 1; | |
self.emit("reconnecting", "delay " + self.retry_delay + ", attempt " + self.attempts); | |
self.retry_timer = setTimeout(function () { | |
if (exports.debug_mode) { | |
console.log("Retrying connection..."); | |
} | |
self.retry_delay = self.retry_delay * self.retry_backoff; | |
self.stream.connect(self.port, self.host); | |
self.retry_timer = null; | |
}, self.retry_delay); | |
}; | |
RedisClient.prototype.on_data = function (data) { | |
if (exports.debug_mode) { | |
console.log("on_data: " + data.toString()); | |
} | |
try { | |
this.reply_parser.execute(data); | |
} catch (err) { | |
this.emit("error", err); // pass the error along | |
} | |
}; | |
RedisClient.prototype.return_error = function (err) { | |
var command_obj = this.command_queue.shift(); | |
if (command_obj && typeof command_obj.callback === "function") { | |
command_obj.callback(err); | |
} else { | |
console.log("no callback to send error: " + sys.inspect(err)); | |
// this will probably not make it anywhere useful, but we might as well throw | |
throw new Error(err); | |
} | |
}; | |
RedisClient.prototype.return_reply = function (reply) { | |
var command_obj = this.command_queue.shift(), | |
obj, i, len, key, val, type; | |
if (command_obj) { | |
if (typeof command_obj.callback === "function") { | |
// HGETALL special case replies with keyed Buffers | |
if (reply && 'HGETALL' === command_obj.command) { | |
obj = {}; | |
for (i = 0, len = reply.length; i < len; i += 2) { | |
key = reply[i].toString(); | |
val = reply[i + 1]; | |
obj[key] = val; | |
} | |
reply = obj; | |
} | |
command_obj.callback(null, reply); | |
} else if (exports.debug_mode) { | |
console.log("no callback for reply: " + reply.toString()); | |
} | |
} else if (this.subscriptions) { | |
if (Array.isArray(reply)) { | |
type = reply[0].toString(); | |
if (type === "message") { | |
this.emit("message", reply[1].toString(), reply[2]); // channel, message | |
} else if (type === "pmessage") { | |
this.emit("pmessage", reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message | |
} else if (type === "subscribe" || type === "unsubscribe" || type === "psubscribe" || type === "punsubscribe") { | |
if (reply[2] === 0) { | |
this.subscriptions = false; | |
if (this.debug_mode) { | |
console.log("All subscriptions removed, exiting pub/sub mode"); | |
} | |
} | |
this.emit(type, reply[1].toString(), reply[2]); // channel, count | |
} else { | |
throw new Error("subscriptions are active but got unknown reply type " + type); | |
} | |
} else { | |
throw new Error("subscriptions are active but got an invalid reply: " + reply); | |
} | |
} | |
}; | |
RedisClient.prototype.send_command = function () { | |
var command, callback, args, this_args, command_obj, | |
elem_count, stream = this.stream, buffer_args, command_str = ""; | |
this_args = to_array(arguments); | |
if (this_args.length === 0) { | |
throw new Error("send_command: not enough arguments"); | |
} | |
command = this_args[0]; | |
if (this_args[1] && Array.isArray(this_args[1])) { | |
args = this_args[1]; | |
if (typeof this_args[2] === "function") { | |
callback = this_args[2]; | |
} | |
} else { | |
if (typeof this_args[this_args.length - 1] === "function") { | |
callback = this_args[this_args.length - 1]; | |
args = this_args.slice(1, this_args.length - 1); | |
} else { | |
args = this_args.slice(1, this_args.length); | |
} | |
} | |
if (typeof command !== "string") { | |
throw new Error("First argument of send_command must be the command name"); | |
} | |
command_obj = { | |
command: command, | |
args: args, | |
callback: callback | |
}; | |
if (! this.connected) { | |
if (exports.debug_mode) { | |
console.log("Queueing " + command + " for next server connection."); | |
} | |
this.offline_queue.push(command_obj); | |
return; | |
} | |
if (command === "SUBSCRIBE" || command === "PSUBSCRIBE" || command === "UNSUBSCRIBE" || command === "PUNSUBSCRIBE") { | |
if (this.subscriptions === false && exports.debug_mode) { | |
console.log("Entering pub/sub mode from " + command); | |
} | |
this.subscriptions = true; | |
} else { | |
if (command === "QUIT") { | |
this.closing = true; | |
} else if (this.subscriptions === true) { | |
throw new Error("Connection in pub/sub mode, only pub/sub commands may be used"); | |
} | |
this.command_queue.push(command_obj); | |
} | |
this.commands_sent += 1; | |
elem_count = 1; | |
buffer_args = false; | |
elem_count += args.length; | |
buffer_args = args.some(function (arg) { | |
// this is clever, but might be slow | |
return arg instanceof Buffer; | |
}); | |
// Always use "Multi bulk commands", but if passed any Buffer args, then do multiple writes, one for each arg | |
// This means that using Buffers in commands is going to be slower, so use Strings if you don't need binary. | |
command_str = "*" + elem_count + "\r\n$" + command.length + "\r\n" + command + "\r\n"; | |
if (! buffer_args) { // Build up a string and send entire command in one write | |
for (var i = 0, il = args.length, arg; i < il; i++) { | |
arg = args[i]; | |
if (typeof arg !== "string") { | |
arg = String(arg); | |
} | |
command_str += "$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n"; | |
} | |
if (exports.debug_mode) { | |
console.log("send command: " + command_str); | |
} | |
// Need to catch "Stream is not writable" exception here and error everybody in the command queue out | |
stream.write(command_str); | |
} else { | |
if (exports.debug_mode) { | |
console.log("send command: " + command_str); | |
console.log("send command has Buffer arguments"); | |
} | |
stream.write(command_str); | |
for (var i = 0, il = args.length, arg; i < il; i++) { | |
arg = args[i]; | |
if (arg.length === undefined) { | |
arg = String(arg); | |
} | |
if (arg instanceof Buffer) { | |
if (arg.length === 0) { | |
if (exports.debug_mode) { | |
console.log("Using empty string for 0 length buffer"); | |
} | |
stream.write("$0\r\n\r\n"); | |
} else { | |
stream.write("$" + arg.length + "\r\n"); | |
stream.write(arg); | |
stream.write("\r\n"); | |
} | |
} else { | |
stream.write("$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n"); | |
} | |
} | |
} | |
}; | |
RedisClient.prototype.end = function () { | |
this.stream._events = {}; | |
this.connected = false; | |
return this.stream.end(); | |
}; | |
// http://code.google.com/p/redis/wiki/CommandReference | |
commands = [ | |
// Connection handling | |
"QUIT", "AUTH", | |
// Commands operating on all value types | |
"EXISTS", "DEL", "TYPE", "KEYS", "RANDOMKEY", "RENAME", "RENAMENX", "DBSIZE", "EXPIRE", "TTL", "SELECT", | |
"MOVE", "FLUSHDB", "FLUSHALL", | |
// Commands operating on string values | |
"SET", "GET", "GETSET", "MGET", "SETNX", "SETEX", "MSET", "MSETNX", "INCR", "INCRBY", "DECR", "DECRBY", "APPEND", "SUBSTR", | |
// Commands operating on lists | |
"RPUSH", "LPUSH", "LLEN", "LRANGE", "LTRIM", "LINDEX", "LSET", "LREM", "LPOP", "RPOP", "BLPOP", "BRPOP", "RPOPLPUSH", | |
// Commands operating on sets | |
"SADD", "SREM", "SPOP", "SMOVE", "SCARD", "SISMEMBER", "SINTER", "SINTERSTORE", "SUNION", "SUNIONSTORE", "SDIFF", "SDIFFSTORE", | |
"SMEMBERS", "SRANDMEMBER", | |
// Commands operating on sorted zsets (sorted sets) | |
"ZADD", "ZREM", "ZINCRBY", "ZRANK", "ZREVRANK", "ZRANGE", "ZREVRANGE", "ZRANGEBYSCORE", "ZCOUNT", "ZCARD", "ZSCORE", | |
"ZREMRANGEBYRANK", "ZREMRANGEBYSCORE", "ZUNIONSTORE", "ZINTERSTORE", | |
// Commands operating on hashes | |
"HSET", "HSETNX", "HGET", "HMGET", "HMSET", "HINCRBY", "HEXISTS", "HDEL", "HLEN", "HKEYS", "HVALS", "HGETALL", | |
// Sorting | |
"SORT", | |
// Persistence control commands | |
"SAVE", "BGSAVE", "LASTSAVE", "SHUTDOWN", "BGREWRITEAOF", | |
// Remote server control commands | |
"INFO", "MONITOR", "SLAVEOF", "CONFIG", | |
// Publish/Subscribe | |
"PUBLISH", "SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE", | |
// Undocumented commands | |
"PING", | |
// Redisql commands | |
"CHANGEDB", | |
"CREATE", "DROP", "DUMP", "DESC", | |
"INSERT", "UPDATE", "DELETE", "SCANSELECT", | |
"NORM", "DENORM" | |
]; | |
commands.forEach(function (command) { | |
RedisClient.prototype[command] = function () { | |
var args = to_array(arguments); | |
args.unshift(command); // put command at the beginning | |
console.log("Sending " + command + " with args " + JSON.stringify(args)); | |
this.send_command.apply(this, args); | |
}; | |
RedisClient.prototype[command.toLowerCase()] = RedisClient.prototype[command]; | |
}); | |
function Multi(client, args) { | |
this.client = client; | |
this.queue = [["MULTI"]]; | |
if (Array.isArray(args)) { | |
this.queue = this.queue.concat(args); | |
} | |
} | |
commands.forEach(function (command) { | |
Multi.prototype[command.toLowerCase()] = function () { | |
var args = to_array(arguments); | |
args.unshift(command); | |
this.queue.push(args); | |
return this; | |
}; | |
}); | |
Multi.prototype.exec = function(callback) { | |
var done = false, self = this; | |
// drain queue, callback will catch "QUEUED" or error | |
// Can't use a for loop here, as we need closure around the index. | |
this.queue.forEach(function(args, index) { | |
var command = args[0]; | |
if (typeof args[args.length - 1] === "function") { | |
args = args.slice(1, -1); | |
} else { | |
args = args.slice(1); | |
} | |
if (args.length === 1 && Array.isArray(args[0])) { | |
args = args[0]; | |
} | |
this.client.send_command(command, args, function (err, reply){ | |
if (err) { | |
var cur = self.queue[index]; | |
if (typeof cur[cur.length -1] === "function") { | |
cur[cur.length - 1](err); | |
} else { | |
throw new Error(err); | |
} | |
self.queue.splice(index, 1); | |
} | |
}); | |
}, this); | |
this.client.send_command("EXEC", function (err, replies) { | |
if (err) { | |
if (callback) { | |
callback(new Error(err)); | |
} else { | |
throw new Error(err); | |
} | |
} | |
for (var i = 1, il = self.queue.length; i < il; i++) { | |
var reply = replies[i - 1], | |
args = self.queue[i]; | |
// Convert HGETALL reply to object | |
if (reply && args[0] === "HGETALL") { | |
obj = {}; | |
for (var j = 0, jl = reply.length; j < jl; j += 2) { | |
key = reply[j].toString(); | |
val = reply[j + 1]; | |
obj[key] = val; | |
} | |
replies[i - 1] = reply = obj; | |
} | |
if (typeof args[args.length - 1] === "function") { | |
args[args.length - 1](null, reply); | |
} | |
} | |
if (callback) { | |
callback(null, replies); | |
} | |
}); | |
}; | |
RedisClient.prototype.multi = function (args) { | |
return new Multi(this, args); | |
}; | |
RedisClient.prototype.MULTI = function (args) { | |
return new Multi(this, args); | |
}; | |
exports.createClient = function (port_arg, host_arg, options) { | |
var port = port_arg || default_port, | |
host = host_arg || default_host, | |
red_client, net_client; | |
net_client = net.createConnection(port, host); | |
red_client = new RedisClient(net_client); | |
red_client.port = port; | |
red_client.host = host; | |
return red_client; | |
}; | |
exports.print = function (err, reply) { | |
if (err) { | |
console.log("Error: " + err); | |
} else { | |
console.log("Reply: " + reply); | |
} | |
}; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var redisql = require("./redisql"); | |
var client = redisql.createClient(); | |
var init = true; | |
var verbose = false; | |
redisql.debug_mode = true; | |
function print_response(obj) { | |
var lines = []; | |
Object.keys(obj).forEach(function (key) { | |
lines.push(" " + key + ": " + obj[key].toString()); | |
}); | |
console.log("Response hash: \n" + lines.join("\n")); | |
} | |
function run_test() { | |
if (init) { | |
console.log("Initializing"); | |
client.flushdb(); | |
if (verbose) { | |
console.log("First populate user:id:[name,age,status]"); | |
} | |
client.set("user:1:name", "bill"); | |
client.set("user:1:age", "33"); | |
client.set("user:1:status", "member"); | |
if (verbose) { | |
console.log("Then populate user:id:address[street,city,zipcode]"); | |
} | |
client.set("user:1:address:street", "12345 main st"); | |
client.set("user:1:address:city", "capitol city"); | |
client.set("user:1:address:zipcode", "55566"); | |
if (verbose) { | |
console.log("Then populate user:id:payment[type,account]"); | |
} | |
client.set("user:1:payment:type", "credit card"); | |
client.set("user:1:payment:account", "1234567890"); | |
client.set("user:2:name", "jane"); | |
client.set("user:2:age", "22"); | |
client.set("user:2:status", "premium"); | |
client.set("user:2:address:street", "345 side st"); | |
client.set("user:2:address:city", "capitol city"); | |
client.set("user:2:address:zipcode", "55566"); | |
client.set("user:2:payment:type", "checking"); | |
client.set("user:2:payment:account", "44441111"); | |
client.set("user:3:name", "ken"); | |
client.set("user:3:age", "44"); | |
client.set("user:3:status", "guest"); | |
client.set("user:3:address:street", "876 big st"); | |
client.set("user:3:address:city", "houston"); | |
client.set("user:3:address:zipcode", "87654"); | |
client.set("user:3:payment:type", "cash"); | |
if (verbose) { | |
console.log("Keys are now populated"); | |
console.log(""); | |
console.log("Finally search through all redis keys using "); | |
console.log(" the primary wildcard:\"user\" "); | |
console.log(" and then search through those results using:"); | |
console.log(" 1.) the secondary wildcard: \"*:address\" "); | |
console.log(" 2.) the secondary wildcard: \"*:payment\" "); | |
console.log(" 3.) non matching stil match the primary wildcard"); | |
console.log(""); | |
console.log("The 3 results will be normalised into the tables:"); | |
console.log(" 1.) user_address"); | |
console.log(" 2.) user_payment"); | |
console.log(" 3.) user"); | |
} | |
} | |
client.norm("user", "address,payment", function (err, res) { | |
if (err) { throw err; } | |
console.log("Response: " + res); | |
process.exit(); | |
}); | |
client.select("user.pk,user.name,user.status,user_address.city,user_address.street,user_address.pk,user_address.zipcode", "user,user_address", "user.pk = user_address.pk AND user.pk BETWEEN 1 AND 5", redisql.print); | |
if (verbose) { | |
console.log("\n\n"); | |
console.log("If pure lookup speed of a SINGLE column is the dominant use case"); | |
console.log("We can now denorm the redisql tables into redis hash-tables"); | |
console.log("which are faster for this use-case"); | |
console.log(""); | |
console.log("denorm user \user:* "); | |
} | |
client.denorm("user", 'user:*', redisql.print); | |
console.log("HGETALL user:1 "); | |
client.hgetall("user:1", function (err, res) { | |
if (err) { | |
throw err; | |
} | |
console.log("Printing response for user:1"); | |
print_response(res); | |
}); | |
console.log("denorm user_payment \user:*:payment "); | |
client.denorm("user_payment", 'user:*:payment', redisql.print); | |
console.log("HGETALL user:2:payment "); | |
client.hgetall("user:2:payment", function (err, res) { | |
if (err) { | |
throw err; | |
} | |
print_response(res); | |
}); | |
console.log("denorm user \user:*:address "); | |
client.denorm("user_address", 'user:*:address', redisql.print); | |
console.log("HGETALL user:3:address "); | |
client.hgetall("user:3:address", function (err, res) { | |
if (err) { | |
throw err; | |
} | |
print_response(res); | |
client.quit(); | |
}); | |
} | |
console.log("Connecting to RediSQL server..."); | |
client.on("connect", run_test); | |
client.on("error", function (e) { | |
console.warn("Error connecting to RediSQL server: " + e); | |
process.exit(1); | |
}); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var redis = require("redis"), | |
client = redis.createClient(), | |
commands; | |
redis.debug_mode = true; | |
// helper borrowed from node_redis | |
function to_array(args) { | |
var i; | |
var len = args.length; | |
var arr = new Array(len); | |
for (i = 0; i < len; i += 1) { | |
arr[i] = args[i]; | |
} | |
return arr; | |
} | |
// new commands that we'll be adding | |
commands = [ | |
"CHANGEDB", | |
"DUMP", "DESC", | |
"NORM", "DENORM" | |
]; | |
// merge these in with the RedisClient prototype in both upper and lower case | |
commands.forEach(function (command) { | |
redis.RedisClient.prototype[command] = function () { | |
var args = to_array(arguments); | |
args.unshift(command); // put command at the beginning | |
// remove this when everything works | |
console.log("Sending new command " + command + | |
" with args " + JSON.stringify(args)); | |
this.send_command.apply(this, args); | |
}; | |
redis.RedisClient.prototype[command.toLowerCase()] = | |
redis.RedisClient.prototype[command]; | |
}); | |
// CREATE | |
redis.RedisClient.prototype["CREATE"] = function () { | |
var args = to_array(arguments); | |
var mod_args; | |
if (args[0].toLowerCase() === "table") { | |
mod_args = "TABLE " + args[1] + " (" + args[2] + ")"; | |
} else if (args[0].toLowerCase() === "index") { | |
mod_args = "INDEX " + args[1] + " ON " + args[2] + " (" + args[3] + ")"; | |
} else { // usage error | |
throw new Error("Bad args to \"CREATE\" " + args[0] + | |
", must be either \"TABLE\" OR \"INDEX\""); | |
} | |
var command = "CREATE"; | |
var sargs = mod_args.split(' ');; | |
sargs.unshift(command); // put command at the beginning | |
console.log("Sending " + command + " with args " + JSON.stringify(sargs)); | |
this.send_command.apply(this, sargs); | |
}; | |
redis.RedisClient.prototype["create"] = redis.RedisClient.prototype["CREATE"]; | |
// DROP | |
redis.RedisClient.prototype["DROP"] = function () { | |
var args = to_array(arguments); | |
var mod_args; | |
if (args[0].toLowerCase() === "table") { | |
mod_args = "TABLE " + args[1]; | |
} else if (args[0].toLowerCase() === "index") { | |
mod_args = "INDEX " + args[1]; | |
} else { // usage error | |
throw new Error("Bad args to \"DROP\" " + args[0] + | |
", must be either \"TABLE\" OR \"INDEX\""); | |
} | |
var command = "DROP"; | |
var sargs = mod_args.split(' ');; | |
sargs.unshift(command); // put command at the beginning | |
console.log("Sending " + command + " with args " + JSON.stringify(sargs)); | |
this.send_command.apply(this, sargs); | |
}; | |
redis.RedisClient.prototype["drop"] = redis.RedisClient.prototype["DROP"]; | |
// SELECT | |
redis.RedisClient.prototype["SELECT"] = function () { | |
var args = to_array(arguments); | |
var mod_args; | |
if (args.length != 1) { // rewrite Redisql SELECT * FROM tbl WHERE id = 4 | |
mod_args = args[0] + " FROM " + args[1] + " WHERE " + args[2]; | |
} else { // redis SELECT DB | |
mod_args = arguments; | |
} | |
var command = "SELECT"; | |
var sargs = mod_args.split(' ');; | |
sargs.unshift(command); // put command at the beginning | |
console.log("Sending " + command + " with args " + JSON.stringify(sargs)); | |
this.send_command.apply(this, sargs); | |
}; | |
redis.RedisClient.prototype["select"] = redis.RedisClient.prototype["SELECT"]; | |
// SCANSELECT | |
redis.RedisClient.prototype["SCANSELECT"] = function () { | |
var args = to_array(arguments); | |
var mod_args = args[0] + " FROM " + args[1];; | |
if (args.length > 3) { | |
mod_args += " WHERE " + args[2]; | |
} | |
var command = "SCANSELECT"; | |
var sargs = mod_args.split(' ');; | |
sargs.unshift(command); // put command at the beginning | |
console.log("Sending " + command + " with args " + JSON.stringify(sargs)); | |
this.send_command.apply(this, sargs); | |
}; | |
redis.RedisClient.prototype["scanselect"] = | |
redis.RedisClient.prototype["SCANSELECT"]; | |
// INSERT | |
redis.RedisClient.prototype["INSERT"] = function () { | |
var args = to_array(arguments); | |
var mod_args = "INTO " + args[0] + " VALUES"; | |
var command = "INSERT"; | |
var sargs = mod_args.split(' ');; | |
sargs.unshift(command); // put command at the beginning | |
sargs.push(args[1]); // put val_list at end as single argument | |
console.log("Sending " + command + " with args " + JSON.stringify(sargs)); | |
this.send_command.apply(this, sargs); | |
}; | |
redis.RedisClient.prototype["insert"] = redis.RedisClient.prototype["INSERT"]; | |
// DELETE | |
redis.RedisClient.prototype["DELETE"] = function () { | |
var args = to_array(arguments); | |
var mod_args = "FROM " + args[0] + " WHERE " + args[1]; | |
var command = "DELETE"; | |
var sargs = mod_args.split(' ');; | |
sargs.unshift(command); // put command at the beginning | |
console.log("Sending " + command + " with args " + JSON.stringify(sargs)); | |
this.send_command.apply(this, sargs); | |
}; | |
redis.RedisClient.prototype["delete"] = redis.RedisClient.prototype["DELETE"]; | |
// UPDATE | |
redis.RedisClient.prototype["UPDATE"] = function () { | |
var args = to_array(arguments); | |
var mod_args = args[0] + " SET" | |
var sargs = mod_args.split(' ');; | |
sargs.push(args[1]); // push val_list at end as single argument | |
sargs.push("WHERE"); | |
var wargs = args[2].split(' '); | |
for (var i = 0; i < wargs.length; i++) { | |
sargs.push(wargs[i]); | |
}; | |
var command = "UPDATE"; | |
sargs.unshift(command); // put command at the beginning | |
console.log("Sending " + command + " with args " + JSON.stringify(sargs)); | |
this.send_command.apply(this, sargs); | |
}; | |
redis.RedisClient.prototype["update"] = redis.RedisClient.prototype["UPDATE"]; | |
exports.createClient = function (port_arg, host_arg) { | |
return redis.createClient(port_arg, host_arg); | |
}; | |
exports.print = redis.print; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var redis = require("./redisql"), | |
sys = require('sys'), | |
client = redis.createClient(); | |
client.flushdb(); | |
client.create("TABLE", "worker", "id int,division int,health int,salary TEXT, name TEXT", redis.print); | |
client.create("INDEX", "worker:division:index", "worker", "division", redis.print); | |
client.create("INDEX", "worker:health:index", "worker", "health", redis.print); | |
client.insert("worker", "(1,11,2,60000.66,jim)", redis.print); | |
client.insert("worker", "(2,22,1,30000.33,jack)", redis.print); | |
client.insert("worker", "(3,33,4,90000.99,bob)", redis.print); | |
client.insert("worker", "(4,44,3,70000.77,bill)", redis.print); | |
client.insert("worker", "(6,66,1,12000.99,jan)", redis.print); | |
client.insert("worker", "(7,66,1,11000.99,beth)", redis.print); | |
client.insert("worker", "(8,11,2,68888.99,mac)", redis.print); | |
client.insert("worker", "(9,22,1,31111.99,ken)", redis.print); | |
client.insert("worker", "(10,33,4,111111.99,seth)", redis.print); | |
client.scanselect("*", "worker", redis.print); | |
client.scanselect("*", "worker", "name=bill", redis.print); | |
client.select("*", "worker", "id=1", redis.print); | |
client.update("worker", "name=JIM", "id = 1", redis.print); | |
client.select("*", "worker", "id = 1", redis.print); | |
client.delete("worker", "id = 2", redis.print); | |
client.select("*", "worker", "id = 2", redis.print); | |
client.desc("worker", redis.print); | |
client.dump("worker", redis.print); | |
client.drop("index", "worker:health:index", redis.print); | |
client.drop("table", "worker", function (err, res) { | |
redis.print(); | |
client.quit(); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment