Last active
August 29, 2015 14:24
-
-
Save willstott101/d6320ff51e41035aa51c to your computer and use it in GitHub Desktop.
Checks the prefetched cache of a queryset and filters/orders the cached objects in Python, to avoid issuing a large number of queries.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from operator import attrgetter | |
def maybe_prefetched(parent, related_manager):
    """Return a Python-side queryset over prefetched objects when possible.

    Looks up the objects belonging to ``related_manager`` in
    ``parent._prefetched_objects_cache``. When they are found, they are
    wrapped in a ``PythonQuerySet`` so that filtering and ordering can be done
    in Python instead of through further queries.

    Args:
        parent: The object whose ``_prefetched_objects_cache`` is inspected.
        related_manager: The manager that would otherwise be queried.

    Returns:
        A ``PythonQuerySet`` over the cached objects, or ``related_manager``
        itself when nothing usable has been prefetched.

    Raises:
        KeyError: If the cache name has to be guessed from the parent's
            ``_meta.fields_map`` and more than one entry targets the
            manager's model.
    """
    try:
        prefetched = parent._prefetched_objects_cache
    except AttributeError:
        # Nothing is prefetched.
        return related_manager

    if hasattr(related_manager, 'prefetch_cache_name'):
        cache_name = related_manager.prefetch_cache_name
    elif hasattr(related_manager, 'field'):
        cache_name = related_manager.field.related_query_name()
    else:
        # For Django Prefetch compatibility: the manager exposes neither a
        # cache name nor a field, so search the parent's reverse relations
        # for the one pointing at the manager's model.
        candidates = [
            f for f in parent.__class__._meta.fields_map.values()
            if f.related_model is related_manager.model
        ]
        if len(candidates) > 1:
            raise KeyError(
                'Too many Prefetch managers for model: %r' % related_manager.model)
        if not candidates:
            # No relation matches, so there is no cache entry to look for.
            return related_manager
        cache_name = candidates[0].field.related_query_name()

    try:
        objects = prefetched[cache_name]
    except KeyError:
        # This particular field isn't prefetched.
        return related_manager
    return PythonQuerySet(parent, related_manager, objects)
class BailFakeQuerySet(Exception):
    """Signal that an operation cannot be emulated on the prefetched cache.

    Raised by the Python-side queryset emulation when a call needs features
    it does not implement, so that a real database query can be used instead.
    """
class PythonQuerySet(object):
    """Emulate a subset of the QuerySet API over already-prefetched objects.

    Simple operations (equality ``filter``/``exclude``/``get``, attribute
    ``order_by``, ``values_list``, ``first``/``last``/``count``/``exists``)
    are answered from ``cache`` in Python. Every chainable call is also
    recorded in ``history``; when an operation cannot be emulated, the
    recorded calls are replayed on ``related_manager.all()`` and the
    operation is handed to that real queryset instead.

    Instances are never mutated by the emulated methods: each chainable call
    returns a new ``PythonQuerySet``.
    """

    # Attributes set in __init__. __getattr__ must never try to resolve
    # these through the real queryset (that would recurse on a
    # half-initialised instance).
    _OWN_ATTRS = ('parent', 'related_manager', 'cache', 'history')

    def __init__(self, parent, related_manager, cache, history=None):
        """Store the prefetched objects and the calls that produced them.

        Args:
            parent: Object the related objects were prefetched for.
            related_manager: Manager used for database fallbacks.
            cache: Sequence of prefetched objects this instance represents.
            history: Optional iterable of ``(name, args, kwargs)`` triples
                describing the chainable calls already applied to ``cache``.
        """
        self.parent = parent
        self.related_manager = related_manager
        self.cache = cache
        # Tuple of (method_name, args, kwargs) triples, oldest first.
        self.history = () if history is None else tuple(history)

    def _applied_history(self):
        """Replay the recorded calls on the real queryset and return it."""
        qs = self.related_manager.all()
        for func, args, kwargs in self.history:
            qs = getattr(qs, func)(*args, **kwargs)
        return qs

    def _delegate(self, name, args, kwargs):
        """Run ``name(*args, **kwargs)`` on the replayed real queryset."""
        return getattr(self._applied_history(), name)(*args, **kwargs)

    def __getattr__(self, name):
        """Resolve attributes this class does not emulate on the real queryset.

        Only called when normal lookup fails, i.e. for QuerySet features that
        have no Python emulation here.
        """
        if name in self._OWN_ATTRS or (
                name.startswith('__') and name.endswith('__')):
            # Missing instance state or protocol probes (copy, pickle, ...)
            # must fail normally rather than hit the database.
            raise AttributeError(name)
        return getattr(self._applied_history(), name)

    def _clone(self, cache=None, call=None):
        """Return a new instance, optionally with new objects and one more call.

        Args:
            cache: Objects for the clone; a copy of the current ones if None.
            call: Optional ``(name, args, kwargs)`` triple to append to the
                clone's history.
        """
        if cache is None:
            cache = list(self.cache)
        history = self.history
        if call is not None:
            history = history + (call,)
        return PythonQuerySet(
            self.parent, self.related_manager, cache, history)

    @staticmethod
    def _is_plain_field(name):
        """Return True for a string naming a direct attribute (no ``__``)."""
        return hasattr(name, 'startswith') and '__' not in name

    def _filtered_cache(self, args, kwargs, keep_matching):
        """Select cached objects by ``field=value`` equality lookups.

        An object "matches" when every keyword lookup compares equal.
        Matching objects are kept when ``keep_matching`` is true and dropped
        otherwise. With no lookups at all, every object is kept.

        Raises:
            BailFakeQuerySet: For positional arguments or ``__`` lookups.
        """
        if args or not all(self._is_plain_field(name) for name in kwargs):
            # TODO: Support query operators '__gt' etc
            raise BailFakeQuerySet
        lookups = list(kwargs.items())
        if not lookups:
            return list(self.cache)
        return [
            obj for obj in self.cache
            if all(getattr(obj, name) == val
                   for name, val in lookups) == keep_matching
        ]

    def _sorted_cache(self, args):
        """Return the cached objects ordered by the given field names.

        Raises:
            BailFakeQuerySet: For non-string, random (``'?'``) or ``__``
                ordering arguments.
        """
        cache = self.cache
        # Sorting is stable, so applying the keys last-to-first yields the
        # same result as one multi-key sort.
        for arg in reversed(args):
            if not self._is_plain_field(arg) or arg == '?':
                raise BailFakeQuerySet
            descending = arg.startswith('-')
            field = arg[1:] if descending else arg
            cache = sorted(cache, key=attrgetter(field), reverse=descending)
        return cache

    def _values(self, args, kwargs):
        """Build the ``values_list`` result from the cached objects.

        Raises:
            BailFakeQuerySet: When no fields are named, a field is not a
                plain attribute name, an option other than ``flat`` is
                given, or ``flat`` is combined with anything but one field.
        """
        flat = kwargs.get('flat', False)
        unsupported = [key for key in kwargs if key != 'flat']
        if unsupported or not args or not all(
                self._is_plain_field(a) for a in args):
            raise BailFakeQuerySet
        if flat:
            if len(args) != 1:
                raise BailFakeQuerySet
            return [getattr(x, args[0]) for x in self.cache]
        return [tuple(getattr(x, a) for a in args) for x in self.cache]

    # Emulating a QuerySet from here down...

    def __iter__(self):
        return iter(self.cache)

    def __len__(self):
        # Also gives instances QuerySet-like truthiness (empty is falsy).
        return len(self.cache)

    def all(self):
        """Return a copy of this queryset."""
        return self._clone(call=('all', (), {}))

    def only(self, *args, **kwargs):
        """Return a copy; objects are already loaded, so nothing is trimmed."""
        return self._clone(call=('only', args, kwargs))

    def filter(self, *args, **kwargs):
        """Keep objects whose attributes equal all the given values.

        Falls back to the real queryset for positional arguments or ``__``
        lookups.
        """
        try:
            cache = self._filtered_cache(args, kwargs, True)
        except BailFakeQuerySet:
            return self._delegate('filter', args, kwargs)
        return self._clone(cache, call=('filter', args, kwargs))

    def exclude(self, *args, **kwargs):
        """Drop objects whose attributes equal all the given values.

        Like QuerySet.exclude, several lookups are combined with AND before
        being negated. Falls back to the real queryset for positional
        arguments or ``__`` lookups.
        """
        try:
            cache = self._filtered_cache(args, kwargs, False)
        except BailFakeQuerySet:
            return self._delegate('exclude', args, kwargs)
        return self._clone(cache, call=('exclude', args, kwargs))

    def get(self, *args, **kwargs):
        """Return the single object matching the lookups.

        Raises:
            DoesNotExist: (from ``related_manager.model``) if nothing matches.
            MultipleObjectsReturned: (from ``related_manager.model``) if more
                than one object matches.
        """
        try:
            matches = self._filtered_cache(args, kwargs, True)
        except BailFakeQuerySet:
            return self._delegate('get', args, kwargs)
        model = self.related_manager.model
        if not matches:
            raise model.DoesNotExist
        if len(matches) > 1:
            raise model.MultipleObjectsReturned
        return matches[0]

    def order_by(self, *args, **kwargs):
        """Order by attribute names; a leading ``-`` means descending.

        Keyword arguments are ignored. Falls back to the real queryset for
        orderings that are not plain attribute names.
        """
        try:
            cache = self._sorted_cache(args)
        except BailFakeQuerySet:
            return self._delegate('order_by', args, {})
        return self._clone(cache, call=('order_by', args, {}))

    def values_list(self, *args, **kwargs):
        """Return a list of attribute tuples (or bare values with ``flat``).

        Falls back to the real queryset when the request cannot be answered
        from plain attributes of the cached objects.
        """
        try:
            return self._values(args, kwargs)
        except BailFakeQuerySet:
            return self._delegate('values_list', args, kwargs)

    def first(self):
        """Return the first cached object, or None when empty."""
        return self.cache[0] if len(self.cache) else None

    def last(self):
        """Return the last cached object, or None when empty."""
        return self.cache[-1] if len(self.cache) else None

    def count(self):
        """Return the number of cached objects."""
        return len(self.cache)

    def exists(self):
        """Return True when at least one object is cached."""
        return bool(self.cache)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment