-
-
Save mikalv/fb995ff6b01ba2148ebcb672e7edc02d to your computer and use it in GitHub Desktop.
Virtual Field for Django
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db.models import Field | |
from django.conf import settings | |
from django.utils.decorators import cached_property | |
class VirtualField(object): | |
""" | |
A virtual field, mainly used for caching and seamless computed field retrieval. | |
This acts both like a (cached) property and a virtual field if supported. | |
""" | |
auto_created = False | |
concrete = False | |
editable = False | |
hidden = False | |
is_relation = False | |
many_to_many = False | |
many_to_one = False | |
one_to_many = False | |
one_to_one = False | |
related_model = None | |
def __init__(self, fget=None, fset=None, cached=False, reflect=None, target=None, target_ignore_none=False, act_as_field=True, cache_when_set=True, verbose_name=None, *args, **kwargs): | |
"""Initializer for VirtualField. | |
All arguments are optional. | |
Keyword arguments: | |
fget -- Getter function for this field (instance arg is given) | |
fset -- Setter function for this field (instance and value args are given) | |
cached -- Specifies whether or not to cache the result of this field once invoked. (default is False) | |
reflect -- An iterable of key or tuples containing keys pointing to the specified (possibly nested) object attribute. Each key may be a lambda or function, in which case they are called once invoked. | |
target -- Same as reflect, except that they are not deleted once collected by __get__ and also acts as another cache target when the field's result is being cached. | |
target_ignore_none -- Specifies whether or not to ignore a target field which value is None. If this is True, fields with value as None are ignored and considered as being not "set". | |
verbose_name -- A human-readable name for the field. | |
Example: | |
@VirtualField(cache=True, reflect=('foo', ('bar', 'baz')), target=('quaz', ('qux', lambda: 'quux' + 'corge')) | |
def my_field(self): ... | |
This field, when invoked, will first try to fetch the caches in the following order: | |
* object.foo | |
* object.bar.baz | |
* object.quaz | |
* object.qux.quuxcorge | |
(The object is the instance of the owning model/class) | |
If one of the above attributes are available, the first available one is used as the result. Otherwise, the inner fget method is called to fetch it. | |
Since the cache attribute is set to True, the result is stored into instance (in the same manner as cached_property) and the fields specified in the `target' attribute. | |
""" | |
self.fget, self.fset, self.cached = fget, fset, cached | |
if fget is not None and hasattr(fget, '__name__'): | |
self.name = fget.__name__ | |
else: | |
self.name = None | |
self.act_as_field = act_as_field | |
self.cache_when_set = cache_when_set | |
self.target_ignore_none = target_ignore_none | |
self.cache_when_set = True | |
doc = getattr(fget, '__doc__', None) | |
if doc is not None: | |
self.__doc__ = doc | |
self._vf_reflects_old = reflect | |
self._vf_targets_old = reflect_readonly | |
super(VirtualField, self).__init__(*args, **kwargs) | |
self.verbose_name = verbose_name | |
if not act_as_field: | |
self.short_description = verbose_name | |
def _vf_normalize(self, name): | |
"""The function that does normalization of reflect and/or target argument. | |
For each item in the reflect/target list, | |
1. If the item is callable, call it to obtain the actual item (it must return either string or tuple) | |
2. If the item is None, ignore this item. | |
3. If the item is a string, take it as a key. make this field reference directly on it. | |
4. If the item is iterable, make this field follow the keys when fetched. | |
""" | |
# This must be called AFTER model is initialized! | |
# TODO Find out a better way to check if the model is initialized, not just checking whether self.model is set. | |
assert hasattr(self, 'model') | |
_str_types = six.string_types | |
for keys in getattr(self, name): | |
if callable(keys): | |
keys = keys() | |
if keys is None: | |
continue | |
elif isinstance(keys, _str_types): | |
keys = (keys,) | |
else: | |
keys = tuple(x() if callable(x) else x for x in keys) | |
if any(x is None for x in keys): | |
continue | |
yield keys | |
_vf_reflects = cached_property(lambda self: list(self._vf_normalize('_vf_reflects_old'))) | |
_vf_targets = cached_property(lambda self: list(self._vf_normalize('_vf_targets_old'))) | |
def _vf_enumerate(self): | |
for x in self._vf_reflects: | |
yield True, x | |
for x in self._vf_targets: | |
yield False, x | |
def getter(self, fget): | |
"""Specifies the getter of this field. | |
The usage is similliar to that of Python decorator `property'. | |
Usage: | |
@VirtualField(cached=True) | |
def field(instance): | |
return sum(x for x in instance.foos) | |
Or, alternatively: | |
field = VirtualField(cached=True) | |
@field.getter() | |
def field(instance): | |
return sum(x for x in instance.foos) | |
""" | |
self.fget = fget | |
if self.name is None and fget is not None and hasattr(fget, '__name__'): | |
self.name = fget.__name__ | |
doc = getattr(fget, '__doc__', None) | |
if doc is not None: | |
self.__doc__ = doc | |
return self | |
def setter(self, fset, still_cache=True): | |
"""Specifies the setter of this field. | |
The usage is similliar to that of Python decorator `property'. | |
Usage: | |
@VirtualField() | |
def lorem(instance): | |
return "Lorem ipsum dolor sit amet" | |
@lorem.setter | |
def lorem(instance, value): | |
print("You have set lorem to: %s" % (value,)) | |
""" | |
self.fset = fset | |
self.cache_when_set = still_cache | |
return self | |
def get_attname(self): | |
return self.name | |
def get_attname_column(self): | |
return self.get_attname(), None | |
def set_attributes_from_name(self, name): | |
if not self.name: | |
self.name = name | |
self.attname, self.column = self.get_attname(), None | |
self.concrete = False | |
if self.verbose_name is None and name: | |
self.verbose_name = name.replace('_', ' ') | |
if not self.act_as_field: | |
self.short_description = self.verbose_name | |
def contribute_to_class(self, cls, name): | |
"""Applies this field to the `cls' class, with name `name'""" | |
self.name = name | |
self.set_attributes_from_name(name) | |
self.concrete = False # Force non-concrete | |
self.model = cls | |
# Django >=1.6 required | |
if self.act_as_field: | |
if hasattr(cls._meta, 'add_virtual_field'): | |
cls._meta.add_virtual_field(self) | |
else: | |
try: | |
cls._meta.add_field(self, virtual=True) | |
except: | |
if hasattr(cls._meta, 'virtual_fields'): | |
cls._meta.virtual_fields.append(self) | |
else: | |
# Just act as a property | |
pass | |
setattr(cls, name, self) | |
def _store_cache(self, instance, value): | |
"""Stores the invocation result `value' to fields referenced by `target'.""" | |
instance.__dict__[self.name] = value | |
for keys in self._vf_targets: | |
parent, child = None, instance | |
for k in keys[:-1]: | |
parent, child = child, getattr(child, k, None) | |
if child is None: | |
break | |
if child is not None: | |
setattr(child, keys[-1], value) | |
def _get_reflects(self, instance, do_cache, do_clean): | |
"""Fetch all proxies (reflect/target), and optionally copy them to targets and delete them.""" | |
logger.debug("%s trying on %s (%#x, %s): %s" % (self.name, instance, id(instance), instance.__dict__, repr(self._vf_reflects))) | |
value = self | |
to_be_cleaned = [] | |
for is_reflect, keys in self._vf_enumerate(): | |
parent, child = None, instance | |
t = 0 | |
logger.debug("Keys: %s" % (keys,)) | |
for k in keys: | |
parent, child = child, getattr(child, k, self) | |
logger.debug("%s To %s: %s" % (" "*t, k, "Invalid" if child is _invalid else getattr(child, '__dict__', child))) | |
if child is self: | |
break | |
t += 1 | |
if child is not instance and child is not self: | |
logger.debug("%s -> Found!" % (" "*t)) | |
if do_cache: | |
logger.debug("%s -> Putting cache as instructed.", " "*t) | |
logger.debug("%s -> BEFORE: %s", " "*t, instance.__dict__) | |
value = child | |
logger.debug("%s -> AFTER: %s", " "*t, instance.__dict__) | |
if do_clean and is_reflect and parent is not None: | |
logger.debug("%s -> Cleaning as instructed." % (" "*t)) | |
to_be_cleaned.append((parent, k)) | |
yield child | |
# Store cache after the loop so the targets are not polluted in the loop. | |
if value is not self: | |
self._store_cache(instance, child) | |
for parent, k in to_be_cleaned: | |
try: | |
delattr(parent, k) | |
except AttributeError: | |
pass | |
def is_cached(self, instance): | |
"""Determine if this field would return a cached value on `instance'.""" | |
if self.name in instance.__dict__: | |
return True | |
for x in self._get_reflects(instance, False, False): | |
# Encountered any valid cache store | |
return True | |
return False | |
def cleanup_reflects(self, instance): | |
"""Cleans up (delattr()s) all `reflect's on `instance'. | |
Note: this doesn't clean `target' attributes, which makes this field still cached.""" | |
for x in self._get_reflects(instance, False, True): pass | |
def refresh(self, instance, *args, **kwargs): | |
"""Refreshes the field immediately. This accepts additional arguments which are passed directly to the getter.""" | |
self.cleanup_reflects(instance) | |
self._store_cache(instance, self.fget(instance, *args, **kwargs)) | |
__call__ = getter # Acts as getter | |
def __get__(self, instance, owner=None): | |
if instance is None: | |
return self | |
if self.name in instance.__dict__: | |
# Not taking very advantage of cache :/ | |
return instance.__dict__[self.name] | |
if self.fget is None: | |
raise AttributeError("This virtual field is not readable") | |
# Iterate over all redirects. | |
caches = self._get_reflects(instance, self.cached, True) | |
try: | |
cache = next(caches) | |
except StopIteration: | |
if settings.DEBUG and self._vf_reflects: | |
logger.debug("CACHE: MISS") | |
logger.debug(instance.__dict__) | |
# No reflects! | |
value = self.fget(instance) | |
if self.cached: | |
instance.__dict__[self.name] = value | |
return value | |
else: | |
logger.debug("CACHE: FOUND") | |
# Clean-up all other reflects | |
for x in caches: pass | |
# Return the first found cache | |
return cache | |
def __set__(self, instance, value): | |
if instance is None: | |
raise ValueError("instance is None") | |
if self.cache_when_set: | |
instance.__dict__[self.name] = value | |
self._store_cache(instance, value) | |
if self.fset is not None: | |
self.fset(instance, value) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment