openresty 因其非阻塞的调用,令服务器拥有高性能高并发,当涉及到数据库操作时,更应该选择读写速度快的 redis 进行数据处理,避免因为读写数据而造成瓶颈。
openresty 默认就带了redis的库,这里先梳理下其自带redis连接库的操作流程,再根据存在问题进行二次封装。
自带redis连接库的操作流程
首先是连接redis
-- Create a client object from the redis library bundled with OpenResty.
local redis = require "resty.redis"
local redisCache = redis:new()

-- Set the timeout (in milliseconds) before connecting so that it also
-- bounds the connect call itself, not only the later commands.
redisCache:set_timeout(1000)

-- The host must be a string; a bare 127.0.0.1 is not valid Lua.
local ok, err = redisCache:connect("127.0.0.1", 6379)
if not ok then
    ngx.log(ngx.ERR, "failed to connect: ", err)
    return
end
简单的redis操作
-- Each call returns the reply first and an error message second.

-- Fetch the value currently stored under the key "code".
local code, err = redisCache:get("code")

-- Increment the value stored under "code".
local res, err = redisCache:incr("code")

-- Overwrite "code" with the value 1.
local ok, err = redisCache:set("code", 1)
然后操作后要进行close
-- Close the connection once all operations are done; the outcome is stored in ok / err.
ok , err = redisCache:close()
如果项目中有大量redis操作,使用这个标准库就会有大量代码在重复“创建连接-->数据操作-->关闭连接(或放到连接池)”这一流程,甚至还要考虑不同的 return 情况做不同处理。
redis 库的二次封装
针对以上问题,也通过学习openresty的最佳实践,直接用上其中已经做好的二次封装。这个封装是要实现以下目的:
- new、connect 函数合体,使用时只负责申请,尽量少关心什么时候具体连接、释放;
- 默认 redis 数据库连接地址,但是允许自定义;
- 每次 redis 使用完毕,自动释放 redis 连接到连接池供其他请求复用;
- 要支持 redis 的重要优化手段 pipeline;
我只理解了前三点,最后的pipeline也还不是很懂,先贴上代码:
--- Thin convenience wrapper around resty.redis.
--
-- Every command opens a connection, runs, and hands the connection back to
-- the keepalive pool, so callers only create a wrapper object with new() and
-- call redis commands on it. Commands issued between init_pipeline() and
-- commit_pipeline() are queued and sent in a single pipeline.
--
-- NOTE(review): the `db_index` option is stored on the object but nothing in
-- this module ever issues SELECT with it, so it currently has no effect.

local redis_c = require "resty.redis"

-- table.new may be unavailable; fall back to a plain table constructor.
local ok, new_tab = pcall(require, "table.new")
if not ok or type(new_tab) ~= "function" then
    new_tab = function (narr, nrec) return {} end
end

-- `unpack` is a global in Lua 5.1 / LuaJIT and lives in `table` afterwards.
local unpack = unpack or table.unpack

local _M = new_tab(0, 155)
_M._VERSION = '0.01'

-- Commands exposed as methods on the wrapper. The subscribe family is left
-- out on purpose: subscribing is handled by _M.subscribe below.
local commands = {
    "append",            "auth",              "bgrewriteaof",
    "bgsave",            "bitcount",          "bitop",
    "blpop",             "brpop",             "brpoplpush",
    "client",            "config",            "dbsize",
    "debug",             "decr",              "decrby",
    "del",               "discard",           "dump",
    "echo",              "eval",              "exec",
    "exists",            "expire",            "expireat",
    "flushall",          "flushdb",           "get",
    "getbit",            "getrange",          "getset",
    "hdel",              "hexists",           "hget",
    "hgetall",           "hincrby",           "hincrbyfloat",
    "hkeys",             "hlen",              "hmget",
    "hmset",             "hscan",             "hset",
    "hsetnx",            "hvals",             "incr",
    "incrby",            "incrbyfloat",       "info",
    "keys",              "lastsave",          "lindex",
    "linsert",           "llen",              "lpop",
    "lpush",             "lpushx",            "lrange",
    "lrem",              "lset",              "ltrim",
    "mget",              "migrate",           "monitor",
    "move",              "mset",              "msetnx",
    "multi",             "object",            "persist",
    "pexpire",           "pexpireat",         "ping",
    "psetex",            "psubscribe",        "pttl",
    "publish",      --[[ "punsubscribe", ]]   "pubsub",
    "quit",              "randomkey",         "rename",
    "renamenx",          "restore",           "rpop",
    "rpoplpush",         "rpush",             "rpushx",
    "sadd",              "save",              "scan",
    "scard",             "script",            "sdiff",
    "sdiffstore",        "select",            "set",
    "setbit",            "setex",             "setnx",
    "setrange",          "shutdown",          "sinter",
    "sinterstore",       "sismember",         "slaveof",
    "slowlog",           "smembers",          "smove",
    "sort",              "spop",              "srandmember",
    "srem",              "sscan",             "strlen",
    --[[ "subscribe", ]] "sunion",            "sunionstore",
    "sync",              "time",              "ttl",
    "type",         --[[ "unsubscribe", ]]    "unwatch",
    "watch",             "zadd",              "zcard",
    "zcount",            "zincrby",           "zinterstore",
    "zrange",            "zrangebyscore",     "zrank",
    "zrem",              "zremrangebyrank",   "zremrangebyscore",
    "zrevrange",         "zrevrangebyscore",  "zrevrank",
    "zscan",             "zscore",            "zunionstore",
    "evalsha"
}

local mt = { __index = _M }

--- Tell whether a reply carries no data.
-- @param res a reply value, or a table of reply values
-- @return true for nil, ngx.null, or a table whose values are all ngx.null
--         (an empty table therefore counts as null); false otherwise
local function is_redis_null(res)
    if type(res) == "table" then
        for _, v in pairs(res) do
            if v ~= ngx.null then
                return false
            end
        end
        return true
    end
    return res == nil or res == ngx.null
end

--- Apply the configured timeout and connect to the configured address.
-- change connect address as you need
-- @param redis a resty.redis object
-- @return whatever redis:connect returns (ok, err)
function _M.connect_mod(self, redis)
    redis:set_timeout(self.timeout)
    return redis:connect(self.db_host, self.db_port)
end

--- Hand a connection back to the keepalive pool.
-- Defined and called with `.`: it takes only the resty.redis object.
-- @param redis a resty.redis object
-- @return whatever redis:set_keepalive returns
function _M.set_keepalive_mod(redis)
    -- 60 seconds max idle time, connection pool of size 1000
    return redis:set_keepalive(60000, 1000)
end

--- Switch the wrapper into pipeline mode: subsequent commands are queued.
function _M.init_pipeline(self)
    self._reqs = {}
end

--- Send every queued command in one pipeline and leave pipeline mode.
-- @return the table of results, where replies that are null are replaced by
--         nil (so the table may contain holes and `#` is not reliable on it),
--         plus an error message when something failed
function _M.commit_pipeline(self)
    local reqs = self._reqs
    -- Always leave pipeline mode. Returning early without clearing an empty
    -- queue would keep the object queueing (and never running) later commands.
    self._reqs = nil
    if reqs == nil or #reqs == 0 then
        return {}, "no pipeline"
    end

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local connected
    connected, err = self:connect_mod(redis)
    if not connected then
        return {}, err
    end

    redis:init_pipeline()
    for _, vals in ipairs(reqs) do
        -- vals[1] is the command name, the rest are its arguments. Use the
        -- recorded count so nil arguments do not truncate the list, and do
        -- not mutate the queued entry.
        local fun = redis[vals[1]]
        fun(redis, unpack(vals, 2, vals.n or #vals))
    end

    local results
    results, err = redis:commit_pipeline()
    if not results or err then
        return {}, err
    end

    if is_redis_null(results) then
        results = {}
        ngx.log(ngx.WARN, "is null")
    end

    self.set_keepalive_mod(redis)

    -- Take the length before any slot is cleared.
    local count = #results
    for i = 1, count do
        if is_redis_null(results[i]) then
            results[i] = nil
        end
    end

    return results, err
end

--- Subscribe to a channel.
-- @param channel channel name
-- @return a reader function, or nil plus an error message. Calling the reader
--         with no argument (or true) reads the next reply; calling it with
--         any other value unsubscribes and releases the connection.
function _M.subscribe(self, channel)
    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local connected
    connected, err = self:connect_mod(redis)
    if not connected or err then
        return nil, err
    end

    local res
    res, err = redis:subscribe(channel)
    if not res then
        return nil, err
    end

    local function do_read_func(do_read)
        if do_read == nil or do_read == true then
            local reply, read_err = redis:read_reply()
            if not reply then
                return nil, read_err
            end
            return reply
        end

        redis:unsubscribe(channel)
        self.set_keepalive_mod(redis)
        return
    end

    return do_read_func
end

--- Run one command, or queue it when the wrapper is in pipeline mode.
-- @param cmd command name (a method name of resty.redis)
-- @param ... command arguments
-- @return the reply (nil when the reply is null) and an error message;
--         nothing at all when the command was only queued
local function do_command(self, cmd, ...)
    local queue = self._reqs
    if queue then
        -- Record the total slot count so nil arguments survive the replay.
        queue[#queue + 1] = { n = select("#", ...) + 1, cmd, ... }
        return
    end

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local connected
    connected, err = self:connect_mod(redis)
    if not connected or err then
        return nil, err
    end

    local fun = redis[cmd]
    local result
    result, err = fun(redis, ...)
    if not result or err then
        -- ngx.log(ngx.ERR, "pipeline result:", result, " err:", err)
        return nil, err
    end

    if is_redis_null(result) then
        result = nil
    end

    self.set_keepalive_mod(redis)

    return result, err
end

-- Register the command methods once at load time instead of on every new().
for i = 1, #commands do
    local cmd = commands[i]
    _M[cmd] = function (self, ...)
        return do_command(self, cmd, ...)
    end
end

--- Create a wrapper object.
-- @param opts optional table with the fields:
--        timeout  - seconds, converted to milliseconds here (default 1000 ms)
--        db_index - default 0 (stored only, see the note at the top)
--        host     - default '127.0.0.1'
--        port     - default 6379
-- @return the new wrapper object
function _M.new(self, opts)
    opts = opts or {}
    local timeout = (opts.timeout and opts.timeout * 1000) or 1000
    local db_index = opts.db_index or 0
    local db_host = opts.host or '127.0.0.1'
    local db_port = opts.port or 6379

    return setmetatable({
        timeout = timeout,
        db_index = db_index,
        db_host = db_host,
        db_port = db_port,
        _reqs = nil,
    }, mt)
end

return _M
这个二次封装除了解决上面几个问题,其实还重写了两个方法,不过订阅和发布暂时没用到,所以还没去细看这一部分。
使用示例
-- Connection settings handed to the wrapper's new().
local config = {}
config.redisConfig = {
    -- Seconds: new() multiplies this value by 1000 to get milliseconds.
    timeout = 1,
    host = '127.0.0.1',
    port = 6379,
}

-- Load the wrapper module; the path prefix comes from the nginx variable SERVER_DIR.
local redis = require (ngx.var.SERVER_DIR .. ".common.myRedis")
local redisCache = redis:new(config.redisConfig)

-- No explicit connect or close is written here, only the command itself.
local accessDetails, err = redisCache:get("code")
甚至可以直接
-- No options given: the wrapper's new() falls back to its defaults (127.0.0.1:6379, 1000 ms timeout).
local redisCache = redis:new()