Skip to content

Instantly share code, notes, and snippets.

@james4388
Last active August 30, 2019 00:59
Show Gist options
  • Save james4388/494493cb1f30b4d21472c4d579dd385f to your computer and use it in GitHub Desktop.
Save james4388/494493cb1f30b4d21472c4d579dd385f to your computer and use it in GitHub Desktop.
Least frequency cache O(1) implementation using nested DoublyLinkedList and dictionary
"""Least frequency cache implementation using nested DoublyLinkedList and dictionary
Items are grouped into buckets by access frequency: bucket B[n] holds every Item(key, value) whose count is n.
DEMO ONLY: do not use in production.
Example
B[1] B[5] B[6]
I[1, 1] I[4, 4] I[5, 5]
I[2, 2]
I[3, 3]
get(4) -> 4: Increase count of 4, move to next bucket, remove B[5]
B[1] B[6]
I[1, 1] I[5, 5]
I[2, 2] I[4, 4]
I[3, 3]
get(2) -> 2: Increase count of 2, create B[2], move to next bucket
B[1] B[2] B[6]
I[1, 1] I[2, 2] I[5, 5]
I[3, 3] I[4, 4]
set(1, 4): key 1 gets value 4; its count stays 1, but it moves to the end of the first bucket
B[1] B[2] B[6]
I[3, 3] I[2, 2] I[5, 5]
I[1, 4] I[4, 4]
"""
from typing import Any, Generator, Type
class DLLNode:
    """Doubly linked list node.

    A bare ``DLLNode`` instance serves as the head/tail sentinel of a list;
    subclasses represent the real, removable nodes.

    Slots:
        prev: the neighbouring node before this one, or ``None`` when unlinked.
        next: the neighbouring node after this one, or ``None`` when unlinked.
    """

    __slots__ = ('prev', 'next')

    def __init__(self) -> None:
        # An unassigned slot raises AttributeError on access, so hasattr() is
        # False until something assigns it. A link that was assigned before
        # this initializer runs is therefore left as it is.
        if not hasattr(self, 'prev'):
            self.prev = None
        if not hasattr(self, 'next'):
            self.next = None

    def remove(self) -> None:
        """Unlink this node from its neighbours and clear its own links.

        Does nothing for a bare ``DLLNode`` (the sentinel) and for a node
        that does not have both neighbours set.
        """
        # Exact type check on purpose: every real node is a subclass, so
        # isinstance() could not tell the sentinel apart from real nodes.
        if type(self) is DLLNode:
            return
        before, after = self.prev, self.next
        # Compare against None explicitly; relying on truthiness would skip
        # the unlink for node subclasses that define __len__ or __bool__.
        if before is None or after is None:
            return
        before.next = after
        after.prev = before
        self.prev = self.next = None
class LFUNode(DLLNode):
    """Cache entry node: stores a key, its value and an access count."""

    __slots__ = ('key', 'value', 'count')

    def __init__(self, key: Any = None, value: Any = None, count: int = 1) -> None:
        # Let the base node initialise the prev/next links first.
        super().__init__()
        self.key, self.value, self.count = key, value, count
class BucketNode(DLLNode):
    """Frequency bucket node: owns a list of items and the count it stands for."""

    __slots__ = ('id', 'items')

    def __init__(self, id: int = 1) -> None:
        super().__init__()
        # Each bucket gets its own, initially empty, item list.
        self.items = DoublyLinkedList()
        # The bucket id is the frequency count shared by its items.
        self.id = id
class DoublyLinkedList:
    """Circular doubly linked list built around a single sentinel node.

    ``head`` and ``tail`` both refer to the same bare ``DLLNode``. The first
    real node is ``head.next`` and the last real node is ``tail.prev``; in an
    empty list the sentinel links to itself.
    """

    __slots__ = ('head', 'tail')

    def __init__(self) -> None:
        sentinel = DLLNode()  # Dummy node acting as both head and tail
        sentinel.prev = sentinel.next = sentinel
        self.head = self.tail = sentinel

    def is_empty(self) -> bool:
        """Return True when the list holds no real node."""
        return self.head.next is self.tail

    def append(self, node: DLLNode) -> None:
        """Append ``node`` to the end; the sentinel itself is ignored."""
        if node is self.head:
            return
        node.remove()  # In case the node is still linked into some list
        tail = self.tail
        last = tail.prev
        last.next = tail.prev = node
        node.prev = last
        node.next = tail

    def insert_after(self, node: DLLNode, target: DLLNode) -> None:
        """Link ``node`` directly after ``target``.

        Does nothing when ``node`` is the sentinel or is ``target`` itself.
        """
        if node is self.head or node is target:
            return
        node.remove()  # In case the node is still linked into some list
        # Read target.next only after the unlink above, so that a node which
        # was already sitting right after target is not linked to itself.
        following = target.next
        target.next = following.prev = node
        node.prev = target
        node.next = following

    def appendleft(self, node: DLLNode) -> None:
        """Insert ``node`` at the front of the list."""
        self.insert_after(node, self.head)

    def remove(self, node: DLLNode) -> None:
        """Unlink ``node`` from whatever list it is in."""
        node.remove()

    def move_to_front(self, node: DLLNode, reverse: bool = False) -> None:
        """Move ``node`` to the front, or to the end when ``reverse`` is true."""
        if reverse:
            if node.next is self.tail:
                return  # Already last, nothing to do
            self.append(node)
        else:
            if node.prev is self.head:
                return  # Already first, nothing to do
            self.appendleft(node)

    def items(self, reverse: bool = False) -> Generator[DLLNode, None, None]:
        """Yield every real node, front to back (back to front if ``reverse``)."""
        sentinel = self.head
        node = self.tail.prev if reverse else sentinel.next
        while node is not None and node is not sentinel:
            # Fetch the successor before yielding: the consumer may move the
            # yielded node to another list, which rewrites its links.
            upcoming = node.prev if reverse else node.next
            yield node
            node = upcoming

    __iter__ = items

    def find(self, value: Any, reverse: bool = False, default: Any = None) -> Any:
        """Return the first node whose ``value`` equals ``value``.

        Nodes without a ``value`` attribute are skipped instead of raising
        AttributeError. Returns ``default`` when nothing matches.
        """
        for node in self.items(reverse=reverse):
            if hasattr(node, 'value') and node.value == value:
                return node
        return default
class LFUCache(DoublyLinkedList):
    """Least-frequently-used cache.

    The cache itself is a linked list of ``BucketNode`` objects, one per
    access count in use; each bucket holds the ``LFUNode`` entries with that
    count. ``cache`` maps key -> entry node and ``buckets`` maps
    count -> bucket node, so lookups do not have to walk the lists.

    Slots:
        size: maximum number of entries; a value <= 0 means nothing is stored.
        cache: dict of key -> LFUNode.
        buckets: dict of count -> BucketNode.
    """

    __slots__ = ('buckets', 'cache', 'size')

    def __init__(self, size=100) -> None:
        super().__init__()
        self.size = size
        self.cache = {}
        self.buckets = {}  # Bucket access by count

    def update_count(self, node: LFUNode, count: int = 1) -> None:
        """Add ``count`` to the node's frequency and move it to the matching bucket.

        The node is appended to the end of its destination bucket, so calling
        this with ``count=0`` only moves the node to the end of the bucket
        for its current count.
        """
        source = self.buckets.get(node.count)
        node.count += count
        new_count = node.count
        target = self.buckets.get(new_count)
        if target is None:
            target = BucketNode(id=new_count)
            # A new bucket goes right after the bucket the node came from,
            # or to the very front when there was no such bucket.
            anchor = source if source is not None else self.head
            self.insert_after(target, anchor)
            self.buckets[new_count] = target
        target.items.append(node)
        if source is not None:
            self.remove_empty_bucket(source)

    def get(self, key: Any, default: Any = None) -> Any:
        """Return the value stored for ``key`` and bump its frequency by one.

        Returns ``default`` when the key is not cached.
        """
        node = self.cache.get(key)
        if node is None:
            return default
        self.update_count(node)
        return node.value

    __getitem__ = get

    def remove_empty_bucket(self, bucket: BucketNode) -> None:
        """Unlink ``bucket`` and drop it from ``buckets`` if it holds no items."""
        if bucket is not None and bucket.items.is_empty():
            bucket.remove()
            self.buckets.pop(bucket.id, None)

    def remove_least_fequency(self) -> None:
        """Evict the first entry of the first bucket.

        If the first bucket turns out to be empty it is removed and nothing
        else is evicted. The misspelled name is kept for existing callers.
        """
        if self.is_empty():
            return
        min_bucket = self.head.next  # First bucket is always the minimum
        if not min_bucket.items.is_empty():
            victim = min_bucket.items.head.next
            victim.remove()
            self.cache.pop(victim.key, None)
        self.remove_empty_bucket(min_bucket)

    # Correctly spelled alias for the method above.
    remove_least_frequency = remove_least_fequency

    def put(self, key: Any, value: Any) -> None:
        """Store ``value`` under ``key``.

        An existing key gets the new value and moves to the end of its
        bucket; its count is not increased. A new key starts with count 1
        after enough entries have been evicted to respect ``size``.
        """
        node = self.cache.get(key)
        if node is not None:
            node.value = value
        else:
            if self.size <= 0:
                # A cache without capacity cannot hold anything. Testing only
                # len(cache) == size would admit the first item and then
                # never evict again.
                return
            # Loop so the limit also holds if size was lowered after entries
            # were added; stop once there is nothing left to evict.
            while len(self.cache) >= self.size and not self.is_empty():
                self.remove_least_fequency()
            node = LFUNode(key=key, value=value, count=1)
            self.cache[key] = node
        # Only make the node more recent, do not move it into a new bucket
        self.update_count(node, count=0)

    __setitem__ = put

    def visualize(self):
        """Print the size limit, current size and the contents of every bucket."""
        current_size = len(self.cache)
        print(f'LFUCache: max {self.size} current {current_size}')
        for bucket in self:
            print(f'B[{bucket.id}]', [(n.key, n.value) for n in bucket.items])
# Unit test
if __name__ == '__main__':
    def test_dll():
        """Exercise append/appendleft/insert_after/remove/find on the list."""
        a = LFUNode(value=1)
        b = LFUNode(value=2)
        c = LFUNode(value=3)
        d = LFUNode(value=4)
        e = LFUNode(value=5)
        dl = DoublyLinkedList()
        assert dl.is_empty(), 'dl is not empty'
        dl2 = DoublyLinkedList()
        for node in (a, b, c, d, e):
            dl.append(node)
        assert [n.value for n in dl] == [1, 2, 3, 4, 5], 'Content is wrong'
        assert not dl.is_empty(), 'dl is empty'
        assert dl.find(4).value == 4, 'Find is not working'
        assert dl.find(10) is None, 'Find is not working'
        # Moving nodes to another list while iterating must be safe
        for n in dl:
            dl2.appendleft(n)
        assert [n.value for n in dl2] == [5, 4, 3, 2, 1], 'Content is wrong'
        assert dl.is_empty(), 'dl is not empty'
        assert not dl2.is_empty(), 'dl2 is empty'
        a.remove()
        b.remove()
        assert [n.value for n in dl2] == [5, 4, 3], 'Content is wrong'
        dl.append(a)
        dl.append(b)
        # The first call removes b; the next two hit the sentinel and must
        # leave the list intact
        a.next.remove()
        a.next.remove()
        a.next.remove()
        assert [n.value for n in dl] == [1], 'Content is wrong'
        for node in (a, b, c, d, e):
            dl.append(node)
        assert [n.value for n in dl] == [1, 2, 3, 4, 5], 'Content is wrong'
        dl.insert_after(a, e)
        assert [n.value for n in dl] == [2, 3, 4, 5, 1], 'Content is wrong'
        # Re-inserting at the same place must not change anything
        dl.insert_after(a, e)
        assert [n.value for n in dl] == [2, 3, 4, 5, 1], 'Content is wrong'
        dl.insert_after(c, e)
        assert [n.value for n in dl] == [2, 4, 5, 3, 1], 'Content is wrong'

    def test_lfu():
        """Walk the cache through inserts, hits and evictions."""
        lfu = LFUCache(3)
        lfu.put(1, 1)
        lfu.put(2, 2)
        lfu.put(3, 3)
        lfu.visualize()
        # LFUCache: max 3 current 3
        # B[1] [(1, 1), (2, 2), (3, 3)]
        lfu.put(4, 4)  # Evicts (1, 1)
        lfu.visualize()
        # LFUCache: max 3 current 3
        # B[1] [(2, 2), (3, 3), (4, 4)]
        assert lfu[3] == 3, "Where is 3?"
        assert lfu[1] is None, "Why 1 is still here"
        lfu.visualize()
        # LFUCache: max 3 current 3
        # B[1] [(2, 2), (4, 4)]
        # B[2] [(3, 3)]
        lfu[3]
        lfu.visualize()
        # LFUCache: max 3 current 3
        # B[1] [(2, 2), (4, 4)]
        # B[3] [(3, 3)]
        lfu[5] = 5  # Evicts 2
        lfu.visualize()
        # LFUCache: max 3 current 3
        # B[1] [(4, 4), (5, 5)]
        # B[3] [(3, 3)]

    # Without these calls the tests above are only defined, never executed.
    test_dll()
    test_lfu()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment