Skip to content

Instantly share code, notes, and snippets.

@yatt
Last active June 29, 2017 05:36
Show Gist options
  • Save yatt/57952146e04c8fdb8b811d44b958cecc to your computer and use it in GitHub Desktop.
Save yatt/57952146e04c8fdb8b811d44b958cecc to your computer and use it in GitHub Desktop.
Persistent memoize decorator with timeout
#! /bin/python2.7
# coding: utf-8
"""
persistent cache with timeout using pickle
restore cache from disk when decorator is used and
store cache to disk before exit program.
usage sample
@memoize_with_timeout('/cache/my_function.bin', timeout_sec=10)
def my_function(n, callback):
ret = something(n)
return callback(ret)
"""
import atexit
import functools
import logging
import os
import pickle
import time
import types
logger = logging.getLogger(__name__)
def serialize(obj):
    """
    Convert one argument value into a component usable in a cache key.

    Conversions applied (recursively for containers):

    - plain Python function -> its name (a string)
    - list / tuple          -> tuple of serialized elements
    - dict                  -> frozenset of serialized (key, value) pairs
    - set / frozenset       -> frozenset of serialized elements
    - anything else         -> returned unchanged

    NOTE: a function is identified by its name only, so two different
    functions sharing a name (for example two lambdas) yield the same
    key component.
    """
    if isinstance(obj, types.FunctionType):
        # function -> function name string
        # (__name__ is the same attribute as func_name on Python 2 and
        # also exists on Python 3)
        return obj.__name__
    if isinstance(obj, (list, tuple)):
        # lists are unhashable; tuples are walked too because they may
        # contain lists/dicts/functions
        return tuple(serialize(item) for item in obj)
    if isinstance(obj, dict):
        # frozenset keeps the result independent of dict iteration order
        return frozenset(
            (serialize(key), serialize(value)) for key, value in obj.items()
        )
    if isinstance(obj, (set, frozenset)):
        return frozenset(serialize(item) for item in obj)
    return obj
def create_cache_key(args, kwargs):
    """
    Build the cache key for one call from its positional and keyword
    arguments.

    Every positional value, keyword name and keyword value is passed
    through serialize().  Keyword arguments are emitted sorted by name so
    that the key does not depend on dict iteration order: the same
    keyword arguments always map to the same key, including across
    program runs when the cache is restored from disk.

    Returns a 2-tuple ``(positional_part, keyword_part)`` where both parts
    are tuples.
    """
    targs = tuple(serialize(arg) for arg in args)
    tkwargs = tuple(
        (serialize(name), serialize(kwargs[name])) for name in sorted(kwargs)
    )
    return targs, tkwargs
def store_cache(path, cache):
    """
    Pickle ``cache`` and write it to the file at ``path``.

    This is best-effort: any exception raised while pickling or writing
    is logged (with traceback) and swallowed, nothing is re-raised.
    """
    try:
        # Pickle into memory before touching the file, so an unpicklable
        # entry cannot leave a truncated cache file behind at ``path``.
        payload = pickle.dumps(cache)
        with open(path, 'wb') as ofs:
            ofs.write(payload)
        logger.debug(u'store to "%s"', path)
    except Exception:
        # logger.exception() already attaches the active exception info
        logger.exception(u'error on storing to "%s"', path)
def memoize_with_timeout(path, timeout_sec=0):
    """
    Return a decorator that memoizes a function in a persistent cache.

    path        -- file the cache is restored from when the decorator is
                   applied, and written to at interpreter exit (after
                   expired entries have been dropped).
    timeout_sec -- an entry is treated as expired once more than this many
                   seconds have passed since it was stored.  Because the
                   comparison is ``elapsed > timeout_sec``, the default of
                   0 means an entry is effectively expired by the time it
                   is looked up again.

    Any exception raised while restoring the cache file is logged and
    re-raised at decoration time.

    NOTE: the cache file is read with pickle.load(), which can execute
    arbitrary code; ``path`` must point to a trusted file.
    """
    def composer(func):
        """
        Wrap ``func`` with the cache lookup and register the exit hook.
        """
        cache = {}
        # restore cache from disk
        try:
            if os.path.exists(path):
                # binary mode, mirroring the 'wb' used when storing
                with open(path, 'rb') as ifs:
                    cache = pickle.load(ifs)
                logger.debug(u'restore from "%s"', path)
            else:
                logger.debug(u'cache file "%s" does not exists', path)
        except Exception:
            logger.exception(u'error on restoring from "%s"', path)
            # bare raise keeps the original traceback
            raise

        def is_expired_entry(cache_entry):
            """Is this (value, timestamp) cache entry older than timeout_sec?"""
            elapsed_sec = time.time() - cache_entry[1]
            logger.debug("elapsed %s sec.", elapsed_sec)
            return elapsed_sec > timeout_sec

        def garbage_collect():
            """Delete expired cache entries."""
            # collect keys first: the dict must not change size while
            # it is being iterated
            expired_keys = [
                key for key in cache if is_expired_entry(cache[key])
            ]
            for key in expired_keys:
                del cache[key]

        def finalize():
            """Drop expired entries and store the cache to disk."""
            logger.debug('finalize for cache "%s", size=%s',
                         func.__name__, len(cache))
            garbage_collect()
            store_cache(path, cache)

        atexit.register(finalize)

        @functools.wraps(func)
        def cache_mechanism(*args, **kwargs):
            """
            Return the cached value for these arguments, calling ``func``
            when there is no entry yet or the entry has expired.
            """
            key = create_cache_key(args, kwargs)
            logger.debug('lookup cache "%s" by key = %s', func.__name__, key)
            if key not in cache:
                logger.debug('=> entry not found.')
                # timestamp is taken after func returns
                cache[key] = func(*args, **kwargs), time.time()
            elif is_expired_entry(cache[key]):
                logger.debug('=> entry found but expired.')
                cache[key] = func(*args, **kwargs), time.time()
            else:
                logger.debug('=> entry found.')
            return cache[key][0]

        return cache_mechanism

    return composer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment