Last active
August 30, 2019 00:59
-
-
Save james4388/494493cb1f30b4d21472c4d579dd385f to your computer and use it in GitHub Desktop.
Least frequency cache O(1) implementation using nested DoublyLinkedList and dictionary
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Least-frequently-used (LFU) cache implemented with nested doubly linked lists and a dictionary.

Items are grouped into buckets by frequency count: B[1] contains every Item(key, value) with frequency 1.
DEMO ONLY, do not use in prod.

Example:
    B[1]     B[5]     B[6]
    I[1, 1]  I[4, 4]  I[5, 5]
    I[2, 2]
    I[3, 3]

get(4) -> 4: increase the count of 4, move it to the next bucket, remove the now-empty B[5]
    B[1]     B[6]
    I[1, 1]  I[5, 5]
    I[2, 2]  I[4, 4]
    I[3, 3]

get(2) -> 2: increase the count of 2, create B[2], move it to that bucket
    B[1]     B[2]     B[6]
    I[1, 1]  I[2, 2]  I[5, 5]
    I[3, 3]           I[4, 4]

put(1, 4): the count of 1 stays 1, but it moves to the end of the first bucket
    B[1]     B[2]     B[6]
    I[3, 3]  I[2, 2]  I[5, 5]
    I[1, 4]           I[4, 4]
"""
from typing import Any, Generator, Type | |
class DLLNode(object):
    """Base node of a doubly linked list.

    Only the ``prev`` and ``next`` links are stored here; subclasses add the
    payload. A plain ``DLLNode`` (not a subclass) is treated as a head/tail
    sentinel: ``remove`` is a no-op on it, so it can never be unlinked.
    """
    __slots__ = ('prev', 'next')

    def __init__(self) -> None:
        # With __slots__ an attribute that was never assigned does not exist,
        # so hasattr() is False on a fresh node. Links that are already set
        # (e.g. when __init__ runs again) are left untouched.
        if not hasattr(self, 'prev'):
            self.prev = None
        if not hasattr(self, 'next'):
            self.next = None

    def remove(self) -> None:
        """Unlink this node from its neighbours and clear its own links.

        Nothing happens when the node is a plain ``DLLNode`` (a sentinel) or
        when either of its links is ``None`` (it is not fully linked).
        """
        # Exact type check on purpose: every subclass instance is removable,
        # only the bare base class marks a sentinel.
        if type(self) is DLLNode:
            return
        before, after = self.prev, self.next
        # Compare against None explicitly. A truthiness test would skip the
        # unlink whenever a neighbour defines __len__/__bool__ and is falsy.
        if before is None or after is None:
            return
        before.next = after
        after.prev = before
        self.prev = self.next = None
class LFUNode(DLLNode):
    """Linked-list node holding one cache entry.

    Stores the entry's ``key`` and ``value`` together with ``count``, the
    frequency counter attached to the entry.
    """
    __slots__ = ('key', 'value', 'count')

    def __init__(self, key: Any = None, value: Any = None, count: int = 1) -> None:
        # Let the base class set up the prev/next links first.
        super().__init__()
        self.count = count
        self.value = value
        self.key = key
class BucketNode(DLLNode):
    """Linked-list node representing one frequency bucket.

    ``id`` is the count shared by the bucket's entries and ``items`` is the
    bucket's own doubly linked list, created empty.
    """
    __slots__ = ('id', 'items')

    def __init__(self, id: int = 1) -> None:
        super().__init__()
        self.items = DoublyLinkedList()  # starts out empty
        self.id = id  # the frequency count this bucket stands for
class DoublyLinkedList(object):
    """Circular doubly linked list built around a single sentinel node.

    ``head`` and ``tail`` refer to the same dummy ``DLLNode``. The first real
    node is ``head.next`` and the last one is ``tail.prev``; in an empty list
    the sentinel points at itself in both directions.
    """
    __slots__ = ('head', 'tail')

    def __init__(self) -> None:
        sentinel = DLLNode()  # Dummy node, never carries data
        sentinel.prev = sentinel
        sentinel.next = sentinel
        self.head = sentinel
        self.tail = sentinel

    def is_empty(self) -> bool:
        """Return True when the list holds no node besides the sentinel."""
        return self.head.next is self.tail

    def append(self, node: DLLNode) -> None:
        """Append ``node`` at the end; the sentinel itself is ignored."""
        if node is self.head:
            return
        node.remove()  # In case the node is still linked into some list
        tail = self.tail
        last = tail.prev
        last.next = tail.prev = node
        node.prev = last
        node.next = tail

    def insert_after(self, node: DLLNode, target: DLLNode) -> None:
        """Place ``node`` directly after ``target``.

        Does nothing when ``node`` is the sentinel or is ``target`` itself.
        """
        if node is self.head or node is target:
            return
        node.remove()  # In case the node is still linked into some list
        # Read target.next only after the unlink above: node may have been
        # target's direct successor.
        successor = target.next
        target.next = successor.prev = node
        node.prev = target
        node.next = successor

    def appendleft(self, node: DLLNode) -> None:
        """Insert ``node`` at the front of the list."""
        self.insert_after(node, self.head)

    def remove(self, node: DLLNode) -> None:
        """Unlink ``node`` by delegating to ``node.remove()``."""
        node.remove()

    def move_to_front(self, node: DLLNode, reverse: bool = False) -> None:
        """Move ``node`` to the front, or to the back when ``reverse`` is true."""
        if reverse:
            if node.next is self.tail:
                return  # Already last, nothing to do
            self.append(node)
        else:
            if node.prev is self.head:
                return  # Already first, nothing to do
            self.appendleft(node)

    def items(self, reverse: bool = False) -> Generator[DLLNode, None, None]:
        """Yield every node, front to back (back to front when ``reverse``).

        Both neighbours are captured before each yield, so the consumer may
        move the yielded node to another list without breaking the walk.
        """
        node = self.tail.prev if reverse else self.head.next
        while node is not None and node is not self.head:
            following = node.next
            preceding = node.prev
            yield node
            node = preceding if reverse else following

    __iter__ = items

    def find(self, value: Any, reverse: bool = False, default: Any = None) -> Any:
        """Return the first node whose ``value`` attribute equals ``value``.

        Nodes that have no ``value`` attribute are skipped instead of raising
        ``AttributeError``. Returns ``default`` when nothing matches.
        """
        missing = object()  # Can never compare equal to a real value
        for node in self.items(reverse=reverse):
            candidate = getattr(node, 'value', missing)
            if candidate is not missing and candidate == value:
                return node
        return default
class LFUCache(DoublyLinkedList):
    """Least-frequently-used cache.

    The cache itself is a doubly linked list of ``BucketNode`` objects kept in
    ascending order of count. Each bucket owns a list of ``LFUNode`` entries
    sharing that count, oldest first. ``cache`` maps key -> entry node and
    ``buckets`` maps count -> bucket node.

    ``size`` is the maximum number of entries. With a non-positive ``size``
    the cache stores nothing.
    """
    __slots__ = ('buckets', 'cache', 'size')

    def __init__(self, size=100) -> None:
        super(LFUCache, self).__init__()
        self.size = size
        self.cache = {}
        self.buckets = {}  # Bucket access by count

    def update_count(self, node: LFUNode, count: int = 1) -> None:
        """Add ``count`` to the node's frequency and move it to the matching bucket.

        The node is appended at the end of the target bucket, which is created
        if needed. With ``count=0`` the node only moves to the end of its
        current bucket. The bucket for the previous count is dropped when it
        ends up empty.
        """
        old_bucket = self.buckets.get(node.count)
        node.count += count
        target_count = node.count
        bucket = self.buckets.get(target_count)
        if bucket is None:
            bucket = BucketNode(id=target_count)
            # Find the last bucket with a smaller count so the bucket list
            # stays sorted. Starting at the bucket being left keeps the usual
            # +1 step constant-time; other steps may walk a few buckets.
            if old_bucket is not None and old_bucket.id <= target_count:
                anchor = old_bucket
            else:
                anchor = self.head
            while anchor.next is not self.head and anchor.next.id < target_count:
                anchor = anchor.next
            self.insert_after(bucket, anchor)
            self.buckets[target_count] = bucket
        bucket.items.append(node)  # append() unlinks the node from its old bucket
        if old_bucket is not None:
            self.remove_empty_bucket(old_bucket)

    def get(self, key: Any, default: Any = None) -> Any:
        """Return the value stored under ``key`` and count the access.

        Returns ``default`` when the key is absent; ``cache[key]`` behaves the
        same way and therefore yields ``None`` rather than raising KeyError.
        """
        node = self.cache.get(key)
        if node is None:
            return default
        self.update_count(node)
        return node.value

    __getitem__ = get

    def remove_empty_bucket(self, bucket: BucketNode) -> None:
        """Unlink ``bucket`` and forget it if it holds no entries."""
        if bucket is not None and bucket.items.is_empty():
            bucket.remove()
            self.buckets.pop(bucket.id, None)

    def remove_least_fequency(self) -> None:
        """Evict the oldest entry of the lowest-count bucket."""
        # First bucket is always min
        if self.is_empty():
            return
        min_bucket = self.head.next
        if min_bucket.items.is_empty():
            # Stale empty bucket: drop it, nothing is evicted this time.
            self.remove_empty_bucket(min_bucket)
            return
        victim = min_bucket.items.head.next
        victim.remove()
        self.cache.pop(victim.key, None)
        self.remove_empty_bucket(min_bucket)

    # Correctly spelled alias; the original name is kept for existing callers.
    remove_least_frequency = remove_least_fequency

    def put(self, key: Any, value: Any) -> None:
        """Store ``value`` under ``key``.

        An existing key keeps its count and only gets the new value. A new key
        starts with count 1 and, when the cache is full, first evicts the
        least frequently used entry. In both cases the entry moves to the end
        of its bucket.
        """
        node = self.cache.get(key)
        if node is not None:
            node.value = value
        else:
            if self.size <= 0:
                # No capacity at all: storing would grow the cache unbounded.
                return
            # New entries start at count 1; update_count below adds 0.
            node = LFUNode(key=key, value=value, count=1)
            if len(self.cache) >= self.size:  # Limit reached, remove LF item
                self.remove_least_fequency()
            self.cache[key] = node
        # Only make the node more recent, do not move it to a new bucket
        self.update_count(node, count=0)

    __setitem__ = put

    def visualize(self):
        """Print the capacity, current size and the content of every bucket."""
        current_size = len(self.cache)
        print(f'LFUCache: max {self.size} current {current_size}')
        for bucket in self:
            print(f'B[{bucket.id}]', [(n.key, n.value) for n in bucket.items])
# Unit test
if __name__ == '__main__':
    def test_dll():
        """Exercise DoublyLinkedList: append, find, iteration, remove, insert_after."""
        a = LFUNode(value=1)
        b = LFUNode(value=2)
        c = LFUNode(value=3)
        d = LFUNode(value=4)
        e = LFUNode(value=5)
        dl = DoublyLinkedList()
        assert dl.is_empty(), 'dl is not empty'
        dl2 = DoublyLinkedList()
        dl.append(a)
        dl.append(b)
        dl.append(c)
        dl.append(d)
        dl.append(e)
        assert [n.value for n in dl] == [1,2,3,4,5], 'Content is wrong'
        assert not dl.is_empty(), 'dl is empty'
        assert dl.find(4).value == 4, 'Find is not working'
        assert dl.find(10) is None, 'Find is not working'
        # Moving nodes to another list while iterating must be safe
        for n in dl:
            dl2.appendleft(n)
        assert [n.value for n in dl2] == [5,4,3,2,1], 'Content is wrong'
        assert dl.is_empty(), 'dl is not empty'
        assert not dl2.is_empty(), 'dl2 is empty'
        a.remove()
        b.remove()
        assert [n.value for n in dl2] == [5,4,3], 'Content is wrong'
        dl.append(a)
        dl.append(b)
        # First call removes b; the next two hit the sentinel and do nothing
        a.next.remove()
        a.next.remove()
        a.next.remove()
        assert [n.value for n in dl] == [1], 'Content is wrong'
        dl.append(a)
        dl.append(b)
        dl.append(c)
        dl.append(d)
        dl.append(e)
        assert [n.value for n in dl] == [1,2,3,4,5], 'Content is wrong'
        dl.insert_after(a, e)
        assert [n.value for n in dl] == [2,3,4,5,1], 'Content is wrong'
        dl.insert_after(a, e)
        assert [n.value for n in dl] == [2,3,4,5,1], 'Content is wrong'
        dl.insert_after(c, e)
        assert [n.value for n in dl] == [2,4,5,3,1], 'Content is wrong'

    # Test LFU
    def test_lfu():
        """Exercise LFUCache: eviction order and bucket moves on access."""
        lfu = LFUCache(3)
        lfu.put(1, 1)
        lfu.put(2, 2)
        lfu.put(3, 3)
        lfu.visualize()
        # LFUCache: max 3 current 3
        # B[1] [(1, 1), (2, 2), (3, 3)]
        lfu.put(4, 4)  # Evict (1, 1)
        lfu.visualize()
        # LFUCache: max 3 current 3
        # B[1] [(2, 2), (3, 3), (4, 4)]
        assert lfu[3] == 3, "Where is 3?"
        assert lfu[1] is None, "Why 1 is still here"
        lfu.visualize()
        # LFUCache: max 3 current 3
        # B[1] [(2, 2), (4, 4)]
        # B[2] [(3, 3)]
        lfu[3]
        lfu.visualize()
        # LFUCache: max 3 current 3
        # B[1] [(2, 2), (4, 4)]
        # B[3] [(3, 3)]
        lfu[5] = 5  # Evict 2
        lfu.visualize()
        # LFUCache: max 3 current 3
        # B[1] [(4, 4), (5, 5)]
        # B[3] [(3, 3)]

    # The tests above were only defined, never run: execute them.
    test_dll()
    test_lfu()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment