Skip to content

Instantly share code, notes, and snippets.

@JakSprats
Created November 3, 2010 20:35
Show Gist options
  • Save JakSprats/661667 to your computer and use it in GitHub Desktop.
Save JakSprats/661667 to your computer and use it in GitHub Desktop.
redis.c
/*
 * Run the command held by the fake client `rfc` and pipe the data lines of
 * its reply, one at a time, into `adder`.
 *
 *   c       - handed through to `adder` and `emptyer`.
 *   rfc     - fake client; rfc->argv[0] names the command to run and the
 *             reply is read back from rfc->reply.
 *   wfc     - opaque sink passed to `adder` (redisClient, list or lua_State).
 *   is_ins  - forwarded untouched to `adder`.
 *   flg     - out: PIPE_ERR_FLAG for a '-' reply, PIPE_ONE_LINER_FLAG for a
 *             '+' reply, PIPE_EMPTY_SET_FLAG for a "*0" reply, otherwise
 *             PIPE_NONE_FLAG.
 *   adder   - called once per data line; a false return aborts the pipe.
 *   emptyer - called when the counter is still at its initial value after
 *             the reply has been walked.
 *
 * Returns the line counter minus one (the counter starts at 1 and is only
 * modified through the pointer given to `adder`), or -1 when `adder` or
 * `emptyer` reports failure.  The list iterator is released on every path.
 *
 * NOTE(review): lookupCommand() is not checked for NULL here; the caller is
 * presumably expected to have validated the command name - confirm.
 */
long fakeClientPipe(redisClient *c,
                    redisClient *rfc,
                    void        *wfc, /* can be redisClient,list,LuaState */
                    int          is_ins,
                    flag        *flg,
                    bool (* adder)
                    (redisClient *c, void *x, robj *key, long *l, int b, int n),
                    bool (* emptyer) (redisClient *c)) {
    struct redisCommand *cmd = lookupCommand(rfc->argv[0]->ptr);
    cmd->proc(rfc);

    *flg = PIPE_NONE_FLAG;
    int       nlines = 0;
    long      card   = 1;  /* ZER0 as pk can cause problems */
    long      result = -1; /* stays -1 unless the whole reply is consumed */
    bool      fline  = 1;
    listNode *ln;
    listIter *li     = listGetIterator(rfc->reply, AL_START_HEAD);

    while ((ln = listNext(li)) != NULL) {
        robj *o    = ln->value;
        sds   s    = o->ptr;
        bool  o_fl = fline; /* true only for the first reply element */
        fline      = 0;

        /* ignore protocol, we just want data */
        if (*s == '\r' && *(s + 1) == '\n') continue;
        /* TODO introduce more state -> data starting w/ '\r\n' ignored */

        if (o_fl) {
            if (*s == '-') { /* error */
                *flg = PIPE_ERR_FLAG;
                if (!(*adder)(c, wfc, o, &card, is_ins, nlines)) goto pipe_done;
                break;
            }
            if (*s == '+') { /* OK */
                *flg = PIPE_ONE_LINER_FLAG;
                if (!(*adder)(c, wfc, o, &card, is_ins, nlines)) goto pipe_done;
                break;
            }
            if (*s == ':') { /* single integer reply */
                char  *x   = s + 1;
                char  *y   = strchr(x, '\r'); /* ignore the final \r\n */
                /* No '\r' found: take the rest of the string instead of
                 * computing a length from a NULL pointer. */
                size_t len = y ? (size_t)(y - x) : strlen(x);
                /* NOTE(review): `r` is not released here; whether `adder`
                 * takes ownership is not visible from this function. */
                robj  *r   = createStringObject(x, len);
                if (!(*adder)(c, wfc, r, &card, is_ins, nlines)) goto pipe_done;
                break;
            }
            if (*s == '*') {
                nlines = atoi(s + 1); /* some pipes need to know num_lines */
                if (nlines == 0) {
                    *flg = PIPE_EMPTY_SET_FLAG;
                    break;
                }
                continue;
            }
        }

        if (*s == '$') { /* parse doubles which are w/in this list element */
            if (*(s + 1) == '-') continue; /* $-1 -> nil */
            char *x = strchr(s, '\r');
            if (x == NULL) continue; /* no header terminator, nothing to read */
            size_t llen = (size_t)(x - s);
            if (llen + 2 < sdslen(s)) { /* got a double */
                x += 2; /* move past \r\n */
                char  *y   = strchr(x, '\r'); /* ignore the final \r\n */
                size_t len = y ? (size_t)(y - x) : strlen(x);
                /* NOTE(review): same ownership question as the ':' case. */
                robj  *r   = createStringObject(x, len);
                if (!(*adder)(c, wfc, r, &card, is_ins, nlines)) goto pipe_done;
            }
            continue;
        }

        /* all ranges are single */
        if (!(*adder)(c, wfc, o, &card, is_ins, nlines)) goto pipe_done;
    }

    /* empty response from rfc */
    if (card == 1 && !(*emptyer)(c)) goto pipe_done;

    result = card - 1; /* started at 1 */

pipe_done:
    listReleaseIterator(li);
    return result;
}
/* Reply with `str` as a bulk string, or with a nil bulk when `str` is NULL
 * (lua_tostring yields NULL for values that are neither string nor number). */
static void luaReplyBulkOrNil(redisClient *c, const char *str) {
    if (str == NULL) {
        addReply(c, shared.nullbulk);
        return;
    }
    robj *r = createStringObject((char *)str, strlen(str));
    addReplyBulk(c, r);
    decrRefCount(r);
}

/*
 * Run the Lua chunk in c->argv[1] on the global state `Lua` and translate
 * what it returned into a reply for `c`:
 *   - chunk failed to load/run   -> "-ERR: Lua error: ..." line
 *   - LuaFlag == EMPTY_SET       -> empty multi-bulk
 *   - LuaFlag == ONE_LINER / ERR -> top of stack sent verbatim
 *   - nothing returned           -> nil bulk
 *   - non-table value            -> one bulk
 *   - table                      -> multi-bulk of elements 1..lua_objlen
 *
 * Strings obtained from lua_tostring are consumed before their value is
 * popped, and the Lua stack is restored to its entry height on every path so
 * that leftover values cannot skew the result count of a later command.
 */
void luaCommand(redisClient *c) {
    LuaClient = c; /* used in func redisLua */
    /* Start clean: otherwise a script that never calls into redisLua would
     * be answered according to the flag left behind by an earlier command. */
    LuaFlag   = PIPE_NONE_FLAG;

    const int base = lua_gettop(Lua);

    if (luaL_dostring(Lua, c->argv[1]->ptr)) {
        const char *err = lua_tostring(Lua, -1);
        addReplySds(c, sdscatprintf(sdsempty(), "-ERR: Lua error: %s \r\n",
                                    err ? err : "(error object is not a string)"));
        lua_settop(Lua, base);
        return;
    }

    const int lret = lua_gettop(Lua) - base; /* values returned by the chunk */

    if (LuaFlag == PIPE_EMPTY_SET_FLAG) {
        addReply(c, shared.emptymultibulk);
    } else if (LuaFlag == PIPE_ONE_LINER_FLAG || LuaFlag == PIPE_ERR_FLAG) {
        /* The string already carries its protocol framing: send it as is. */
        const char *x = lret ? lua_tostring(Lua, -1) : NULL;
        if (x) {
            addReplySds(c, sdsnewlen(x, strlen(x)));
        } else {
            addReply(c, shared.nullbulk);
        }
    } else if (!lret) {
        addReply(c, shared.nullbulk);
    } else if (!lua_istable(Lua, -1)) { /* single line return */
        luaReplyBulkOrNil(c, lua_tostring(Lua, -1));
    } else {
        const int len = (int)lua_objlen(Lua, -1);
        addReplySds(c, sdscatprintf(sdsempty(), "*%d\r\n", len));
        for (int i = 1; i <= len; ++i) {
            lua_pushinteger(Lua, i);
            lua_gettable(Lua, -2); /* replaces the key with table[i] */
            luaReplyBulkOrNil(c, lua_tostring(Lua, -1));
            lua_pop(Lua, 1);
        }
    }

    lua_settop(Lua, base);
}
/*
 * Adder callback that pushes one reply line onto a Lua stack.
 *
 *   x    - the lua_State to push onto.
 *   key  - object whose ptr is pushed as a Lua string.
 *   card - 1-based line counter; used as the table index and incremented
 *          on every call.
 *   n    - when greater than 1 the lines are collected into a table that is
 *          created on the first call (counter == 1); otherwise the string
 *          is pushed directly.
 *
 * `c` and `i` are unused.  Always returns 1.
 */
static bool luaLine(redisClient *c,
                    void        *x,
                    robj        *key,
                    long        *card,
                    int          i,
                    int          n) {
    (void)c; (void)i; /* compiler warning */
    lua_State *L   = (lua_State *)x;
    const long idx = *card;

    if (n <= 1) {
        lua_pushstring(L, key->ptr);
    } else {
        if (idx == 1) lua_newtable(L);
        lua_pushnumber(L, idx);
        lua_pushstring(L, key->ptr);
        lua_settable(L, -3); /* table[idx] = line */
    }

    *card = idx + 1;
    return 1;
}
/*
 * Push a "wrong number of arguments" error line for command `name` onto the
 * Lua stack and set LuaFlag to PIPE_ERR_FLAG.
 *
 * The message is built by lua_pushfstring, so it is not limited by a fixed
 * size buffer.  `name` may be NULL; "(null)" is printed in that case rather
 * than handing a NULL pointer to a %s conversion.
 *
 * Returns 1 (the number of values pushed).
 */
static int redisLuaArityErr(lua_State *L, char *name) {
    lua_pushfstring(L, "-ERR wrong number of arguments for '%s' command\r\n",
                    name ? name : "(null)");
    LuaFlag = PIPE_ERR_FLAG;
    return 1;
}
/*
 * Lua C function: run a command whose name and arguments are the Lua call
 * arguments, through a fake client, and leave the outcome on the Lua stack.
 *
 * LuaFlag is reset on entry and set to PIPE_ERR_FLAG whenever an error line
 * is pushed (unknown command, wrong arity, over maxmemory).  A non-string
 * argument raises a Lua argument error.
 *
 * Returns the number of Lua results: 1 when an error line was pushed or when
 * fakeClientPipe returned non-zero, 0 otherwise.
 */
int redisLua(lua_State *L) {
    LuaFlag = PIPE_NONE_FLAG;
    int argc = lua_gettop(L);

    const char *arg1 = lua_tostring(L, 1);
    if (!arg1) {
        return redisLuaArityErr(L, NULL);
    }

    redisCommand *cmd = lookupCommand((char *)arg1);
    if (!cmd) {
        /* arg1 comes from the script and can be arbitrarily long, so let
         * Lua build the message instead of a fixed-size stack buffer. */
        lua_pushfstring(L, "-ERR: Unknown command '%s'\r\n", arg1);
        LuaFlag = PIPE_ERR_FLAG;
        return 1;
    } else if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) {
        return redisLuaArityErr(L, cmd->name);
    }

    if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) &&
        zmalloc_used_memory() > server.maxmemory) {
        char *buf =
            "-ERR command not allowed when used memory > 'maxmemory'\r\n";
        lua_pushstring(L, buf);
        LuaFlag = PIPE_ERR_FLAG;
        return 1;
    }

    /* NOTE(review): this returns 1 without pushing anything, so Lua receives
     * whatever is currently on top of the stack - confirm this is intended. */
    if (server.vm_enabled &&
        server.vm_max_threads > 0 &&
        blockClientOnSwappedKeys(LuaClient, cmd)) return 1;

    /* Validate every argument BEFORE allocating anything: luaL_argerror does
     * not return (it raises a Lua error), so nothing allocated earlier could
     * be released afterwards.  Lua argument numbers are 1-based. */
    for (int i = 0; i < argc; i++) {
        if (!lua_isstring(L, i + 1)) {
            return luaL_argerror(L, i + 1, "args must be strings");
        }
    }

    redisClient *rfc   = rsql_createFakeClient();
    robj       **rargv = zmalloc(sizeof(*rargv) * argc);
    for (int i = 0; i < argc; i++) {
        /* lua_tolstring reports the real length, so strings containing
         * embedded NUL bytes are passed through intact. */
        size_t      len = 0;
        const char *arg = lua_tolstring(L, i + 1, &len);
        rargv[i] = createStringObject((char *)arg, len);
    }
    rfc->argv = rargv;
    rfc->argc = argc;

    long ok = fakeClientPipe(LuaClient, rfc, L, 0, &LuaFlag, luaLine, emptyNoop);

    /* Drop this function's reference on each argument object, then the
     * vector itself.
     * NOTE(review): assumes rsql_freeFakeClient() does not release argv
     * itself (the vector was already freed by hand before that call) -
     * confirm against its definition. */
    for (int i = 0; i < argc; i++) {
        decrRefCount(rargv[i]);
    }
    zfree(rargv);
    rfc->argv = NULL; /* do not leave a dangling vector in the client */
    rfc->argc = 0;
    rsql_freeFakeClient(rfc);

    return ok ? 1 : 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment