Created
July 21, 2012 19:05
-
-
Save jzempel/3156848 to your computer and use it in GitHub Desktop.
LRU Cache
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
LRU Cache. | |
:author: Jonathan Zempel | |
""" | |
from threading import Lock | |
from unittest import main, TestCase | |
class Node(object): | |
"""Doubly-linked node. | |
:param data: The data for this node. | |
""" | |
def __init__(self, data): | |
self.next = None | |
self.previous = None | |
self.data = data | |
def link(self, node): | |
"""Link the given node after this node. | |
:param node: The node to link. | |
""" | |
node.next = self.next | |
node.previous = self | |
if self.next: | |
self.next.previous = node | |
self.next = node | |
def unlink(self): | |
"""Unlink this node. | |
""" | |
if self.previous: | |
self.previous.next = self.next | |
if self.next: | |
self.next.previous = self.previous | |
self.next = None | |
self.previous = None | |
class LRUCache(object): | |
"""Implementation of an LRU cache. | |
:param maximum: Default `10000`. Maximum cache size. | |
""" | |
KEY = 0 | |
VALUE = 1 | |
def __init__(self, maximum=10000): | |
self.cache = {} | |
self.head = Node(None) | |
self.tail = None | |
self.maximum = maximum | |
self.lock = Lock() | |
def __len__(self): | |
"""The current length of this cache. | |
""" | |
return len(self.cache) | |
def _trim(self): | |
"""Trim this cache to the specified maximum. | |
""" | |
while len(self) > self.maximum: | |
node = self.tail | |
key = node.data[self.KEY] | |
self.tail = node.previous | |
node.unlink() | |
del self.cache[key] | |
del node | |
def insert(self, key, value): | |
"""Insert a value for the given key into this cache. | |
:param key: The cache key. | |
:param value: The value to associate with the key. | |
""" | |
with self.lock: | |
data = (key, value) | |
if key in self.cache: | |
node = self.cache[key] | |
if node != self.head.next: | |
node.unlink() | |
self.head.link(node) | |
node.data = data | |
else: | |
node = Node(data) | |
self.head.link(node) | |
self.cache[key] = node | |
if self.tail is None: | |
self.tail = node | |
self._trim() | |
def lookup(self, key): | |
"""Lookup a value for the given key. Returns ``None`` if there is no | |
cached value for the given key. | |
:param key: The key to get a value for. | |
""" | |
node = self.cache.get(key) | |
if node: | |
if node != self.head.next: | |
node.unlink() | |
self.head.link(node) | |
ret_val = node.data[self.VALUE] | |
else: | |
ret_val = None | |
return ret_val | |
class TestLRUCache(TestCase): | |
def test_node(self): | |
node_a = Node('a') | |
node_b = Node('b') | |
node_c = Node('c') | |
node_a.link(node_c) | |
node_a.link(node_b) | |
# Node b is linked between a and c. | |
self.assertEquals(node_a.next.data, 'b') | |
self.assertEquals(node_c.previous.data, 'b') | |
node_b.unlink() | |
# Node a and c are directly linked. | |
self.assertEquals(node_a.next.data, 'c') | |
self.assertEquals(node_c.previous.data, 'a') | |
node_c.unlink() | |
# Node a and c are not linked. | |
self.assertIsNone(node_a.next) | |
self.assertIsNone(node_c.previous) | |
def test_length(self): | |
cache = LRUCache() | |
for key in range(cache.maximum + 1): | |
cache.insert(key, None) | |
# Length does not exceed maximum. | |
self.assertEquals(len(cache), cache.maximum) | |
node = cache.head.next | |
key = node.data[LRUCache.KEY] | |
# Head key equals maximum, indicating 0th element was trimmed. | |
self.assertEquals(key, cache.maximum) | |
def test_insert(self): | |
maximum = 10 | |
total = 100 | |
cache = LRUCache(maximum) | |
for key in range(total): | |
value = chr(key) | |
cache.insert(key, value) | |
node = cache.tail | |
count = 0 | |
while node.previous is not None: | |
key = node.data[LRUCache.KEY] | |
self.assertIn(key, cache.cache) | |
value = node.data[LRUCache.VALUE] | |
expected = chr(total - maximum + count) | |
# LRU cache is ordered descending by keys 90 (stale) to 99 (fresh). | |
self.assertEquals(value, expected) | |
node = node.previous | |
count += 1 | |
def test_lookup(self): | |
maximum = 10 | |
cache = LRUCache(maximum) | |
cache.insert(0, 'a') | |
# Simple lookup is successful. | |
self.assertEquals(cache.lookup(0), 'a') | |
length = ord('z') - ord('a') + 1 | |
for key in range(length): | |
value = chr(key + ord('a')) | |
cache.insert(key, value) | |
# Oldest value is no longer in the cache. | |
self.assertIsNone(cache.lookup(0)) | |
value = cache.head.next.data[LRUCache.VALUE] | |
# Newest value is at the head. | |
self.assertEquals(value, 'z') | |
key = ord('z') - ord('a') - maximum + 1 | |
value = chr(ord('z') - maximum + 1) | |
# Stalest value can be retrieved. | |
self.assertEquals(cache.lookup(key), value) | |
# And, as a result, is now the freshest value. | |
self.assertEquals(cache.head.next.data[LRUCache.VALUE], value) | |
# Ensure that subsequent values overwrite the same key. | |
cache.insert("foo", "bar") | |
cache.insert("foo", "baz") | |
self.assertEquals(cache.lookup("foo"), "baz") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment