Last active
September 19, 2021 21:48
-
-
Save leafsummer/afc42b7cfabb761d2dd1 to your computer and use it in GitHub Desktop.
custom redis tools
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import redis | |
from ncelery import conf | |
class RedisClient(object):
    """Per-configuration singleton that proxies a ``redis.StrictRedis`` client.

    One instance is cached for every distinct set of constructor arguments
    (keyed on their string form), so asking for the same configuration twice
    returns the same object.  Attributes the proxy does not define itself are
    looked up on the wrapped ``StrictRedis`` client.

    Singleton pattern background:
    http://stackoverflow.com/questions/42558/python-and-the-singleton-pattern

    Args:
        server_conf: keyword arguments for ``redis.ConnectionPool``.  When
            omitted, ``{'host': 'localhost', 'port': 6379}`` is used.
    """

    # Cache of created instances, keyed by the string form of the
    # constructor arguments.
    _instance = {}

    def __new__(cls, *args, **kwargs):
        """Return the cached instance for these arguments, creating it once."""
        # Positional-only calls keep the original ``str(args)`` key.  Keyword
        # arguments are folded in (sorted so their order is irrelevant) so
        # that ``RedisClient(server_conf=...)`` no longer raises TypeError.
        if kwargs:
            key = str((args, sorted(kwargs.items())))
        else:
            key = str(args)
        if key not in cls._instance:
            cls._instance[key] = super(RedisClient, cls).__new__(cls)
        return cls._instance[key]

    def __init__(self, server_conf=None):
        # Python calls __init__ on every ``RedisClient(...)`` call, including
        # the ones answered from the cache.  Skip re-initialisation so the
        # existing connection pool is reused instead of being replaced.
        if hasattr(self, 'rcli'):
            return
        if server_conf is None:
            server_conf = {'host': 'localhost', 'port': 6379}
        self.pool = redis.ConnectionPool(**server_conf)
        self.rcli = redis.StrictRedis(connection_pool=self.pool)

    @property
    def __dict__(self):
        """Expose the wrapped client's ``__dict__``."""
        try:
            return self.rcli.__dict__
        except (RuntimeError, AttributeError):
            raise AttributeError('__dict__')

    def __repr__(self):
        return repr(self.rcli)

    def __dir__(self):
        """List the wrapped client's attributes; empty if it is unavailable."""
        try:
            return dir(self.rcli)
        except (RuntimeError, AttributeError):
            return []

    def __getattr__(self, name):
        """Forward attribute lookups the proxy cannot satisfy to the client."""
        # __getattr__ only runs after normal lookup has failed, so being asked
        # for one of our own attributes means __init__ has not completed.
        # Delegating to ``self.rcli`` here would recurse without end.
        if name in ('rcli', 'pool'):
            raise AttributeError(name)
        if name == '__members__':
            return dir(self.rcli)
        return getattr(self.rcli, name)
# Shared module-level client; the helper classes in this module issue all of
# their Redis commands through it.
rc = RedisClient(
    {'host': 'instance.redis.com', 'port': 6379, 'db': 0, 'password': None}
)
class RedisQueue(object):
    """FIFO queue stored in a Redis list, accessed through the module's ``rc``.

    Items are appended on the right (``RPUSH``) and taken from the left
    (``LPOP`` / ``BLPOP``).

    Args:
        name: key of the Redis list backing the queue.
        serializer:
            the class or module to serialize msgs with, must have
            methods or functions named ``dumps`` and ``loads``,
            such as ``pickle`` or ``json``.
            NOTE(review): ``pickle.loads`` executes arbitrary code; only use
            it when everything that can write to this Redis key is trusted.
        llen: optional maximum length.  When set, ``put`` discards the oldest
            items so the list holds at most ``llen`` items after the push.
    """

    def __init__(self, name, serializer=None, llen=None):
        self.name = name
        self.serializer = serializer
        self.llen = llen

    def __len__(self):
        return rc.llen(self.name)

    def __iter__(self):
        # Yields the stored values as returned by Redis; unlike ``getlist``
        # the serializer is not applied here.
        for i in rc.lrange(self.name, 0, -1):
            yield i

    def __repr__(self):
        return 'redis.queue: <%s>' % self.name

    def getlist(self, start=None, end=None):
        """Return the items between ``start`` and ``end`` (both inclusive).

        ``start`` defaults to the head of the list and ``end`` to its tail.
        This is best-effort: if Redis or the serializer raises, whatever was
        decoded up to that point (possibly nothing) is returned.
        """
        msg_list = []
        if start is None:
            start = 0
        if end is None:
            # -1 addresses the last element, so no LLEN round trip is needed.
            end = -1
        try:
            raw_items = rc.lrange(self.name, start, end)
            if self.serializer is None:
                msg_list.extend(raw_items)
            else:
                for raw in raw_items:
                    msg_list.append(self.serializer.loads(raw))
        except Exception:
            # Deliberately swallowed to keep the best-effort contract, but
            # KeyboardInterrupt / SystemExit are no longer caught.
            pass
        return msg_list

    def put(self, msgs):
        """Append ``msgs`` to the queue and return the resulting list length.

        When ``llen`` is set, the list is first cut down to its newest
        ``llen - 1`` items so that it holds at most ``llen`` after the push.
        """
        if self.serializer is not None:
            msgs = self.serializer.dumps(msgs)
        if self.llen is not None:
            keep = self.llen - 1
            if keep > 0:
                # One LTRIM replaces the LLEN + repeated LPOP sequence; it is
                # a no-op when the list is already short enough.
                rc.ltrim(self.name, -keep, -1)
            else:
                # Nothing may be kept (llen <= 1): drop the whole list.
                rc.delete(self.name)
        return rc.rpush(self.name, msgs)

    def pop(self, block=False, timeout=0):
        """Remove and return the oldest item, or ``None`` if there is none.

        With ``block=True`` this uses ``BLPOP`` with the given ``timeout``.
        """
        if block:
            msg = rc.blpop(self.name, timeout)
            if msg is not None:
                # BLPOP answers with a (key, value) pair; keep the value.
                msg = msg[1]
        else:
            msg = rc.lpop(self.name)
        if self.serializer is not None and msg is not None:
            return self.serializer.loads(msg)
        return msg

    def consume(self, block=True, timeout=0):
        """Yield popped items forever; stops quietly on ``KeyboardInterrupt``."""
        while True:
            try:
                msg = self.pop(block, timeout)
                yield msg
            except KeyboardInterrupt:
                return

    def clear(self):
        """Delete the backing list."""
        # DELETE on a missing key is harmless, so no EXISTS check is needed.
        rc.delete(self.name)
class RedisString(object):
    """Helper for plain Redis string keys, accessed through the module's ``rc``.

    Args:
        serializer:
            the class or module to serialize msgs with, must have
            methods or functions named ``dumps`` and ``loads``,
            such as ``pickle`` or ``json``.
            NOTE(review): ``pickle.loads`` executes arbitrary code; only use
            it when everything that can write to these keys is trusted.
    """

    def __init__(self, serializer=None):
        self.serializer = serializer

    def __repr__(self):
        return 'redis.string'

    def _store(self, name, msgs, timeout, only_if_absent):
        """Write ``msgs`` under ``name``; return True if the value was stored."""
        if self.serializer is not None:
            msgs = self.serializer.dumps(msgs)
        ttl = None if timeout is None else int(timeout)
        if ttl is None or ttl > 0:
            # One SET carries both the TTL and the "only if absent" condition,
            # so the value can never be left behind without its expiry and
            # there is no window between an existence check and the write.
            return bool(rc.set(name, msgs, ex=ttl, nx=only_if_absent))
        # SET ... EX needs a positive TTL.  For zero or negative values keep
        # the previous two-step behaviour, where EXPIRE is issued separately.
        stored = bool(rc.set(name, msgs, nx=only_if_absent))
        if stored:
            rc.expire(name, ttl)
        return stored

    def set(self, name, msgs, timeout=None):
        """Store ``msgs`` under ``name``, expiring after ``timeout`` seconds.

        ``timeout`` is converted with ``int``; ``None`` means no expiry.
        """
        self._store(name, msgs, timeout, False)

    def add(self, name, msgs, timeout=None):
        """Store ``msgs`` only if ``name`` does not exist yet.

        Returns:
            True when the value was written, False when the key already
            existed (in which case nothing is changed).
        """
        return self._store(name, msgs, timeout, True)

    def get(self, name, block=True):
        """Return the value stored under ``name``, or ``None`` if it is absent.

        When a serializer is set and decoding fails, the value is returned as
        Redis delivered it.  ``block`` is accepted but not used.
        """
        msg = rc.get(name)
        if msg is None or self.serializer is None:
            return msg
        try:
            return self.serializer.loads(msg)
        except Exception:
            # Fall back to the undecoded value, but let KeyboardInterrupt and
            # SystemExit propagate.
            return msg

    def keys(self, pattern='*'):
        """Return the keys matching ``pattern``."""
        return rc.keys(pattern)

    def isexist(self, name):
        """Return True if ``name`` exists, else False."""
        return bool(rc.exists(name))

    def delete(self, name):
        """Delete ``name``."""
        # DELETE on a missing key is harmless, so no EXISTS check is needed.
        rc.delete(name)
class RedisSet(object):
    """Helper for one Redis set, accessed through the module's ``rc``.

    Args:
        name: key of the Redis set.
        serializer:
            the class or module to serialize msgs with, must have
            methods or functions named ``dumps`` and ``loads``,
            such as ``pickle`` or ``json``.
    """

    def __init__(self, name, serializer=None):
        self.serializer = serializer
        self.name = name

    def __len__(self):
        return rc.scard(self.name)

    def __iter__(self):
        # Members are yielded as Redis returns them; the serializer is not
        # applied here (``getlist`` is the decoding variant).
        for member in rc.smembers(self.name):
            yield member

    def __repr__(self):
        return 'redis.set:<%s>' % self.name

    def isexist(self, msg):
        """Return True if ``msg`` is a member of the set, else False."""
        if self.serializer is not None:
            msg = self.serializer.dumps(msg)
        return bool(rc.sismember(self.name, msg))

    def add(self, msg):
        """Add ``msg``; True if SADD reported a truthy result, else False."""
        if self.serializer is not None:
            msg = self.serializer.dumps(msg)
        return bool(rc.sadd(self.name, msg))

    def rem(self, msg):
        """Remove ``msg``; True if SREM reported a truthy result, else False."""
        if self.serializer is not None:
            msg = self.serializer.dumps(msg)
        return bool(rc.srem(self.name, msg))

    def getlist(self):
        """Return all members as a list, decoded when a serializer is set."""
        members = rc.smembers(self.name)
        if self.serializer is None:
            return list(members)
        return [self.serializer.loads(raw) for raw in members]
if __name__ == '__main__':
    # Demo: block on the traffic-event queue and report the size and type of
    # every message that arrives.
    rq = RedisQueue('client:log:nta:trafficevent')
    for m in rq.consume():
        # %-formatting prints the same text on Python 2 and Python 3, unlike
        # the Python-2-only ``print a, b`` statement.
        print('%s %s' % (len(m), type(m)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment