Last active
August 3, 2018 12:28
-
-
Save MattFaus/bd8c0a1a4fb3d7cc3fe3 to your computer and use it in GitHub Desktop.
All of the code necessary to implement and test protobuf projection in a Google Appengine web application.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example usage: install the protobuf-projection monkey-patches for both the
# db and the ndb datastore APIs.
import db_util

db_util.enable_db_protobuf_projection()
db_util.enable_ndb_protobuf_projection()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is really a list, but we don't have a ThreadLocalList class.
# Only the keys are meaningful: they are the property names registered by
# protobuf_projection(); the values are always None (dict.fromkeys).
current_protobuf_projection = thread_util.ThreadLocalDict()
@contextlib.contextmanager
def protobuf_projection(cls, property_names):
    """Context manager that limits datastore reads to `property_names`.

    While the context is active, the names are registered in
    current_protobuf_projection so that the patched pb_to_entity hooks can
    strip every other property from incoming protobufs.

    Contexts may be nested: the inner context adds its names to the ones
    already registered, and on exit the registration is restored to exactly
    what the enclosing context had (or to empty at the outermost level).

    Args:
        cls: the model class being read. For db.Model subclasses its
            __init__ is temporarily wrapped so new instances remember which
            properties were projected.
        property_names: iterable of property names to keep.
    """
    # Snapshot the names once: the same values are used both to update the
    # thread-local and inside new_init, so a one-shot iterable must not be
    # consumed twice, and later caller-side mutation must not leak in.
    property_names = tuple(property_names)

    # Remember what an enclosing context registered so it can be restored
    # instead of being wiped when this context exits.
    outer_projection = dict.fromkeys(current_protobuf_projection.keys())

    patched_init = False
    try:
        # Update the thread-local with the current projected properties
        current_protobuf_projection.update(dict.fromkeys(property_names))

        # Monkey-patch this class's init to keep track of the projected
        # property names. The monkey-patch is reverted before leaving the
        # context. Note this is only for db.Model classes. ndb.Model classes
        # have a different approach to init, and we have to set this
        # attribute in the _projected_pb_to_entity function, below.
        # NOTE(review): the patch is applied to the class object itself, so
        # unlike current_protobuf_projection it is not per-thread.
        if issubclass(cls, db.Model):
            orig_init = cls.__init__

            def new_init(self, *args, **kwargs):
                self._protobuf_projection_properties = property_names
                return orig_init(self, *args, **kwargs)

            cls.__init__ = new_init
            patched_init = True

        yield
    finally:
        # Runs even if the setup above failed part-way, so no projection
        # state or patched __init__ is ever left behind.
        if patched_init:
            cls.__init__ = orig_init
        current_protobuf_projection.clear()
        current_protobuf_projection.update(outer_projection)
def _create_projected_pb_to_entity(orig_pb_to_entity):
    """Removes properties from protobufs for faster queries.

    Retrieving data from the GAE datastore happens in two stages:
      1. Issuing an RPC to the datastore API, which returns a message in
         protobuf format.
      2. Translating the protobuf message into an instance of a db.Model
         or a ndb.Model class.

    The second stage is considerably slower than the first stage, primarily
    due to a lot of validation that makes sure the raw data returned matches
    the schema defined in the *.Model classes. This validation happens for
    *all* of the properties defined by the *.Model class, regardless of
    whether you actually care to read any of those properties later.

    So, to speed up the second stage, we remove the unnecessary properties
    from the protobuf before entering the second stage. This is much like
    the scenario where a new property is added to a Model. Entities that
    have not been written since the new property was introduced will return
    a protobuf that excludes the newly introduced property.

    NOTE: Reading an attribute excluded from the projection from an entity
    created during protobuf projection will raise an AttributeError.

    NOTE: Trying to put() an entity created during protobuf projection will
    raise an IOError.

    Args:
        orig_pb_to_entity: the adapter's original pb_to_entity(self, pb)
            method, called after the protobuf has been trimmed.

    Returns:
        A replacement pb_to_entity(self, pb) function. Outside of a
        protobuf_projection() context it simply delegates to
        orig_pb_to_entity.
    """
    def _projected_pb_to_entity(self, pb):
        if current_protobuf_projection:
            def del_undesired_properties(lst):
                # Walk backwards so that deleting an element never shifts
                # the index of one we have not inspected yet.
                for i in xrange(len(lst) - 1, -1, -1):
                    # Test membership on the dict itself: calling .keys()
                    # here would rebuild a list for every single property.
                    if lst[i].name() not in current_protobuf_projection:
                        del lst[i]

            if pb.property_size() > 0:
                del_undesired_properties(pb.property_list())

            if pb.raw_property_size() > 0:
                del_undesired_properties(pb.raw_property_list())

        entity = orig_pb_to_entity(self, pb)

        # Keep track of which properties were projected when this instance
        # was created. For db.Model instances, we use a monkey-patched init
        # to keep track of these properties, which is done in
        # protobuf_projection(), above.
        if current_protobuf_projection and isinstance(entity, ndb.Model):
            entity._protobuf_projection_properties = (
                current_protobuf_projection.keys())

        return entity

    return _projected_pb_to_entity
# Captured once at import time, so enable_db_protobuf_projection() always
# wraps the pristine adapter method, even when it is called repeatedly.
_orig_db_pb_to_entity = datastore.DatastoreAdapter.pb_to_entity
def _protobuf_projection_mock_getattribute(cls): | |
"""Prevent access to properties excluded from a projection query. | |
Monkey-patch this class's getattribute to raise an exception when | |
trying to access an unavailable property on an instance that was created | |
within a protobuf_projection context. | |
This monkey-patch for this function is always in effect, but it only | |
applies to objects that were created within the protobuf_projection | |
context. Those are the only objects that will have a | |
_protobuf_projection_properties attribute available. The code inside | |
this function must execute extremely fast, because it is called A LOT. | |
""" | |
orig_get_attribute = cls.__getattribute__ | |
projected_attribute_error_msg = ("Property unavailable due to protobuf " | |
"projection.") | |
projected_attribute_error = AttributeError(projected_attribute_error_msg) | |
def new_get_attribute(self, name): | |
try: | |
# Don't call hasattr(), since that'd be infinite recursion | |
projected_properties = super(cls, self).__getattribute__( | |
'_protobuf_projection_properties') | |
all_properties = super(cls, self).__getattribute__( | |
'_properties').keys() | |
if name in all_properties and name not in projected_properties: | |
raise projected_attribute_error | |
except AttributeError, ae: | |
if ae.message == projected_attribute_error_msg: | |
raise | |
else: | |
# This instances wasn't fetched during protobuf projection, | |
# since _protobuf_projection_properties is unavailable | |
pass | |
return orig_get_attribute(self, name) | |
cls.__getattribute__ = new_get_attribute | |
# Single shared exception instance raised by every patched put() variant
# when the entity was created inside a protobuf_projection() context.
_protobuf_projection_disallowed_put_exception = IOError(
    'Write disallowed due to protobuf projection.')
def _protobuf_projection_disallow_ndb_put():
    """Make ndb.Model.put() / put_async() refuse projected entities.

    Monkey-patch the various put() functions to raise an exception when
    trying to put an entity that was created within a protobuf_projection
    context (i.e. one carrying a _protobuf_projection_properties attribute).
    All other entities are passed straight to the original method.

    Safe to call repeatedly: a method that is already guarded is left
    alone, so wrappers never pile up on top of each other.
    """
    def guard(orig_func):
        # Build the replacement for one original put-style method.
        def guarded_put(self, *args, **kwargs):
            if hasattr(self, '_protobuf_projection_properties'):
                raise _protobuf_projection_disallowed_put_exception
            return orig_func(self, *args, **kwargs)
        # Marker used to recognise our own wrapper on later calls.
        guarded_put._is_protobuf_projection_patch = True
        return guarded_put

    for method_name in ('put', 'put_async'):
        current = getattr(ndb.Model, method_name)
        if getattr(current, '_is_protobuf_projection_patch', False):
            continue
        setattr(ndb.Model, method_name, guard(current))
def _protobuf_projection_disallow_db_put():
    """Make db.put_async() and db.Model.put() refuse projected entities.

    Both are monkey-patched to raise the shared IOError when any model
    involved was created within a protobuf_projection context (i.e. carries
    a _protobuf_projection_properties attribute). Everything else is
    forwarded unchanged to the original function.

    Safe to call repeatedly: a function that is already guarded is left
    alone, so wrappers never pile up on top of each other.
    """
    if not getattr(db.put_async, '_is_protobuf_projection_patch', False):
        orig_db_put_async = db.put_async

        def new_db_put_async(models, **kwargs):
            # Normalise into a separate local used only for inspection.
            if isinstance(models, (list, tuple)):
                candidates = models
            else:
                candidates = (models,)

            if any(hasattr(m, '_protobuf_projection_properties')
                   for m in candidates):
                raise _protobuf_projection_disallowed_put_exception

            # Forward the caller's argument untouched: wrapping a single
            # model in a tuple here would change the shape of the result
            # (a list of keys instead of a single key).
            return orig_db_put_async(models, **kwargs)

        new_db_put_async._is_protobuf_projection_patch = True
        db.put_async = new_db_put_async

    if not getattr(db.Model.put, '_is_protobuf_projection_patch', False):
        orig_model_put = db.Model.put

        def new_db_model_put(self, *args, **kwargs):
            if hasattr(self, '_protobuf_projection_properties'):
                raise _protobuf_projection_disallowed_put_exception
            return orig_model_put(self, *args, **kwargs)

        new_db_model_put._is_protobuf_projection_patch = True
        db.Model.put = new_db_model_put
def enable_db_protobuf_projection():
    """Install all db-side hooks required for protobuf projection.

    Guards attribute reads on db.Model, blocks puts of projected db
    entities, and swaps the db datastore adapter's pb_to_entity for the
    projecting version built around the original method.
    """
    _protobuf_projection_mock_getattribute(db.Model)
    _protobuf_projection_disallow_db_put()

    projecting_pb_to_entity = _create_projected_pb_to_entity(
        _orig_db_pb_to_entity)
    datastore.DatastoreAdapter.pb_to_entity = projecting_pb_to_entity
# Captured once at import time, so enable_ndb_protobuf_projection() always
# wraps the pristine adapter method, even when it is called repeatedly.
_orig_ndb_pb_to_entity = ndb_model.ModelAdapter.pb_to_entity
def enable_ndb_protobuf_projection():
    """Install all ndb-side hooks required for protobuf projection.

    Guards attribute reads on ndb.Model, blocks puts of projected ndb
    entities, and swaps the ndb model adapter's pb_to_entity for the
    projecting version built around the original method.
    """
    _protobuf_projection_mock_getattribute(ndb.Model)
    _protobuf_projection_disallow_ndb_put()

    projecting_pb_to_entity = _create_projected_pb_to_entity(
        _orig_ndb_pb_to_entity)
    ndb_model.ModelAdapter.pb_to_entity = projecting_pb_to_entity
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ProjectedProtobufQueryTest(gae_model.GAEModelTestCase):
    """Tests protobuf projection for both db.Model and ndb.Model entities.

    Covers three behaviours for each API: projected reads return the
    requested properties, put() on a projected entity raises IOError, and
    reading an excluded property raises AttributeError.
    """

    def setUp(self):
        """Enable projection and define one two-property model per API."""
        super(ProjectedProtobufQueryTest, self).setUp()

        db_util.enable_db_protobuf_projection()
        db_util.enable_ndb_protobuf_projection()

        @db_util.disable_ndb_memcache
        class TestNDBModel(ndb.Model):
            prop_a = ndb.TextProperty(indexed=False)
            prop_b = ndb.IntegerProperty(indexed=True, default=-1)

        self.test_ndb_class = TestNDBModel

        class TestDBModel(db.Model):
            prop_a = db.TextProperty(indexed=False)
            prop_b = db.IntegerProperty(indexed=True, default=-1)

        self.test_db_class = TestDBModel

    def test_ndb_simple(self):
        """Projected ndb reads (by key and by query) expose the properties."""
        obj = self.test_ndb_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # Get by key
        with db_util.protobuf_projection(self.test_ndb_class, ['prop_a']):
            obj = key.get(use_cache=False)
            self.assertEqual(obj.prop_a, "Hello world")

        # Get a different projection by key
        with db_util.protobuf_projection(self.test_ndb_class, ['prop_b']):
            obj = key.get(use_cache=False)
            self.assertEqual(obj.prop_b, 3)

        # Get a different projection by key
        with db_util.protobuf_projection(self.test_ndb_class,
                                         ['prop_a', 'prop_b']):
            obj = key.get(use_cache=False)
            self.assertEqual(obj.prop_a, "Hello world")
            self.assertEqual(obj.prop_b, 3)

        # Get by query
        with db_util.protobuf_projection(self.test_ndb_class, ['prop_b']):
            obj = self.test_ndb_class.query().filter(
                self.test_ndb_class.prop_b == 3).get(use_cache=False)
            self.assertEqual(obj.prop_b, 3)

    def test_ndb_put_disallowed(self):
        """Every ndb put variant rejects an entity read under projection."""
        obj = self.test_ndb_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # The helpers close over `obj`, so they always act on whichever
        # entity `obj` is bound to at the time they are called.
        def call_put():
            obj.put()

        def call_put_async():
            obj.put_async()

        def call_global_put():
            ndb.put_multi([obj])

        with db_util.protobuf_projection(self.test_ndb_class, ['prop_a']):
            obj = key.get(use_cache=False)

            # Calling put() on an instance created during protobuf projection
            # should always raise an IOError
            self.assertRaises(IOError, call_put)
            self.assertRaises(IOError, call_put_async)
            self.assertRaises(IOError, call_global_put)

        # But, calling put on an instance created by a regular datastore
        # read should put() correctly.
        obj = key.get(use_cache=False)
        try:
            call_put()
            call_put_async()
            call_global_put()
        except IOError:
            self.fail('IOError raised incorrectly.')

    def test_ndb_excluded_attribute_reading_disallowed(self):
        """Reading an excluded ndb property raises AttributeError."""
        obj = self.test_ndb_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # Reading an excluded property from an instance created during
        # projection should always raise an AttributeError
        with db_util.protobuf_projection(self.test_ndb_class, ['prop_a']):
            obj = key.get(use_cache=False)
            self.assertRaises(AttributeError, lambda: obj.prop_b)
        self.assertRaises(AttributeError, lambda: obj.prop_b)

        # But reading an excluded property should be fine during regular
        # datastore reads.
        obj = key.get(use_cache=False)
        try:
            obj.prop_b
        except AttributeError:
            self.fail('AttributeError raised incorrectly.')

    def test_db_simple(self):
        """Projected db reads (by key and by query) expose the properties."""
        obj = self.test_db_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # Get by key
        with db_util.protobuf_projection(self.test_db_class, ['prop_a']):
            obj = db.get(key)
            self.assertEqual(obj.prop_a, "Hello world")

        # Get a different projection by key
        with db_util.protobuf_projection(self.test_db_class, ['prop_b']):
            obj = db.get(key)
            self.assertEqual(obj.prop_b, 3)

        # Get a different projection by key
        with db_util.protobuf_projection(self.test_db_class,
                                         ['prop_a', 'prop_b']):
            obj = db.get(key)
            self.assertEqual(obj.prop_a, "Hello world")
            self.assertEqual(obj.prop_b, 3)

        # Get by query
        with db_util.protobuf_projection(self.test_db_class, ['prop_b']):
            obj = self.test_db_class.all().filter(
                'prop_b =', 3).get()
            self.assertEqual(obj.prop_b, 3)

    def test_db_put_disallowed(self):
        """Every db put variant rejects an entity read under projection."""
        obj = self.test_db_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # The helpers close over `obj`, so they always act on whichever
        # entity `obj` is bound to at the time they are called.
        def call_put():
            obj.put()

        def call_global_put():
            db.put(obj)
            db.put([obj])

        with db_util.protobuf_projection(self.test_db_class, ['prop_a']):
            obj = db.get(key)

            # Calling put() on an instance created during protobuf projection
            # should always raise an IOError
            self.assertRaises(IOError, call_put)
            self.assertRaises(IOError, call_global_put)

        # But, calling put on an instance created by a regular datastore
        # read should put() correctly.
        obj = db.get(key)
        try:
            call_put()
            call_global_put()
        except IOError:
            self.fail('IOError raised incorrectly.')

    def test_db_excluded_attribute_reading_disallowed(self):
        """Reading an excluded db property raises AttributeError."""
        obj = self.test_db_class(prop_a="Hello world", prop_b=3)
        key = obj.put()

        # Reading an excluded property from an instance created during
        # projection should always raise an AttributeError
        with db_util.protobuf_projection(self.test_db_class, ['prop_a']):
            obj = db.get(key)
            self.assertRaises(AttributeError, lambda: obj.prop_b)
        self.assertRaises(AttributeError, lambda: obj.prop_b)

        # But reading an excluded property should be fine during regular
        # datastore reads.
        obj = db.get(key)
        try:
            obj.prop_b
        except AttributeError:
            self.fail('AttributeError raised incorrectly.')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example usage: fetch up to 50 UserData entities while keeping only the
# listed properties in each protobuf. Reading any other declared property
# of these entities raises AttributeError, and put() on them raises IOError.
projection = ['user_email', 'username', 'user_nickname', 'user_id',
              'is_phantom', 'birthdate', 'may_be_child', 'restricted_domain',
              'child_capabilities']

with db_util.protobuf_projection(user_models.UserData, projection):
    students_data = user_models.UserData.all().fetch(50)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Utility functions to help write thread-safe code. | |
The main advantage of this code is that it knows whether the | |
dev-appserver is being run with 'threadsafe: true' or 'threadsafe: | |
false', and if it's false, it uses more efficient, but less | |
thread-safe, variants. (This 'knowing' is actually enforced in the | |
unittest.) | |
Therefore, don't use these methods if you are spawning threads | |
yourself; these are meant to be used for 'generic' GAE objects that do | |
not use threads unless we are using a threaded version of our webapp. | |
NOTE: This is imported by appengine_config.py, which is very | |
restricted in what it's safe to import. Only import python system | |
libs from this file. | |
""" | |
import os | |
import threading | |
import modules_util | |
# Read the 'threadsafe' setting from the module's yaml; a missing entry is
# treated as not threadsafe.
_USES_THREADSAFE = modules_util.module_yaml(full_parse=False).get(
    'threadsafe', False)

if not _USES_THREADSAFE:
    # No need for threadsafety if we don't see threadsafe: true in app.yaml,
    # so plain object/dict serve as the base classes.
    _THREAD_LOCAL_SUPER = object
    _DICT_SUPER = dict

    # This context manager is written out as a class (the old-fashioned
    # way) since we do collections.defaultdict(threading.RLock) in
    # cacheutil.py.
    class RLock(object):
        """A no-op version of threading.RLock; can only be used with 'with'."""

        def __enter__(self):
            pass

        def __exit__(self, *args):
            pass
else:
    import UserDict

    _THREAD_LOCAL_SUPER = threading.local
    # Sadly, it's illegal to inherit from two C classes, and
    # threading.local and dict are both written in C. So ThreadLocalDict()
    # has to be built on UserDict instead of dict.
    _DICT_SUPER = UserDict.UserDict
    RLock = threading.RLock
class ThreadLocal(_THREAD_LOCAL_SUPER):
    """Drop-in replacement for threading.local.

    There are two ways to use it: instantiate it directly to get a
    thread-local dummy-object, or subclass it to get a thread-local
    smartie-object.

    Usage:
       class MyClass(thread_util.ThreadLocal):
           ...
       my_instance = MyClass()    # note this is at the global level

       my_random_container = thread_util.ThreadLocal()

    With this, every thread sees its own version of my_instance, and
    likewise its own version of my_random_container.

    Prefer this over using threading.local directly: when dev-appserver
    runs in non-threaded mode the base class reverts to plain 'object',
    so you don't pay the cost of thread-local-ness if you don't have to.
    """
class ThreadLocalDict(_DICT_SUPER, _THREAD_LOCAL_SUPER):
    """A threadsafe dict: every thread works with its own version of it.

    Combines the dict base with the thread-local base, both of which are
    chosen according to whether the app runs with threadsafe enabled.
    """
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment