Created
July 31, 2015 07:10
-
-
Save moonbingbing/9915c66346e8fddcefb5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local redis_c = require "resty.redis" | |
-- Use the "table.new" module for pre-sized tables when it can be loaded;
-- otherwise fall back to a stub that ignores both size hints and simply
-- returns an empty table.
local ok, new_tab = pcall(require, "table.new")
if not (ok and type(new_tab) == "function") then
    new_tab = function(narr, nrec)
        return {}
    end
end

-- Module table, created with size hints (0, 155).
local _M = new_tab(0, 155)

_M._VERSION = '0.01'
-- Names of the Redis commands that get a generated wrapper method on the
-- module table. The list is laid out by first letter; its order is the
-- order in which the wrappers are generated.
-- "punsubscribe", "subscribe" and "unsubscribe" are deliberately left out
-- of the generated set.
local commands = {
    -- a
    "append", "auth",
    -- b
    "bgrewriteaof", "bgsave", "bitcount", "bitop",
    "blpop", "brpop", "brpoplpush",
    -- c
    "client", "config",
    -- d
    "dbsize", "debug", "decr", "decrby",
    "del", "discard", "dump",
    -- e
    "echo", "eval", "exec", "exists",
    "expire", "expireat",
    -- f
    "flushall", "flushdb",
    -- g
    "get", "getbit", "getrange", "getset",
    -- h
    "hdel", "hexists", "hget", "hgetall",
    "hincrby", "hincrbyfloat", "hkeys", "hlen",
    "hmget", "hmset", "hscan", "hset",
    "hsetnx", "hvals",
    -- i
    "incr", "incrby", "incrbyfloat", "info",
    -- k
    "keys",
    -- l
    "lastsave", "lindex", "linsert", "llen",
    "lpop", "lpush", "lpushx", "lrange",
    "lrem", "lset", "ltrim",
    -- m
    "mget", "migrate", "monitor", "move",
    "mset", "msetnx", "multi",
    -- o
    "object",
    -- p (no "punsubscribe")
    "persist", "pexpire", "pexpireat", "ping",
    "psetex", "psubscribe", "pttl", "publish",
    "pubsub",
    -- q
    "quit",
    -- r
    "randomkey", "rename", "renamenx", "restore",
    "rpop", "rpoplpush", "rpush", "rpushx",
    -- s (no "subscribe")
    "sadd", "save", "scan", "scard",
    "script", "sdiff", "sdiffstore", "select",
    "set", "setbit", "setex", "setnx",
    "setrange", "shutdown", "sinter", "sinterstore",
    "sismember", "slaveof", "slowlog", "smembers",
    "smove", "sort", "spop", "srandmember",
    "srem", "sscan", "strlen", "sunion",
    "sunionstore", "sync",
    -- t
    "time", "ttl", "type",
    -- u (no "unsubscribe")
    "unwatch",
    -- w
    "watch",
    -- z
    "zadd", "zcard", "zcount", "zincrby",
    "zinterstore", "zrange", "zrangebyscore", "zrank",
    "zrem", "zremrangebyrank", "zremrangebyscore", "zrevrange",
    "zrevrangebyscore", "zrevrank", "zscan", "zscore",
    "zunionstore",
    -- kept last, as in the original ordering
    "evalsha",
}
-- Metatable for client objects: any key missing on an instance is looked
-- up on the module table.
local mt = { __index = _M }
-- Tell whether a reply should be treated as "no value".
--
-- Returns true when `res` is nil, is ngx.null, or is a table in which every
-- value is ngx.null (an empty table therefore also counts as null).
-- Returns false for everything else.
local function is_redis_null(res)
    if type(res) ~= "table" then
        return res == ngx.null or res == nil
    end

    -- A table is null only if no entry holds a real value.
    for _, item in pairs(res) do
        if item ~= ngx.null then
            return false
        end
    end
    return true
end
-- Open the connection used by a client object.
-- Change the connect address below as you need.
--
-- self  : client object; reads self.timeout and self.db_index
-- redis : a resty.redis object
--
-- Returns the truthy result of redis:connect on success, or nil plus an
-- error message when connecting (or selecting the database) fails.
function _M.connect_mod(self, redis)
    redis:set_timeout(self.timeout)

    local db_index = self.db_index or 0
    if db_index == 0 then
        -- Database 0 is what a fresh connection already uses: keep the
        -- default pool and send no extra command.
        return redis:connect("127.0.0.1", 6379)
    end

    -- Connections that have been switched to another database live in
    -- their own keepalive pool, so they are never handed to a client that
    -- expects a different database.
    local ok, err = redis:connect("127.0.0.1", 6379, {
        pool = "127.0.0.1:6379:db" .. tostring(db_index),
    })
    if not ok then
        return nil, err
    end

    -- A connection taken from this pool was already switched by an earlier
    -- use; only a fresh one (reuse count 0, or count unavailable) needs
    -- SELECT.
    local reused = redis:get_reused_times()
    if not reused or reused == 0 then
        local selected, select_err = redis:select(db_index)
        if not selected then
            return nil, select_err
        end
    end

    return ok
end
-- Hand a connection back to the keepalive pool.
-- Passes a max idle time of 60000 (60 seconds, assuming milliseconds) and a
-- pool size of 1000 to redis:set_keepalive and returns whatever it returns.
function _M.set_keepalive_mod(redis)
    local max_idle_time = 60000
    local pool_size = 1000
    return redis:set_keepalive(max_idle_time, pool_size)
end
-- Put the client into pipeline mode by giving it an empty request queue.
-- While self._reqs is set, the generated command methods append to this
-- queue instead of talking to the server.
function _M.init_pipeline(self)
    local queue = {}
    self._reqs = queue
end
-- Send every command queued since init_pipeline over one connection and
-- return the replies.
--
-- Returns:
--   results           array of replies; each reply that is null (nil,
--                     ngx.null, or a table of only ngx.null) is replaced by
--                     nil, so the array can contain holes. Do not rely on
--                     #results or ipairs to walk it.
--   {}, "no pipeline" when nothing was queued
--   nil, err          when the redis object cannot be created
--   {}, err           when connecting or committing fails
function _M.commit_pipeline(self)
    local reqs = self._reqs

    -- Always leave pipeline mode, even when nothing was queued. Otherwise
    -- an init_pipeline followed by an empty commit would leave the client
    -- queuing every later command without ever sending it.
    self._reqs = nil

    if reqs == nil or #reqs == 0 then
        return {}, "no pipeline"
    end

    local redis, new_err = redis_c:new()
    if not redis then
        return nil, new_err
    end

    local ok, conn_err = self:connect_mod(redis)
    if not ok then
        return {}, conn_err
    end

    redis:init_pipeline()
    for _, vals in ipairs(reqs) do
        -- Each queued entry is { cmd, arg1, arg2, ... }.
        local fun = redis[vals[1]]
        fun(redis, unpack(vals, 2))
    end

    local results, commit_err = redis:commit_pipeline()
    if not results or commit_err then
        return {}, commit_err
    end

    -- Every reply was null: report an empty result set.
    if is_redis_null(results) then
        results = {}
        ngx.log(ngx.WARN, "is null")
    end

    self.set_keepalive_mod(redis)

    -- Map individual null replies to nil.
    for i, value in ipairs(results) do
        if is_redis_null(value) then
            results[i] = nil
        end
    end

    return results
end
-- Subscribe to `channel`, wait for a single reply, then unsubscribe.
--
-- Opens its own connection, issues subscribe, reads exactly one reply with
-- read_reply, issues unsubscribe and passes the connection to
-- set_keepalive_mod.
--
-- Returns the reply from read_reply (plus its second return value), or
-- nil and an error message if any step before that fails.
-- NOTE(review): when read_reply fails (for example on timeout) the function
-- returns without unsubscribing or pooling the connection - confirm this is
-- intended.
function _M.subscribe(self, channel)
    local red, new_err = redis_c:new()
    if not red then
        return nil, new_err
    end

    local connected, conn_err = self:connect_mod(red)
    if conn_err or not connected then
        return nil, conn_err
    end

    -- Only success or failure of the subscribe call itself matters here.
    local subscribed, sub_err = red:subscribe(channel)
    if not subscribed then
        return nil, sub_err
    end

    local message, read_err = red:read_reply()
    if not message then
        return nil, read_err
    end

    red:unsubscribe(channel)
    self.set_keepalive_mod(red)

    return message, read_err
end
-- Run (or queue) a single Redis command for a client object.
--
-- In pipeline mode (self._reqs is set) the command and its arguments are
-- appended to the queue and nothing is returned.
--
-- Otherwise a new resty.redis object is created and connected through
-- self:connect_mod, the command is executed, and the connection is passed
-- to set_keepalive_mod.
--
-- Returns the reply, with null replies (see is_redis_null) mapped to nil,
-- or nil and an error message on failure. After a failed command the
-- connection is not passed to set_keepalive_mod.
local function do_command(self, cmd, ...)
    local pending = self._reqs
    if pending then
        pending[#pending + 1] = { cmd, ... }
        return
    end

    local red, new_err = redis_c:new()
    if not red then
        return nil, new_err
    end

    local connected, conn_err = self:connect_mod(red)
    if conn_err or not connected then
        return nil, conn_err
    end

    local reply, cmd_err = red[cmd](red, ...)
    if cmd_err or not reply then
        return nil, cmd_err
    end

    if is_redis_null(reply) then
        reply = nil
    end

    self.set_keepalive_mod(red)

    return reply, cmd_err
end
-- Generate one wrapper method per entry of `commands` on the module table.
-- This runs once at load time; the wrappers are identical for every client,
-- so there is no reason to rebuild all of them on each _M.new call.
for i = 1, #commands do
    local cmd = commands[i]
    _M[cmd] = function(self, ...)
        return do_command(self, cmd, ...)
    end
end

-- Create a client object.
--
-- opts (optional table):
--   timeout  : multiplied by 1000 and stored as `timeout` (presumably
--              seconds converted to milliseconds); defaults to 1000
--   db_index : stored as `db_index`; defaults to 0
--
-- Returns a table whose metatable resolves methods on the module table.
function _M.new(self, opts)
    opts = opts or {}

    local timeout = (opts.timeout and opts.timeout * 1000) or 1000
    local db_index = opts.db_index or 0

    return setmetatable({
        timeout  = timeout,
        db_index = db_index,
        _reqs    = nil,  -- nil means "not in pipeline mode"
    }, mt)
end
-- Export the module table.
return _M
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
这个封装只能在不区分数据库的情况下使用。如果业务逻辑上需要区分数据库:先调用 redis_db:select(db_index),再调用 redis_db:command,由于每次调用都会各自从连接池取一个连接,很有可能直接操作了别的数据库。