Created
December 6, 2008 17:50
-
-
Save Arachnid/32927 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from google.appengine.api import memcache | |
from google.appengine.api import apiproxy_stub_map | |
from google.appengine.datastore import datastore_pb | |
"""Provides a shim that caches datastore Get calls. | |
Example code: | |
import datastore_get_cache | |
datastore_get_cache.DatastoreCachingShim.Install() | |
# ... | |
def main(args): | |
util.run_wsgi_app(application) | |
datastore_get_cache.DatastoreCachingShim.ResetCache() | |
By default, ResetCache() will update memcache counters in _DatastoreCachingShim_cache_hits and _DatastoreCachingShim_cache_misses to keep track of cache hits and misses. | |
""" | |
class APIProxyShim(object): | |
"""A generic shim class, with methods to install/uninstall it. | |
Subclasses of this class can be used to replace the real stub for a service, | |
intercepting and possibly passing on calls to the original stub. | |
""" | |
SERVICE_NAME = None # To be overridden by subclasses | |
_instance = None | |
def __init__(self, wrapped_stub): | |
"""Constructor. Internal use only - see Install().""" | |
self._wrapped_stub = wrapped_stub | |
def CallWrappedStub(self, call, request, response): | |
"""Allows subclasses to call the wrapped stub.""" | |
self._wrapped_stub.MakeSyncCall(self.SERVICE_NAME, call, request, response) | |
def MakeSyncCall(self, service, call, request, response): | |
assert (service == self.SERVICE_NAME, | |
'Got service name "%s", expected "%s"' | |
% (service, self.SERVICE_NAME)) | |
messages = [] | |
assert request.IsInitialized(messages), messages | |
method = getattr(self, '_Dynamic_' + call, None) | |
if method: | |
method(request, response) | |
else: | |
self.CallWrappedStub(call, request, response) | |
assert response.IsInitialized(messages), messages | |
def __getattr__(self, name): | |
"""Pass-through to the wrapped stub.""" | |
return getattr(self._wrapped_stub, name) | |
@classmethod | |
def Install(cls): | |
"""Installs the shim. Only needs to be run once at import time. | |
Note that this accesses internal members of APIProxyStubMap, so may break | |
in future. | |
""" | |
if not cls._instance: | |
wrapped_stub = apiproxy_stub_map.apiproxy.GetStub(cls.SERVICE_NAME) | |
assert wrapped_stub, "No service '%s' found to wrap." % cls.SERVICE_NAME | |
if isinstance(wrapped_stub, cls): | |
cls._instance = wrapped_stub | |
else: | |
cls._instance = cls(wrapped_stub) | |
stub_dict = apiproxy_stub_map.apiproxy._APIProxyStubMap__stub_map | |
stub_dict[cls.SERVICE_NAME] = cls._instance._wrapped_stub | |
@classmethod | |
def Uninstall(cls): | |
"""Uninstalls the shim. | |
Note that there's no need to uninstall a shim after each request. You can | |
install it once at import time and leave it there between requests. | |
""" | |
if cls._instance: | |
stub_dict = apiproxy_stub_map.apiproxy._APIProxyStubMap__stub_map | |
stub_dict[cls.SERVICE_NAME] = cls._instance | |
cls._instance = None | |
class DatastoreCachingShim(APIProxyShim): | |
SERVICE_NAME = 'datastore_v3' | |
def __init__(self, default_stub): | |
super(DatastoreCachingShim, self).__init__(default_stub) | |
self.entity_cache = {} | |
self.cache_hits = 0 | |
self.cache_misses = 0 | |
def _Dynamic_Get(self, request, response): | |
"""Intercepts get requests and returns them from cache if available.""" | |
logging.info("Tx: %s, Keys: %s", request.has_transaction(), [str(x) for x in request.key_list()]) | |
if request.has_transaction(): | |
self.CallWrappedStub('Get', request, response) | |
return | |
new_request = datastore_pb.GetRequest() | |
new_response = datastore_pb.GetResponse() | |
keys = [] | |
for key in request.key_list(): | |
encoded_key = key.Encode() | |
keys.append(encoded_key) | |
if encoded_key not in self.entity_cache: | |
new_request.add_key().CopyFrom(key) | |
if new_request.key_size() > 0: | |
self.CallWrappedStub('Get', new_request, new_response) | |
entity_iter = iter(new_response.entity_list()) | |
for encoded_key in keys: | |
if encoded_key in self.entity_cache: | |
entity = self.entity_cache[encoded_key] | |
if entity: | |
response.add_entity().mutable_entity().CopyFrom(entity) | |
else: | |
# Negative response (no entity). | |
response.add_entity().mutable_entity() | |
else: | |
entity = entity_iter.next() | |
if entity: | |
self.entity_cache[encoded_key] = entity.entity() | |
else: | |
self.entity_cache[encoded_key] = None | |
response.add_entity().CopyFrom(entity) | |
# Update counters | |
if new_request.key_size() == 0: | |
self.cache_hits += 1 | |
else: | |
self.cache_misses += 1 | |
def _Dynamic_Put(self, request, response): | |
"""Intercepts puts and adds them to the cache.""" | |
self.CallWrappedStub('Put', request, response) | |
for entity in request.entity_list(): | |
encoded_key = entity.key().Encode() | |
self.entity_cache[encoded_key] = entity | |
def _Dynamic_Delete(self, request, response): | |
"""Intercepts delets and adds negative entries to the cache.""" | |
self.CallWrappedStub('Delete', request, response) | |
for key in request.key_list(): | |
encoded_key = key.Encode() | |
self.entity_cache[encoded_key] = None | |
def _Dynamic_Next(self, request, response): | |
"""Intercepts query results and caches the returned entities.""" | |
self.CallWrappedStub('Next', request, response) | |
for entity in response.result_list(): | |
encoded_key = entity.key().Encode() | |
self.entity_cache[encoded_key] = entity | |
@classmethod | |
def ResetCache(cls, update_stats=True): | |
self = cls._instance | |
if self.cache_hits + self.cache_misses > 0: | |
if update_stats: | |
update_list = { | |
'_DatastoreCachingShim_cache_hits': self.cache_hits, | |
'_DatastoreCachingShim_cache_misses': self.cache_misses, | |
} | |
memcache.add_multi(dict((x, 0) for x in update_list)) | |
for k, v in update_list.items(): | |
memcache.incr(k, v) | |
hit_rate = self.cache_hits / float(self.cache_hits + self.cache_misses) | |
logging.info("Datastore cache had %d hits, %d misses (%d%%)" | |
% (self.cache_hits, self.cache_misses, 100.0*hit_rate)) | |
self.cache_hits = 0 | |
self.cache_misses = 0 | |
self.entity_cache = {} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment