Created
October 24, 2012 17:39
-
-
Save jeffkistler/3947556 to your computer and use it in GitHub Desktop.
Mock Redis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import inspect | |
from redis.exceptions import ResponseError | |
class MockRedis(object):
    """In-memory stand-in for a small subset of the ``redis`` client API.

    Every entry in ``database`` is a ``(value, inserted_at, ttl)`` tuple, where
    ``inserted_at`` is a ``time.time()`` stamp and ``ttl`` is a lifetime in
    seconds, or ``None`` for "never expires".  Expired keys are removed lazily,
    the next time they are looked up.  A stored value of ``None`` is
    indistinguishable from a missing key.
    """

    _WRONG_TYPE = (
        "WRONGTYPE Operation against a key holding the wrong kind of value"
    )

    # Types ``get`` accepts as plain (non-list) values.  ``basestring`` only
    # exists on Python 2; fall back to ``str``/``bytes`` on Python 3.
    try:
        _SCALAR_TYPES = (int, float, basestring)  # noqa: F821
    except NameError:
        _SCALAR_TYPES = (int, float, str, bytes)

    def __init__(self, database=None):
        """Create a mock client.

        ``database`` is an optional dict to use as backing storage.  It is
        used as-is (even when empty), so callers can share or inspect it.
        """
        # ``database or {}`` would silently replace a caller-supplied empty
        # dict with a private one, breaking sharing.
        self.database = {} if database is None else database

    # -- internal helpers --------------------------------------------------

    def _set(self, key, value, ttl=None):
        """Store ``value`` under ``key``, stamped with the current time."""
        self.database[key] = (value, time.time(), ttl)

    def _get(self, key):
        """Return the live value for ``key``, or ``None`` if absent/expired."""
        try:
            value, inserted, ttl = self.database[key]
        except KeyError:
            return None
        # ``ttl is not None`` (rather than truthiness) so that a TTL of 0
        # means "expire immediately" instead of "never expire".
        if ttl is not None and time.time() - inserted >= ttl:
            del self.database[key]
            return None
        return value

    def _assert_list(self, key):
        """Return the list stored at ``key`` (or ``None`` if missing).

        Raises ``ResponseError`` when the key holds a non-list value.
        """
        value = self._get(key)
        if value is not None and not isinstance(value, list):
            raise ResponseError(self._WRONG_TYPE)
        return value

    def _update(self, key, value):
        """Replace the value of an existing key, keeping its timestamp/TTL."""
        if key in self.database:
            _, inserted, ttl = self.database[key]
            self.database[key] = (value, inserted, ttl)

    def _push(self, key, values, left):
        """Add ``values`` to the list at ``key``; return the new length.

        With ``left`` true each value is pushed onto the head in turn (so the
        last value given ends up first); otherwise values are appended to the
        tail in the order given.  The list is created if the key is missing.
        """
        items = self._assert_list(key)
        if not values:
            # Nothing to push: do not create an empty list key.
            return len(items) if items is not None else 0
        if items is None:
            items = []
            self._set(key, items)
        # ``items`` is the very object held in the database, so in-place
        # mutation is enough to persist the change.
        if left:
            items[:0] = reversed(values)
        else:
            items.extend(values)
        return len(items)

    def _pop(self, key, index):
        """Pop and return the element at ``index`` of the list at ``key``.

        Returns ``None`` when the key is missing or the list is empty.  The
        key is removed once its last element has been popped.
        """
        items = self._assert_list(key)
        if not items:
            return None
        popped = items.pop(index)
        if not items:
            del self.database[key]
        return popped

    # -- string commands ---------------------------------------------------

    def get(self, key):
        """Return the scalar stored at ``key`` or ``None`` if missing.

        Raises ``ResponseError`` when the key holds a non-scalar value.
        """
        value = self._get(key)
        if value is not None and not isinstance(value, self._SCALAR_TYPES):
            raise ResponseError(self._WRONG_TYPE)
        return value

    def mget(self, keys):
        """Return a list with the ``get`` result for each key in ``keys``."""
        return [self.get(key) for key in keys]

    def set(self, key, value):
        """Store ``value`` at ``key`` without expiry; always returns True."""
        self._set(key, value)
        return True

    def setx(self, key, value, ttl=None):
        """Store ``value`` at ``key`` expiring after ``ttl`` seconds."""
        self._set(key, value, ttl)
        return True

    def mset(self, mapping):
        """Store every key/value pair of ``mapping``; returns True."""
        # ``items()`` works on both Python 2 and 3 (``iteritems`` does not).
        for key, value in mapping.items():
            self._set(key, value)
        return True

    # -- key commands ------------------------------------------------------

    def delete(self, key):
        """Remove ``key``; return True if it existed, False otherwise."""
        if not self.exists(key):
            return False
        del self.database[key]
        return True

    def exists(self, key):
        """Return True when ``key`` holds a live (non-expired) value."""
        return self._get(key) is not None

    def expire(self, key, ttl=None):
        """Restart the lifetime of ``key`` with a TTL of ``ttl`` seconds.

        Returns True when the key exists, False otherwise.  Passing
        ``ttl=None`` removes any expiry.
        """
        value = self._get(key)
        if value is None:
            return False
        # Re-storing resets the insertion stamp so the TTL counts from now.
        self._set(key, value, ttl)
        return True

    # -- list commands -----------------------------------------------------

    def llen(self, key):
        """Return the length of the list at ``key`` (0 if missing)."""
        items = self._assert_list(key)
        return len(items) if items is not None else 0

    def lpush(self, key, *values):
        """Push ``values`` onto the head of the list; return its new length."""
        return self._push(key, values, left=True)

    def rpush(self, key, *values):
        """Append ``values`` to the tail of the list; return its new length."""
        return self._push(key, values, left=False)

    def lpop(self, key):
        """Remove and return the first element, or ``None`` if there is none."""
        return self._pop(key, 0)

    def rpop(self, key):
        """Remove and return the last element, or ``None`` if there is none."""
        return self._pop(key, -1)

    def lpushx(self, key, value):
        """Like ``lpush`` but only if ``key`` already exists.

        Returns the new list length, or 0 when nothing was pushed.
        """
        if not self.exists(key):
            return 0
        return self.lpush(key, value)

    def rpushx(self, key, value):
        """Like ``rpush`` but only if ``key`` already exists.

        Returns the new list length, or 0 when nothing was pushed.
        """
        if not self.exists(key):
            return 0
        return self.rpush(key, value)

    def rpoplpush(self, key, other_key):
        """Move the tail of ``key`` to the head of ``other_key``.

        Returns the moved element, or ``None`` (touching nothing) when the
        source list is missing or empty.
        """
        # Validate the destination first so a wrong-typed target raises
        # before the source element has been removed.
        self._assert_list(other_key)
        moved = self.rpop(key)
        if moved is None:
            return None
        self.lpush(other_key, moved)
        return moved

    # -- pipelines ---------------------------------------------------------

    def pipeline(self):
        """Return a ``MockPipeline`` that queues commands for this client."""
        return MockPipeline(self)
class MockPipeline(object):
    """Queue commands for a client and run them together on ``execute``.

    Any attribute that exists on the wrapped client can be called on the
    pipeline; the call is recorded (not run) and the pipeline itself is
    returned so calls can be chained.
    """

    def __init__(self, client):
        """Wrap ``client``; commands are dispatched to it by name."""
        self.client = client
        self.command_queue = []

    def __getattr__(self, name):
        """Return a recorder for the client command called ``name``.

        Raises ``AttributeError`` when the client has no such attribute.
        """
        # __getattr__ only runs when normal lookup fails.  If one of our own
        # attributes is missing (e.g. the instance was created without
        # __init__), touching ``self.client`` below would recurse forever.
        if name in ("client", "command_queue"):
            raise AttributeError(name)
        if not hasattr(self.client, name):
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name)
            )

        def wrap(*args, **kwargs):
            self.command_queue.append((name, args, kwargs))
            return self

        return wrap

    def execute(self):
        """Run all queued commands in order and return their results.

        The queue is emptied first, so a later ``execute`` does not replay
        the same commands, even if one of them raises.
        """
        commands, self.command_queue = self.command_queue, []
        return [
            getattr(self.client, name)(*args, **kwargs)
            for name, args, kwargs in commands
        ]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment