Created
October 25, 2015 15:34
-
-
Save BertrandBordage/7f33b3ccf2686e8da9ed to your computer and use it in GitHub Desktop.
Django LocMem cache backend without pickle
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"Thread-safe in-memory cache backend." | |
import time | |
from contextlib import contextmanager | |
from django.core.cache.backends.base import DEFAULT_TIMEOUT, BaseCache | |
from django.utils.synch import RWLock | |
# Global in-memory store of cache data. Keyed by name, to provide | |
# multiple named local memory caches. | |
_caches = {} | |
_expire_info = {} | |
_locks = {} | |
@contextmanager
def dummy():
    """A context manager that does nothing special.

    Used in place of a real lock when the caller already holds one, so the
    same ``with`` statement works whether or not locking is requested.
    """
    yield
class LocMemCache(BaseCache):
    """Thread-safe in-memory cache backend that stores values without pickling.

    Values are kept in the module-level dictionaries by reference: the object
    returned by ``get`` is the very object that was passed to ``set``, so
    mutating it also mutates the cached value.
    """

    # Private marker for "no live entry", so that a cached ``None`` can be
    # told apart from a missing or expired key.
    _NOT_FOUND = object()

    def __init__(self, name, params):
        """Bind this instance to the shared storage registered under ``name``."""
        BaseCache.__init__(self, params)
        self._cache = _caches.setdefault(name, {})
        self._expire_info = _expire_info.setdefault(name, {})
        self._lock = _locks.setdefault(name, RWLock())

    def add(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """Store ``value`` only if ``key`` has no live entry.

        Returns True when the value was stored, False when an unexpired
        entry already exists.
        """
        key = self.make_key(key, version=version)
        self.validate_key(key)
        with self._lock.writer():
            if self._has_expired(key):
                self._set(key, value, timeout)
                return True
            return False

    def get(self, key, default=None, version=None, acquire_lock=True):
        """Return the cached value for ``key``, or ``default`` if none is live.

        A cached ``None`` is returned as ``None``; it is not treated as a miss.
        Pass ``acquire_lock=False`` only when the caller already holds the
        writer lock (as ``incr`` does).
        """
        key = self.make_key(key, version=version)
        self.validate_key(key)
        value = self._NOT_FOUND
        with (self._lock.reader() if acquire_lock else dummy()):
            if not self._has_expired(key):
                value = self._cache[key]
        if value is not self._NOT_FOUND:
            return value

        with (self._lock.writer() if acquire_lock else dummy()):
            # Re-check: the reader lock was released above, so another
            # thread may have stored a fresh value in the meantime.
            if self._has_expired(key):
                self._delete(key)
        return default

    def _set(self, key, value, timeout=DEFAULT_TIMEOUT):
        """Store ``value`` under an already-built key; caller holds the writer lock."""
        if len(self._cache) >= self._max_entries:
            self._cull()
        self._cache[key] = value
        self._expire_info[key] = self.get_backend_timeout(timeout)

    def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """Store ``value`` under ``key``, replacing any existing entry."""
        key = self.make_key(key, version=version)
        self.validate_key(key)
        with self._lock.writer():
            self._set(key, value, timeout)

    def incr(self, key, delta=1, version=None):
        """Add ``delta`` to the cached value and return the new value.

        The expiry time of the entry is left untouched.

        Raises ValueError if there is no live entry for ``key`` (or if the
        cached value is ``None``).
        """
        with self._lock.writer():
            # The writer lock is already held, so ``get`` must not lock again.
            value = self.get(key, version=version, acquire_lock=False)
            if value is None:
                raise ValueError("Key '%s' not found" % key)
            new_value = value + delta
            key = self.make_key(key, version=version)
            self._cache[key] = new_value
        return new_value

    def has_key(self, key, version=None):
        """Return True if ``key`` has a live entry; evict it if it has expired."""
        key = self.make_key(key, version=version)
        self.validate_key(key)
        with self._lock.reader():
            if not self._has_expired(key):
                return True

        with self._lock.writer():
            # Re-check under the writer lock so that a value stored by
            # another thread after the check above is not evicted.
            if self._has_expired(key):
                self._delete(key)
        return False

    def _has_expired(self, key):
        """Return True if ``key`` is unknown or its expiry time has passed.

        An expiry of ``None`` never expires; a key with no expiry record
        counts as expired.
        """
        exp = self._expire_info.get(key, -1)
        return not (exp is None or exp > time.time())

    def _cull(self):
        """Make room: drop every ``_cull_frequency``-th key, or all keys if it is 0."""
        if self._cull_frequency == 0:
            self.clear()
        else:
            # Collect the keys first; the dict cannot change size while
            # it is being iterated.
            doomed = [k for (i, k) in enumerate(self._cache)
                      if i % self._cull_frequency == 0]
            for k in doomed:
                self._delete(k)

    def _delete(self, key):
        """Remove ``key`` from both dictionaries, ignoring whichever is missing."""
        try:
            del self._cache[key]
        except KeyError:
            pass
        try:
            del self._expire_info[key]
        except KeyError:
            pass

    def delete(self, key, version=None):
        """Remove ``key`` from the cache; a missing key is not an error."""
        key = self.make_key(key, version=version)
        self.validate_key(key)
        with self._lock.writer():
            self._delete(key)

    def clear(self):
        """Remove every entry from this named cache."""
        # NOTE(review): deliberately not wrapped in the writer lock here --
        # ``_cull`` calls this from ``_set``, which already runs under the
        # writer lock. Confirm the lock is re-entrant before adding locking.
        self._cache.clear()
        self._expire_info.clear()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi, I would like to use it in my project. What is the license? Thanks :)