Skip to content

Instantly share code, notes, and snippets.

@Arachnid
Created December 6, 2008 17:50
Show Gist options
  • Save Arachnid/32927 to your computer and use it in GitHub Desktop.
Save Arachnid/32927 to your computer and use it in GitHub Desktop.
import logging
from google.appengine.api import memcache
from google.appengine.api import apiproxy_stub_map
from google.appengine.datastore import datastore_pb
# NOTE: this string follows the imports above, so Python does not treat it as
# the module's __doc__; move it to the top of the file to make it one.
"""Provides a shim that caches datastore Get calls.

Puts, deletes and query results also populate the cache, so later Gets for
the same keys can be answered without a datastore round trip.

Example code:

    import datastore_get_cache
    datastore_get_cache.DatastoreCachingShim.Install()

    # ...

    def main(args):
        util.run_wsgi_app(application)
        datastore_get_cache.DatastoreCachingShim.ResetCache()

By default, ResetCache() will update memcache counters in
_DatastoreCachingShim_cache_hits and _DatastoreCachingShim_cache_misses to
keep track of cache hits and misses.
"""
class APIProxyShim(object):
    """A generic shim class, with methods to install/uninstall it.

    Subclasses of this class can be used to replace the real stub for a
    service, intercepting and possibly passing on calls to the original stub.
    A subclass sets SERVICE_NAME and defines _Dynamic_<Call>(request, response)
    methods for the calls it wants to intercept; every other call is forwarded
    unchanged to the wrapped stub's MakeSyncCall.
    """

    SERVICE_NAME = None  # To be overridden by subclasses
    # The installed shim for this class, or None while not installed.
    _instance = None

    def __init__(self, wrapped_stub):
        """Constructor. Internal use only - see Install()."""
        self._wrapped_stub = wrapped_stub

    def CallWrappedStub(self, call, request, response):
        """Allows subclasses to call the wrapped stub."""
        self._wrapped_stub.MakeSyncCall(self.SERVICE_NAME, call, request, response)

    def MakeSyncCall(self, service, call, request, response):
        """Dispatches a call to a _Dynamic_<call> handler or the wrapped stub.

        Args:
          service: Service name; must equal SERVICE_NAME.
          call: Name of the API method, e.g. 'Get'.
          request: Request protocol buffer; must be initialized.
          response: Response protocol buffer, filled in place.
        """
        # The condition and message must be separate assert operands: a
        # parenthesised (condition, message) pair is a non-empty tuple and
        # therefore always true.
        assert service == self.SERVICE_NAME, (
            'Got service name "%s", expected "%s"'
            % (service, self.SERVICE_NAME))
        messages = []
        assert request.IsInitialized(messages), messages
        # Look the handler up on the class only. An instance lookup would fall
        # through __getattr__ and return the wrapped stub's own _Dynamic_*
        # method, bypassing the wrapped stub's MakeSyncCall.
        handler = getattr(type(self), '_Dynamic_' + call, None)
        if handler is not None:
            handler(self, request, response)
        else:
            self.CallWrappedStub(call, request, response)
        messages = []
        assert response.IsInitialized(messages), messages

    def __getattr__(self, name):
        """Pass-through to the wrapped stub for attributes the shim lacks."""
        # Only reached when normal lookup fails. If _wrapped_stub itself is
        # missing (e.g. an instance created without __init__), delegating
        # would recurse forever, so fail with AttributeError instead.
        if name == '_wrapped_stub':
            raise AttributeError(name)
        return getattr(self._wrapped_stub, name)

    @classmethod
    def Install(cls):
        """Installs the shim. Only needs to be run once at import time.

        Note that this accesses internal members of APIProxyStubMap, so may
        break in future.
        """
        if cls._instance is not None:
            return
        wrapped_stub = apiproxy_stub_map.apiproxy.GetStub(cls.SERVICE_NAME)
        assert wrapped_stub, "No service '%s' found to wrap." % cls.SERVICE_NAME
        if isinstance(wrapped_stub, cls):
            # A shim of this class is already registered; adopt it.
            cls._instance = wrapped_stub
            return
        cls._instance = cls(wrapped_stub)
        stub_dict = apiproxy_stub_map.apiproxy._APIProxyStubMap__stub_map
        # Register the shim itself so it receives calls for this service.
        stub_dict[cls.SERVICE_NAME] = cls._instance

    @classmethod
    def Uninstall(cls):
        """Uninstalls the shim, restoring the stub it wrapped.

        Note that there's no need to uninstall a shim after each request. You
        can install it once at import time and leave it there between requests.
        """
        if cls._instance is None:
            return
        stub_dict = apiproxy_stub_map.apiproxy._APIProxyStubMap__stub_map
        # NOTE(review): restores unconditionally, even if another wrapper was
        # registered on top of this shim since Install().
        stub_dict[cls.SERVICE_NAME] = cls._instance._wrapped_stub
        cls._instance = None
class DatastoreCachingShim(APIProxyShim):
    """Caches datastore_v3 entities until ResetCache() is called.

    Results of Get, Put, Delete and query Next calls are recorded in a
    per-instance dict keyed by the encoded entity key; a cached value of None
    records that the entity is known not to exist. Non-transactional Gets are
    answered from that dict where possible.
    """

    SERVICE_NAME = 'datastore_v3'

    def __init__(self, default_stub):
        super(DatastoreCachingShim, self).__init__(default_stub)
        # Encoded key -> EntityProto, or None for a known-missing entity.
        self.entity_cache = {}
        # Counted per Get call: a hit is a Get answered entirely from cache.
        self.cache_hits = 0
        self.cache_misses = 0

    def _Dynamic_Get(self, request, response):
        """Intercepts get requests and returns them from cache if available.

        Only keys absent from the cache are forwarded to the wrapped stub.
        Transactional Gets bypass the cache entirely. Result groups are added
        to `response` in the same order as the keys in `request`.
        """
        # Stringifying every key is not free, so only do it when it is logged.
        if logging.getLogger().isEnabledFor(logging.DEBUG):
            logging.debug("Tx: %s, Keys: %s", request.has_transaction(),
                          [str(x) for x in request.key_list()])
        if request.has_transaction():
            self.CallWrappedStub('Get', request, response)
            return

        keys = request.key_list()
        encoded_keys = [key.Encode() for key in keys]
        new_request = datastore_pb.GetRequest()
        new_response = datastore_pb.GetResponse()
        missing = []  # encoded keys forwarded to the wrapped stub, in order
        for key, encoded_key in zip(keys, encoded_keys):
            if encoded_key not in self.entity_cache:
                missing.append(encoded_key)
                new_request.add_key().CopyFrom(key)

        fetched = {}  # encoded key -> result group returned by the stub
        if missing:
            self.CallWrappedStub('Get', new_request, new_response)
            # Assumes one result group per requested key, in request order.
            # NOTE(review): confirm for SDKs that can defer keys.
            for encoded_key, group in zip(missing, new_response.entity_list()):
                fetched[encoded_key] = group
                # A group is always truthy; has_entity() says whether the
                # entity was actually found.
                if group.has_entity():
                    self.entity_cache[encoded_key] = group.entity()
                else:
                    self.entity_cache[encoded_key] = None
            self.cache_misses += 1
        else:
            self.cache_hits += 1

        for encoded_key in encoded_keys:
            if encoded_key in fetched:
                # Pass the stub's result group through unchanged.
                response.add_entity().CopyFrom(fetched[encoded_key])
                continue
            group = response.add_entity()
            cached = self.entity_cache[encoded_key]
            if cached is not None:
                group.mutable_entity().CopyFrom(cached)
            # Otherwise the group is left without an entity, i.e. "not found";
            # calling mutable_entity() would mark it found with an empty entity.

    def _Dynamic_Put(self, request, response):
        """Intercepts puts and adds them to the cache.

        Entities are cached under the keys returned in `response`, so keys
        completed by the datastore (e.g. newly allocated ids) are used.
        Transactional puts only evict their keys, since the transaction may
        still roll back.
        """
        self.CallWrappedStub('Put', request, response)
        in_transaction = request.has_transaction()
        for entity, key in zip(request.entity_list(), response.key_list()):
            encoded_key = key.Encode()
            if in_transaction:
                self.entity_cache.pop(encoded_key, None)
            elif entity.key().Encode() == encoded_key:
                self.entity_cache[encoded_key] = entity
            else:
                # The request carried an incomplete key; cache a copy that
                # carries the completed key instead.
                completed = entity.__class__()
                completed.CopyFrom(entity)
                completed.mutable_key().CopyFrom(key)
                self.entity_cache[encoded_key] = completed

    def _Dynamic_Delete(self, request, response):
        """Intercepts deletes and adds negative entries to the cache.

        Transactional deletes only evict their keys, since the transaction may
        still roll back.
        """
        self.CallWrappedStub('Delete', request, response)
        in_transaction = request.has_transaction()
        for key in request.key_list():
            encoded_key = key.Encode()
            if in_transaction:
                self.entity_cache.pop(encoded_key, None)
            else:
                self.entity_cache[encoded_key] = None

    def _Dynamic_Next(self, request, response):
        """Intercepts query results and caches the returned entities."""
        self.CallWrappedStub('Next', request, response)
        # Keys-only (and presumably projection/index-only) results hold
        # partial entities that must not be served as full Get results.
        # getattr keeps this working on SDKs without these accessors.
        for flag in ('keys_only', 'index_only'):
            accessor = getattr(response, flag, None)
            if accessor is not None and accessor():
                return
        for entity in response.result_list():
            self.entity_cache[entity.key().Encode()] = entity

    @classmethod
    def ResetCache(cls, update_stats=True):
        """Clears the installed shim's cache and hit/miss counters.

        Args:
          update_stats: If true, add this period's hit and miss counts to the
            memcache counters _DatastoreCachingShim_cache_hits and
            _DatastoreCachingShim_cache_misses.

        Does nothing when the shim is not installed.
        """
        instance = cls._instance
        if instance is None:
            return
        total = instance.cache_hits + instance.cache_misses
        if total > 0:
            if update_stats:
                counters = {
                    '_DatastoreCachingShim_cache_hits': instance.cache_hits,
                    '_DatastoreCachingShim_cache_misses': instance.cache_misses,
                }
                # Seed missing counters with 0 so incr() has a value to add to.
                memcache.add_multi(dict((name, 0) for name in counters))
                for name, delta in counters.items():
                    memcache.incr(name, delta)
            hit_rate = instance.cache_hits / float(total)
            logging.info("Datastore cache had %d hits, %d misses (%d%%)",
                         instance.cache_hits, instance.cache_misses,
                         100.0 * hit_rate)
        instance.cache_hits = 0
        instance.cache_misses = 0
        instance.entity_cache = {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment