Instantly share code, notes, and snippets.
Forked from Finndersen/polymorphic_related_queryset.py
Created
August 29, 2023 12:53
-
Star
(0)
0
You must be signed in to star a gist -
Fork
(0)
0
You must be signed in to fork a gist
-
Save mazulo/488797b9b2965d21ca1c01c636ef214e to your computer and use it in GitHub Desktop.
PolymorphicRelatedQuerySet which allows select_related() to return polymorphic child models instead of base model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
PolymorphicRelatedQuerySet which allows select_related() to return polymorphic child models instead of base model | |
Based on code by AndySun25 at: https://gist.github.com/AndySun25/157da4195c139a38ec3d609a277773cb | |
To address issue: https://github.com/django-polymorphic/django-polymorphic/issues/244 | |
Added support for Polymorphic Proxy Models | |
""" | |
import functools | |
from django.db import models | |
from django.db.models.constants import LOOKUP_SEP | |
from django.db.models.query import ModelIterable | |
from django.apps import apps | |
from polymorphic.models import PolymorphicModel | |
from polymorphic.query import transmogrify | |
def rgetattr(obj, attr, separator='.'):
    """Follow a delimited attribute path starting at ``obj`` and return the final value.

    ``attr`` is split on ``separator``; each segment is looked up with
    ``getattr`` on the result of the previous lookup.
    """
    target = obj
    for name in attr.split(separator):
        target = getattr(target, name)
    return target
def rsetattr(obj, attr, value, separator='.'):
    """Set the last attribute of a delimited path to ``value``.

    Every segment except the last is resolved with ``getattr`` starting from
    ``obj``; the last segment is then assigned on the object reached.
    """
    *parent_names, leaf_name = attr.split(separator)
    owner = obj
    for name in parent_names:
        owner = getattr(owner, name)
    setattr(owner, leaf_name, value)
def is_polymorphic_subclass(super_cls, sub_cls):
    """Tell whether ``sub_cls`` is a concrete, proper subclass of ``super_cls``.

    Returns False when ``sub_cls`` is not a subclass of ``super_cls``, when it
    is ``models.Model``, ``PolymorphicModel`` or ``super_cls`` itself, or when
    its ``_meta.abstract`` flag is set. An ``AttributeError`` raised during the
    checks (for example a class without ``_meta``) also yields False.
    """
    try:
        if not issubclass(sub_cls, super_cls):
            return False
        if sub_cls == models.Model:
            return False
        if sub_cls == super_cls:
            return False
        if sub_cls == PolymorphicModel:
            return False
        return not sub_cls._meta.abstract
    except AttributeError:
        return False
def polymorphic_iterator(*fields):
    """
    Create a custom iterable class which replaces the referenced related
    instances with their real (child or proxy) versions.

    :param fields: relation paths on the queryset's model whose targets should
        be cast to their real class; nested relations are separated with
        ``LOOKUP_SEP``.
    :return: a class that accepts the same constructor arguments as
        ``ModelIterable`` and yields the same objects, with each listed
        relation swapped for the real instance where one can be determined.
    """

    class PolymorphicModelIterable:
        def __init__(self, *args, **kwargs):
            # Delegate the actual row fetching to Django's ModelIterable.
            self.iterable = ModelIterable(*args, **kwargs)

        def __iter__(self):
            for obj in self.iterable:
                for field in fields:
                    # Walk the path step by step (the related polymorphic
                    # model may be nested) and stop at the first None so that
                    # nullable relations do not raise AttributeError.
                    instance = obj
                    for part in field.split(LOOKUP_SEP):
                        instance = getattr(instance, part)
                        if instance is None:
                            break
                    if instance is None:
                        continue

                    ctype = instance.polymorphic_ctype
                    if ctype is None:
                        # No content type recorded: keep the base instance.
                        continue
                    real_instance_class = ctype.model_class()
                    if real_instance_class is None or real_instance_class is instance.__class__:
                        # Either the model class cannot be resolved, or the
                        # row really is an instance of the base model; in both
                        # cases there is no child entry in the field cache to
                        # pop, so the base instance is left in place.
                        continue

                    if real_instance_class._meta.proxy:
                        # Change class of proxy model
                        real_instance = transmogrify(real_instance_class, instance)
                    else:
                        real_instance_name = ctype.model
                        # We must copy the field cache for the base_model instance to the real instance
                        # else additional data from select_related will be lost.
                        real_instance = instance._state.fields_cache.pop(real_instance_name)
                        real_instance._state.fields_cache = instance._state.fields_cache
                    # Same recursion goes here for setting the related object.
                    rsetattr(obj, field, real_instance, separator=LOOKUP_SEP)
                yield obj

    return PolymorphicModelIterable
class PolymorphicRelatedQuerySetMixin:
    """
    A class with a relationship to a polymorphic model should use this queryset

    TODO: Override normal select_related() so the functionality works automatically/seamlessly
    """

    def _get_nested_base_model(self, field):
        """
        Resolve the model at the end of a ``LOOKUP_SEP``-separated relation
        path, starting from ``self.model``.

        :param field: relation path, e.g. ``"reference"`` or ``"a__reference"``.
        :return: the related model class reached by following every part.
        """
        model = self.model
        for part in field.split(LOOKUP_SEP):
            descriptor = getattr(model, part)
            # Should find a better solution to determine the related model than below.
            try:
                # In case of forward related descriptors.
                model = descriptor.field.related_model
            except AttributeError:
                # In case of reverse related descriptors.
                model = descriptor.related.related_model
        return model

    def select_polymorphic_related(self, *fields):
        """
        Specify fields that should be cast to the real polymorphic class.

        For every given relation path, the reverse accessor of each concrete
        child model of the related polymorphic base model is added to
        ``select_related()``, together with the base model's first
        ``polymorphic_internal_model_fields`` entry (the polymorphic content
        type field).

        :param fields: relation paths to polymorphic models. When empty, the
            call is equivalent to ``select_related()`` with no arguments.
        :return: a new queryset; the queryset this is called on is left
            untouched.
        """
        subclass_names = []
        # Add references to all child models in select_related, e.g. reference__childA, reference__childB etc
        for field_name in fields:
            polymorphic_parent_model = self._get_nested_base_model(field_name)
            # This is somewhat a replication of PolymorphicModel._get_inheritance_relation_fields_and_models(),
            # but it's necessary to do this unless we want to instantiate a base_model instance for every
            # query made. Would be a consideration to perhaps cache the results.
            for model in apps.get_models():
                if not is_polymorphic_subclass(polymorphic_parent_model, model):
                    continue
                parent_link_field = model._meta.parents[polymorphic_parent_model]
                if parent_link_field is None:  # Proxy model
                    continue
                # The accessor lives on the polymorphic parent model, so it must
                # always be prefixed with the path leading to that model, whether
                # it is a custom related_name or the default lower-cased name.
                accessor_name = parent_link_field.remote_field.related_name or model.__name__.lower()
                subclass_names.append('{}__{}'.format(field_name, accessor_name))
            # We also need to add the polymorphic_ctype field name
            polymorphic_ctype_field_name = polymorphic_parent_model.polymorphic_internal_model_fields[0]
            subclass_names.append('{}__{}'.format(field_name, polymorphic_ctype_field_name))

        # select_related() returns a new queryset; install the custom iterable
        # on that one so the queryset this method was called on is not mutated.
        queryset = self.select_related(*subclass_names)
        if fields:
            queryset._iterable_class = polymorphic_iterator(*fields)
        return queryset
class PolymorphicRelatedQuerySet(PolymorphicRelatedQuerySetMixin, models.QuerySet):
    """Ready-made ``models.QuerySet`` combined with ``PolymorphicRelatedQuerySetMixin``."""
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment