Created
September 13, 2010 22:15
-
-
Save ketralnis/578165 to your computer and use it in GitHub Desktop.
simplelrucache.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
A simple LRU (least recently used) cache.
"""
class NotFound(object):
    """Sentinel class meaning "the caller supplied no default value".

    The class object itself is the marker and is compared by identity, so
    ``None`` stays usable as an ordinary default.
    """
class LRUCacheEntry(object):
    """A single node of a doubly linked list carrying one key/value pair.

    ``__slots__`` restricts instances to the four attributes below, so no
    per-instance ``__dict__`` is allocated.

    Attributes:
        prev: the neighbouring entry on one side, or None.
        next: the neighbouring entry on the other side, or None.
        key: the key this entry is stored under.
        value: the value associated with ``key``.
    """

    __slots__ = ('prev', 'next', 'key', 'value')

    def __init__(self, next=None, prev=None, key=None, value=None):
        """Create an entry; every field defaults to None."""
        self.key, self.value = key, value
        self.prev, self.next = prev, next

    def __repr__(self):
        """Return ``<ClassName (key,value)>`` using the reprs of both fields."""
        cls_name = type(self).__name__
        return '<%s (%r,%r)>' % (cls_name, self.key, self.value)
class LRUCacheWalker(object):
    """Iterator over a linked list of entries, following ``.next`` links.

    Iteration starts at the entry given to the constructor and yields each
    entry object (not its key or value) until an entry whose ``next`` is
    None has been returned. Passing ``None`` as the start yields nothing.

    The iterator works under both the Python 2 protocol (``next``) and the
    Python 3 protocol (``__next__``).
    """

    def __init__(self, head):
        """Remember ``head`` as the first entry to be yielded."""
        self.head = head

    def __iter__(self):
        """Return self, so the walker can be used directly in a for loop."""
        return self

    def next(self):
        """Return the current entry and advance to its ``next`` neighbour.

        Raises:
            StopIteration: when there is no entry left to return.
        """
        current = self.head
        if current is None:
            raise StopIteration
        self.head = current.next
        return current

    # Python 3 looks up ``__next__`` rather than ``next``; without this alias
    # iterating over a walker raises TypeError there.
    __next__ = next
class LRUCache(object):
    """A size-bounded key/value store that evicts least recently used keys.

    Values live in ``LRUCacheEntry`` nodes. A dict (``table``) maps each key
    to its node, and the nodes are chained in a doubly linked list ordered
    from most recently used (``head``) to least recently used (``tail``).
    ``get`` and ``set`` move the affected entry to the head; when ``set``
    adds a new key and the cache exceeds ``size``, entries are removed from
    the tail end.

    Linked-list invariants maintained by ``_detach`` and ``_head``:
      * empty cache:      head is None, tail is None
      * exactly one item: head is that item, tail is None
      * two or more:      head and tail are both set and distinct
    """

    head = None   # most recently used LRUCacheEntry; its prev is None
    tail = None   # least recently used LRUCacheEntry; its next is None
                  # (stays None while the cache holds fewer than two items)
    table = None  # dict of key -> LRUCacheEntry used for lookups
    size = 0      # maximum number of entries kept after a set()

    def __init__(self, size):
        """Create an empty cache that keeps at most ``size`` entries."""
        self.size = size
        self.table = {}
        self.head = self.tail = None

    def get(self, key, default=NotFound):
        """Return the value for ``key`` and mark it most recently used.

        Args:
            key: the key to look up.
            default: returned when ``key`` is absent. If left as the
                ``NotFound`` sentinel, a missing key raises instead.

        Raises:
            KeyError: ``key`` is absent and no ``default`` was supplied.
        """
        try:
            entry = self.table[key]
        except KeyError:
            if default is NotFound:
                raise
            return default
        self._touch(entry)
        return entry.value

    def set(self, key, value):
        """Store ``value`` under ``key`` and mark it most recently used.

        Adding a new key may evict older entries so that the cache does not
        exceed ``size``; overwriting an existing key never evicts.
        """
        try:
            entry = self.table[key]
        except KeyError:
            # A brand-new item is by definition the most recently accessed,
            # so it goes straight to the head of the list.
            entry = LRUCacheEntry(value=value, key=key)
            self.table[key] = entry
            self._head(entry)
            self._cutoff()
        else:
            entry.value = value
            self._touch(entry)

    def delete(self, key):
        """Remove ``key`` from the cache; a missing key is silently ignored."""
        try:
            entry = self.table[key]
        except KeyError:
            return
        # Unlink while the key is still in the table: _detach relies on
        # len(self) matching the length of the linked list.
        self._detach(entry)
        del self.table[key]

    def _touch(self, obj):
        """Make an existing entry the most recently accessed one."""
        if self.head is obj:
            # already at the head, nothing to do
            return
        self._detach(obj)
        self._head(obj)

    def _cutoff(self):
        """Evict least recently used entries until len(self) <= size.

        A one-item cache keeps its only entry in ``head`` with ``tail`` set
        to None, so the eviction candidate falls back to ``head``. The loop
        also stops on an empty table so that a size below zero cannot make
        it dereference a missing entry.
        """
        while self.table and len(self) > self.size:
            victim = self.tail if self.tail is not None else self.head
            self.delete(victim.key)

    # _detach and _head do all of the low-level manipulation of the
    # linked-list pointers. Don't touch them outside of these functions
    # because they special-case the one- and two-item lists.

    def _detach(self, obj):
        """Unlink ``obj`` from the list, leaving its ``prev``/``next`` None.

        ``obj`` must currently be linked and its key must still be present
        in ``table``; the caller removes it from ``table`` afterwards (or
        re-links it with ``_head``).
        """
        # it has to already be in the list
        assert ((len(self) == 1 or not (obj.next is None and obj.prev is None))
                and obj.key in self.table)

        count = len(self)
        if count == 1:
            # it's the only item in the list; empty out the list
            self.head = self.tail = None
        elif count == 2:
            # removing one of two items leaves a one-item list, which is
            # represented as head=<item>, tail=None
            survivor = self.tail if self.head is obj else self.head
            survivor.next = survivor.prev = None
            self.head = survivor
            self.tail = None
        elif obj.next is None:
            # it's the tail: its predecessor becomes the new tail
            self.tail = obj.prev
            self.tail.next = None
        elif obj.prev is None:
            # it's the head: its successor becomes the new head
            self.head = obj.next
            self.head.prev = None
        else:
            # it's in the middle: link its neighbours together
            obj.prev.next = obj.next
            obj.next.prev = obj.prev

        # orphan it from the list (also breaks reference cycles for GC)
        obj.next = obj.prev = None

    def _head(self, obj):
        """Place a non-attached LRUCacheEntry at the head of the list.

        ``obj`` must have no neighbours and its key must already be in
        ``table``.
        """
        assert obj.next is None and obj.prev is None and obj.key in self.table

        old_head = self.head
        if old_head is None:
            # adding to an empty cache
            self.head = obj
            self.tail = None
            return

        if self.tail is None:
            # Adding to a one-item list: the old head becomes the tail.
            # obj is already in self.table but not yet in the linked list.
            assert len(self) == 2
            self.tail = old_head

        old_head.prev = obj
        obj.next = old_head
        obj.prev = None
        self.head = obj

    def _walk(self):
        """Return an iterator over entries from most to least recently used."""
        return LRUCacheWalker(self.head)

    def keys(self):
        """Return a list of the cached keys (in the table's own order).

        A list is returned on every Python version so callers can index,
        shuffle, or mutate the cache while looping over the result.
        """
        return list(self.table)

    def __contains__(self, key):
        """Support ``key in cache`` without changing the usage order."""
        # should checking for in-ness also _touch the key?
        return key in self.table

    def __setitem__(self, key, value):
        """Support ``cache[key] = value``; same as ``set``."""
        return self.set(key, value)

    def __getitem__(self, key):
        """Support ``cache[key]``; raises KeyError when ``key`` is absent."""
        return self.get(key)

    def __delitem__(self, key):
        """Support ``del cache[key]``; like ``delete``, ignores missing keys."""
        return self.delete(key)

    def __len__(self):
        """Return the number of entries currently cached."""
        return len(self.table)

    def __repr__(self):
        """Return ``<ClassName(current/maximum)>``."""
        return '<%s(%d/%d)>' % (self.__class__.__name__,
                                len(self), self.size)
def test_lrucache():
    """Randomized smoke test for LRUCache.

    Inserts 10000 keys into a 1000-entry cache, checking after every insert
    that the value just stored can be read back and that the cache has not
    grown past its configured size. Roughly one insert in twenty is followed
    by deleting a randomly chosen key. Finally every remaining key is read
    in random order to exercise re-linking from arbitrary list positions.
    """
    import random

    cache = LRUCache(1000)
    for x in range(10000):
        k = random.random()
        cache.set(x, k)
        assert cache.get(x) == k
        assert len(cache) <= cache.size
        if random.random() > 0.95:
            # list() so random.choice gets a sequence on every Python version
            cache.delete(random.choice(list(cache.keys())))

    # list() so the keys can be shuffled in place on every Python version
    keys = list(cache.keys())
    random.shuffle(keys)
    for x in keys:
        cache.get(x)
if __name__ == '__main__':
    # Run the randomized test repeatedly when executed as a script.
    # range() is used instead of xrange() so this also runs on Python 3.
    for _ in range(10000):
        test_lrucache()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
FWIW http://docs.python.org/dev/library/functools.html#functools.lru_cache