LRU cache in python

Motivation

Least Recently Used caching can fail when usage frequency is correlated with object size. For example, an address database sharded by city and state would likely see the most activity in its largest shards. The cache may initially contain a random subset of cities (e.g. due to an automated process that runs through cities alphabetically) but can eventually become populated by just the largest shards due to organic traffic (i.e. user-driven web traffic).

This code is inspired by this gist and this SO post

Strategy

Each object is stored with an estimated budget, which is calculated when the object is first cached. This could be accomplished with a wrapper like this:

@lru_cache()
def get_shard(*args, **kwargs):
    shard = build_shard(*args, **kwargs)
    # return the object and its budget allocation
    return shard, shard.size

# now use get_shard in place of build_shard
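Note that the decorator unpacks the (value, budget) pair and hands back just the value, so callers receive the shard itself (the arguments here are illustrative):

shard = get_shard(city="Seattle", state="WA")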

As in the reference implementation, the least recently used object is dropped from the cache when a new object is cached and the full condition is satisfied.

Note that the full condition may be specified by:

  • The number of objects cached: @lru_cache(maxsize=128)

  • The total budget of the objects cached: @lru_cache(maxbudget=100) (see the sketch after this list)

  • An arbitrary metric:

    # drop the least recently used item from the cache when the
    # available memory falls below 1GB
    @lru_cache(
        get_available_resources=lambda: psutil.virtual_memory().available,
        min_available_resources=2**30)
    def f():
        ...
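For the budget-based limit promised above, here is a minimal sketch; it assumes, as in the earlier wrapper, that shard.size reports the shard's memory footprint in bytes:

# keep roughly 1GiB worth of shards cached, as measured by the budget
# returned alongside each shard (shard.size is an assumption)
@lru_cache(maxbudget=2**30)
def get_shard(city, state):
    shard = build_shard(city, state)
    return shard, shard.size  # (value, budget) pair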

However, when resources are exhausted, the reference implementation only allows dumping the entire cache (get_shard.cache_clear()). This implementation also allows clearing a budget's worth of cache with get_shard.pop(budget_amount_to_drop). This can be especially useful when the budget can be estimated from the parameters alone, as in:

try:
    shard = get_shard(*args, **kwargs)
except MemoryError:
    get_shard.pop(get_shard_size(*args, **kwargs))
    shard = get_shard(*args, **kwargs)
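Here get_shard_size is a hypothetical estimator that predicts a shard's budget from its parameters without building the shard, for example:

# hypothetical: estimate a shard's size from a precomputed row count
# (ROW_COUNTS and bytes_per_row are illustrative assumptions)
def get_shard_size(city, state, bytes_per_row=512):
    return ROW_COUNTS[(city, state)] * bytes_per_row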
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Memory-aware LRU Cache function decorator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
A modification of the builtin ``functools.lru_cache`` decorator that takes
additional keyword arguments, ``maxbudget``, ``get_available_resources``, and
``min_available_resources``. The cache is considered full if fewer than
``min_available_resources`` bytes of memory are available.
If ``min_available_resources`` is set, then ``maxsize`` has no effect.
Uses the ``psutil`` module to get the available memory.
"""
import psutil
import warnings
from threading import RLock
from collections import namedtuple
from functools import update_wrapper

_CacheInfo = namedtuple(
    "CacheInfo", ["hits", "misses", "maxsize", "currsize", "currbudget"]
)

class _HashedSeq(list):
    """This class guarantees that hash() will be called no more than once
    per element. This is important because the lru_cache() will hash
    the key multiple times on a cache miss.
    """

    __slots__ = "hashvalue"

    def __init__(self, tup, hash=hash):
        self[:] = tup
        self.hashvalue = hash(tup)

    def __hash__(self):
        return self.hashvalue

def _make_key(
    args,
    kwds,
    typed,
    kwd_mark=(object(),),
    fasttypes={int, str, frozenset, type(None)},
    sorted=sorted,
    tuple=tuple,
    type=type,
    len=len,
):
    """Make a cache key from optionally typed positional and keyword arguments
    The key is constructed in a way that is as flat as possible rather than
    as a nested structure that would take more memory.
    If there is only a single argument and its data type is known to cache
    its hash value, then that argument is returned without a wrapper. This
    saves space and improves lookup speed.
    """
    key = args
    if kwds:
        sorted_items = sorted(kwds.items())
        key += kwd_mark
        for item in sorted_items:
            key += item
    if typed:
        key += tuple(type(v) for v in args)
        if kwds:
            key += tuple(type(v) for k, v in sorted_items)
    elif len(key) == 1 and type(key[0]) in fasttypes:
        return key[0]
    return _HashedSeq(key)

_get_available_resources = lambda: psutil.virtual_memory().available

def lru_cache(
    maxsize=128,
    maxbudget=None,
    typed=False,
    get_available_resources=_get_available_resources,
    min_available_resources=False,
):
    """Budget and memory aware least-recently-used cache decorator.
    Motivation: when caching objects of various sizes, dropping the oldest
    (possibly tiny) asset when caching a new (much larger) asset can still
    explode resource usage. This implementation allows for dropping least
    recently used items based on their (budgeted) resource usage, in addition
    to the usual count-based limit.
    *min_available_resources* is an integer representing the amount of resources
    (bytes of memory) that must be available on the system in order for a value
    to be cached. If it is set, *maxsize* has no effect.
    If *maxsize* is set to None, the LRU features are disabled and the cache
    can grow without bound.
    If *typed* is True, arguments of different types will be cached separately.
    For example, f(3.0) and f(3) will be treated as distinct calls with
    distinct results.
    Arguments to the cached function must be hashable.
    View the cache statistics named tuple (hits, misses, maxsize, currsize,
    currbudget) with f.cache_info(). Clear the cache and statistics with
    f.cache_clear(). Access the underlying function with f.__wrapped__.
    See: http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
    Example usage:

        @lru_cache(maxbudget=100)
        def build_asset(x, b):
            return x, b

        budget = some_calculations(*parameters)
        try:
            # try to build the asset within the current budget
            build_asset(*parameters, budget)
        except MemoryError:
            # release `budget` worth of assets from the cache
            build_asset.pop(budget)
            build_asset(*parameters, budget)
    """
    # Users should only access the lru_cache through its public API:
    # cache_info, cache_clear, and f.__wrapped__
    # The internals of the lru_cache are encapsulated for thread safety and
    # to allow the implementation to change (including a possible C version).

    # Constants shared by all lru cache instances:
    sentinel = object()  # unique object used to signal cache misses
    make_key = _make_key  # build a key from the function arguments
    PREV, NEXT, KEY, RESULT, BUDGET = 0, 1, 2, 3, 4  # names for the link fields

    if maxbudget:
        if get_available_resources is not _get_available_resources:
            warnings.warn(
                "get_available_resources is ignored when maxbudget is specified",
                RuntimeWarning,
            )
        # express the budget limit as a resource check: the cache is full
        # when less than one unit of budget remains
        min_available_resources = 1
        get_available_resources = lambda: maxbudget - budget
        maxsize = None
    elif min_available_resources:
        # maxsize is now only relevant to the reporting function (cache_info)
        maxsize = None

    budget = 0
    cache = {}
    hits = misses = 0
    full = False
    cache_get = cache.get  # bound method to lookup a key or return None
    lock = RLock()  # because linkedlist updates aren't threadsafe
    root = []  # root of the circular doubly linked list
    root[:] = [root, root, None, None, None]  # initialize by pointing to self
    def decorating_function(user_function):

        if min_available_resources:

            def wrapper(*args, **kwds):
                # Resource limited caching that tracks accesses by recency
                nonlocal root, hits, misses, full, budget
                key = make_key(args, kwds, typed)
                with lock:
                    link = cache_get(key)
                    if link is not None:
                        # Move the link to the front of the circular queue
                        link_prev, link_next, _, result, _ = link
                        link_prev[NEXT] = link_next
                        link_next[PREV] = link_prev
                        last = root[PREV]
                        last[NEXT] = root[PREV] = link
                        link[PREV] = last
                        link[NEXT] = root
                        hits += 1
                        return result
                result, newbudget = user_function(*args, **kwds)
                with lock:
                    if key in cache:
                        # Getting here means that this same key was added to the
                        # cache while the lock was released. Since the link
                        # update is already done, we need only return the
                        # computed result and update the count of misses.
                        pass
                    elif full:
                        # Use the old root to store the new key and result.
                        oldroot = root
                        oldroot[KEY] = key
                        oldroot[RESULT] = result
                        oldroot[BUDGET] = newbudget
                        # Empty the oldest link and make it the new root.
                        # Keep a reference to the old key and old result to
                        # prevent their ref counts from going to zero during the
                        # update. That will prevent potentially arbitrary object
                        # clean-up code (i.e. __del__) from running while we're
                        # still adjusting the links.
                        root = oldroot[NEXT]
                        oldkey = root[KEY]
                        oldresult = root[RESULT]
                        oldbudget = root[BUDGET]
                        root[KEY] = root[RESULT] = root[BUDGET] = None
                        # Now update the cache dictionary.
                        del cache[oldkey]
                        # Save the potentially reentrant cache[key] assignment
                        # for last, after the root and links have been put in
                        # a consistent state.
                        cache[key] = oldroot
                        budget += newbudget
                        budget -= oldbudget
                    else:
                        # Put result in a new link at the front of the queue.
                        last = root[PREV]
                        link = [last, root, key, result, newbudget]
                        last[NEXT] = root[PREV] = cache[key] = link
                        budget += newbudget
                        full = get_available_resources() < min_available_resources
                    misses += 1
                return result
        elif maxsize == 0:

            def wrapper(*args, **kwds):
                # No caching -- just a statistics update after a successful call
                nonlocal misses
                result, newbudget = user_function(*args, **kwds)
                misses += 1
                return result

        elif maxsize is None:

            def wrapper(*args, **kwds):
                # Simple caching without ordering or size limit
                nonlocal hits, misses, budget
                key = make_key(args, kwds, typed)
                result = cache_get(key, sentinel)
                if result is not sentinel:
                    hits += 1
                    return result
                result, newbudget = user_function(*args, **kwds)
                budget += newbudget
                cache[key] = result
                misses += 1
                return result
        else:

            def wrapper(*args, **kwds):
                # Size limited caching that tracks accesses by recency
                nonlocal root, hits, misses, full, budget
                key = make_key(args, kwds, typed)
                with lock:
                    link = cache_get(key)
                    if link is not None:
                        # Move the link to the front of the circular queue
                        link_prev, link_next, _key, result, oldbudget = link
                        link_prev[NEXT] = link_next
                        link_next[PREV] = link_prev
                        last = root[PREV]
                        last[NEXT] = root[PREV] = link
                        link[PREV] = last
                        link[NEXT] = root
                        hits += 1
                        return result
                result, newbudget = user_function(*args, **kwds)
                with lock:
                    if key in cache:
                        # Getting here means that this same key was added to the
                        # cache while the lock was released. Since the link
                        # update is already done, we need only return the
                        # computed result and update the count of misses.
                        pass
                    elif full:
                        # Use the old root to store the new key and result.
                        oldroot = root
                        oldroot[KEY] = key
                        oldroot[RESULT] = result
                        oldroot[BUDGET] = newbudget
                        # Empty the oldest link and make it the new root.
                        # Keep a reference to the old key and old result to
                        # prevent their ref counts from going to zero during the
                        # update. That will prevent potentially arbitrary object
                        # clean-up code (i.e. __del__) from running while we're
                        # still adjusting the links.
                        root = oldroot[NEXT]
                        oldkey = root[KEY]
                        oldresult = root[RESULT]
                        oldbudget = root[BUDGET]
                        root[KEY] = root[RESULT] = root[BUDGET] = None
                        # Now update the cache dictionary.
                        del cache[oldkey]
                        # Save the potentially reentrant cache[key] assignment
                        # for last, after the root and links have been put in
                        # a consistent state.
                        cache[key] = oldroot
                        budget += newbudget
                        budget -= oldbudget
                    else:
                        # Put result in a new link at the front of the queue.
                        last = root[PREV]
                        link = [last, root, key, result, newbudget]
                        last[NEXT] = root[PREV] = cache[key] = link
                        budget += newbudget
                        full = len(cache) >= maxsize
                    misses += 1
                return result
        def budgetitems():
            """List the (result, budget) pairs from least to most recent"""
            out = []
            with lock:
                link = root[NEXT]
                while link is not root:
                    out.append((link[RESULT], link[BUDGET]))
                    link = link[NEXT]
            return out

        def current_budget():
            """Sum the budgets of the cached items"""
            out = 0
            with lock:
                link = root[NEXT]
                while link is not root:
                    out += link[BUDGET]
                    link = link[NEXT]
            return out
        def pop(amount=None):
            """Drop items from the cache until `amount` worth of budget is released"""
            nonlocal budget, full
            if root[NEXT] is root:
                return 0
            with lock:
                if amount is None:
                    # drop just the least recently used item
                    link = root[NEXT]
                    if link is not root:
                        link_prev, link_next, _key, result, link_budget = link
                        link_prev[NEXT] = link_next
                        link_next[PREV] = link_prev
                        del cache[_key]
                        budget -= link_budget  # keep the budget total in sync
                        full = False  # there is room in the cache again
                        return link_budget
                    return 0
                else:
                    link = root[NEXT]
                    released = 0
                    while released < amount and link is not root:
                        link_prev, link_next, _key, result, link_budget = link
                        released += link_budget
                        link_prev[NEXT] = link_next
                        link_next[PREV] = link_prev
                        link = link[NEXT]
                        del cache[_key]
                    budget -= released  # keep the budget total in sync
                    full = False  # there is room in the cache again
                    return released
        def cache_info():
            """Report cache statistics"""
            return _CacheInfo(hits, misses, maxsize, len(cache), budget)

        def cache_clear():
            """Clear the cache and cache statistics"""
            nonlocal hits, misses, full, budget
            with lock:
                cache.clear()
                root[:] = [root, root, None, None, None]
                hits = misses = 0
                full = False
                budget = 0

        wrapper.budgetitems = budgetitems
        wrapper.pop = pop
        wrapper.current_budget = current_budget
        wrapper.cache_info = cache_info
        wrapper.cache_clear = cache_clear
        return update_wrapper(wrapper, user_function)

    return decorating_function

if __name__ == "__main__":

    def _test(cached):
        for i, l in enumerate("abcdefghijklmnopqrstuvwxyz"):
            cached(l, i)
        assert cached.cache_info().currbudget == cached.current_budget()
        cached("m", 12)  # a cache hit: "m" is already cached with budget 12
        info = cached.cache_info()
        assert info.currbudget == 25 * 13  # sum(range(26)) == 325

    def test(cached):
        _test(cached)
        cached.cache_clear()
        _test(cached)

    @lru_cache()
    def f1(x, b):
        return x, b

    @lru_cache(maxsize=26)
    def f2(x, b):
        return x, b

    @lru_cache(min_available_resources=10)  # require at least 10 bytes available
    def f3(x, b):
        return x, b

    test(f1)
    test(f2)
    test(f3)

    @lru_cache(maxsize=15)
    def f4(x, b):
        return x, b

    @lru_cache(maxbudget=100)
    def f5(x, b):
        return x, b

    # both limits end up holding the same 15 most recent items
    for i, l in enumerate("abcdefghijklmnopqrstuvwxyz"):
        f4(l, i)
        f5(l, i)
    assert f4.cache_info().currbudget == f5.cache_info().currbudget
    f5.pop(f5.cache_info().currbudget - 100)