Created
June 6, 2014 11:40
-
-
Save PirosB3/01dca53fd9e845e6b7e8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import unicode_literals | |
from bisect import bisect | |
from collections import OrderedDict | |
from django.apps import apps | |
from django.conf import settings | |
from django.db.models.fields.related import ManyToManyRel | |
from django.db.models.fields import AutoField, FieldDoesNotExist | |
from django.db.models.fields.proxy import OrderWrt | |
from django.utils import six | |
from django.utils.encoding import force_text, smart_text, python_2_unicode_compatible | |
from django.utils.functional import cached_property | |
from django.utils.text import camel_case_to_spaces | |
from django.utils.translation import activate, deactivate_all, get_language, string_concat | |
# Option names that ``Options.contribute_to_class`` copies from a model's
# inner ``class Meta`` onto the Options instance.
DEFAULT_NAMES = (
    'verbose_name', 'verbose_name_plural', 'db_table', 'ordering',
    'unique_together', 'permissions', 'get_latest_by',
    'order_with_respect_to', 'app_label', 'db_tablespace',
    'abstract', 'managed', 'proxy', 'swappable', 'auto_created',
    'index_together', 'apps', 'default_permissions',
    'select_on_save',
)

# Bit flags for the ``types`` argument of ``Options.get_new_fields``.
DATA = 1 << 0
M2M = 1 << 1
RELATED_OBJECTS = 1 << 2
RELATED_M2M = 1 << 3

# Aggregates
NON_RELATED_FIELDS = DATA | M2M

# Bit flags for the ``opts`` argument of ``Options.get_new_fields``.
# These share numeric values with the ``types`` flags above, so the two
# families must never be mixed in one bitmask.
NONE = 0
LOCAL_ONLY = 1 << 0
CONCRETE = 1 << 1
INCLUDE_HIDDEN = 1 << 2
INCLUDE_PROXY = 1 << 3
def normalize_together(option_together):
    """
    Normalize a ``unique_together`` / ``index_together`` style value.

    The value may be a sequence of sequences or a single flat sequence of
    names. Either form is returned as a tuple of tuples. Any falsy value
    yields ``()``. A value that cannot be normalized is returned
    unchanged so that the check framework can report it later.
    """
    sequence_types = (tuple, list)
    try:
        if not option_together:
            return ()
        if not isinstance(option_together, sequence_types):
            # Not a tuple/list at all: hand it back verbatim.
            return option_together
        groups = option_together
        head = next(iter(groups))
        if not isinstance(head, sequence_types):
            # A single flat group was given; wrap it so it is handled
            # like a one-element sequence of groups.
            groups = (groups,)
        return tuple([tuple(group) for group in groups])
    except TypeError:
        # A later element was not iterable: return the input verbatim.
        return option_together
@python_2_unicode_compatible
class Options(object):
    """
    Per-model metadata container.

    ``contribute_to_class`` installs an instance on the model class as
    ``cls._meta``. The instance tracks the model's local fields, its
    parents, and the options copied from the model's inner ``class Meta``.
    """

    def __init__(self, meta, app_label=None):
        """
        Set every option to its default.

        ``meta`` is the model's inner ``class Meta`` (or a falsy value);
        it is consumed and removed by ``contribute_to_class``.
        """
        self.local_fields = []
        self.local_many_to_many = []
        self.virtual_fields = []
        self.model_name = None
        self.verbose_name = None
        self.verbose_name_plural = None
        self.db_table = ''
        self.ordering = []
        self.unique_together = []
        self.index_together = []
        self.select_on_save = False
        self.default_permissions = ('add', 'change', 'delete')
        self.permissions = []
        self.object_name = None
        self.app_label = app_label
        self.get_latest_by = None
        self.order_with_respect_to = None
        self.db_tablespace = settings.DEFAULT_TABLESPACE
        self.meta = meta
        self.pk = None
        self.has_auto_field = False
        self.auto_field = None
        self.abstract = False
        self.managed = True
        self.proxy = False
        # For any class that is a proxy (including automatically created
        # classes for deferred object loading), proxy_for_model tells us
        # which class this model is proxying. Note that proxy_for_model
        # can create a chain of proxy models. For non-proxy models, the
        # variable is always None.
        self.proxy_for_model = None
        # For any non-abstract class, the concrete class is the model
        # in the end of the proxy_for_model chain. In particular, for
        # concrete models, the concrete_model is always the class itself.
        self.concrete_model = None
        self.swappable = None
        self.parents = OrderedDict()
        self.auto_created = False

        # To handle various inheritance situations, we need to track where
        # managers came from (concrete or abstract base classes).
        self.abstract_managers = []
        self.concrete_managers = []

        # List of all lookups defined in ForeignKey 'limit_choices_to' options
        # from *other* models. Needed for some admin checks. Internal use only.
        self.related_fkey_lookups = []

        # A custom app registry to use, if you're making a separate model set.
        self.apps = apps

    @property
    def app_config(self):
        """Return the app config registered for ``app_label``, or None."""
        # Don't go through get_app_config to avoid triggering imports.
        return self.apps.app_configs.get(self.app_label)

    @property
    def installed(self):
        """True when an app config exists for this model's ``app_label``."""
        return self.app_config is not None

    def get_non_swapped_models(self, include_auto_created=True):
        """
        Return a list of the registry's models whose ``_meta.swapped`` is
        falsy.

        A list (rather than a lazy ``filter`` object) is returned so the
        result can be iterated more than once on Python 3 as well.
        """
        models = self.apps.get_models(
            include_auto_created=include_auto_created)
        return [model for model in models if not model._meta.swapped]

    def _validate_related_object(self, obj):
        """
        Return False for a related object inherited from a parent when its
        field has a negative ``creation_counter`` or is a parent link, and
        the object's model is not one of this model's ancestors.
        """
        parent_list = self.get_parent_list()
        return not ((obj.field.creation_counter < 0
                     or obj.field.rel.parent_link)
                    and obj.model not in parent_list)

    def get_new_field(self, field_name, opts=NONE):
        """
        Return only the field object from ``get_field_details``.

        Raises FieldDoesNotExist when ``field_name`` is unknown.
        """
        field, model, direct, m2m = self.get_field_details(field_name, opts)
        return field

    def get_field_details(self, field_name, opts=NONE):
        """
        Return ``(field, model, direct, m2m)`` for ``field_name``.

        A name map is rebuilt on every call. Entries are inserted in this
        order, later ones overriding earlier ones with the same key:
        reverse m2m relations and reverse relations (both only once the
        app registry is ready), forward m2m fields, data fields, then
        virtual fields. Forward fields are registered under both ``name``
        and ``attname``.

        Raises FieldDoesNotExist when ``field_name`` is not in the map.
        """
        base = OrderedDict()
        if apps.ready:
            for _, data in self.get_new_fields(types=RELATED_M2M,
                                               opts=opts, with_model=True):
                name = data[0].field.related_query_name()
                base[name] = data + (False, True,)
            for _, data in self.get_new_fields(types=RELATED_OBJECTS,
                                               opts=opts, with_model=True):
                name = data[0].field.related_query_name()
                base[name] = data + (False, False,)
        for name, data in self.get_new_fields(types=M2M,
                                              opts=opts, with_model=True):
            # NOTE: adding name and attname
            base[data[0].name] = data + (True, True,)
            base[name] = data + (True, True,)
        for name, data in self.get_new_fields(types=DATA,
                                              opts=opts, with_model=True):
            # NOTE: adding name and attname
            base[data[0].name] = data + (True, False,)
            base[name] = data + (True, False,)
        for f in self.virtual_fields:
            base[f.name] = (f, f.model, True, False)
        try:
            return base[field_name]
        except KeyError:
            raise FieldDoesNotExist('%s has no field named %r'
                                    % (self.object_name, field_name))

    @staticmethod
    def _key_related_by_name(related, with_model):
        """
        Turn a ``{related_object: data}`` mapping into one keyed by name.

        ``data`` is either a name, or a ``(name, model)`` pair when
        ``with_model`` is true; the result maps ``name`` to the related
        object, or to ``(related_object, model)`` respectively.
        """
        keyed = OrderedDict()
        for related_obj, data in related.items():
            if with_model:
                keyed[data[0]] = (related_obj, data[1])
            else:
                keyed[data] = related_obj
        return keyed

    def get_new_fields(self, types, opts=NONE, **kwargs):
        """
        Collect fields and/or reverse relations selected by bit flags.

        ``types`` is a bitmask of DATA, M2M, RELATED_OBJECTS and
        RELATED_M2M; ``opts`` is a bitmask of LOCAL_ONLY, CONCRETE,
        INCLUDE_HIDDEN and INCLUDE_PROXY.

        Two keyword flags are recognised by *presence* only (their value
        is never inspected):

        * ``with_model`` - values become ``(object, model)`` pairs.
        * ``inversed_order`` - reverse relations are keyed by the related
          object instead of by the pointing field's ``attname``.

        Returns a tuple of ``(key, value)`` pairs in insertion order.
        """
        with_model = 'with_model' in kwargs
        inversed_order = 'inversed_order' in kwargs
        fields = OrderedDict()

        if types & NON_RELATED_FIELDS:
            if not (opts & LOCAL_ONLY):
                for parent in self.parents:
                    fields.update(parent._meta.get_new_fields(types,
                                                              opts, **kwargs))
            if types & DATA:
                for field in self.local_fields:
                    if not ((opts & CONCRETE) and field.column is None):
                        data = (field, self.model) if with_model else field
                        fields[field.attname] = data
            if types & M2M:
                for field in self.local_many_to_many:
                    data = (field, self.model) if with_model else field
                    fields[field.attname] = data

        if types & RELATED_M2M:
            related_m2m_fields = OrderedDict()
            if not (opts & LOCAL_ONLY):
                parent_list = self.get_parent_list()
                for parent in self.parents:
                    for obj, data in parent._meta.get_new_fields(
                            types=RELATED_M2M,
                            **dict(kwargs, inversed_order=True)):
                        is_valid = not (obj.field.creation_counter < 0
                                        and obj.model not in parent_list)
                        if is_valid:
                            related_m2m_fields[obj] = data
            for model in self.get_non_swapped_models(False):
                for _, f in model._meta.get_new_fields(types=M2M,
                                                       inversed_order=True):
                    has_rel_attr = (f.rel and
                                    not isinstance(f.rel.to, six.string_types))
                    if has_rel_attr and self == f.rel.to._meta:
                        data = ((f.attname, self.model) if with_model
                                else f.attname)
                        related_m2m_fields[f.related] = data
            if not inversed_order:
                # NOTE(review): keys are attnames here, so two relations
                # whose pointing fields share an attname overwrite each
                # other.
                related_m2m_fields = self._key_related_by_name(
                    related_m2m_fields, with_model)
            fields.update(related_m2m_fields)

        if types & RELATED_OBJECTS:
            related_fields = OrderedDict()
            # TODO(review): the author flagged this branch "ERROR? check".
            if not (opts & LOCAL_ONLY):
                for parent in self.parents:
                    for obj, data in parent._meta.get_new_fields(
                            types=RELATED_OBJECTS,
                            opts=INCLUDE_HIDDEN,
                            **dict(kwargs, inversed_order=True)):
                        if self._validate_related_object(obj):
                            related_fields[obj] = data
            for model in self.get_non_swapped_models(True):
                # NOTE: missing virtual fields
                for _, f in model._meta.get_new_fields(types=DATA,
                                                       opts=INCLUDE_HIDDEN,
                                                       inversed_order=True):
                    has_rel_attr = hasattr(f, 'rel') and f.rel
                    if has_rel_attr and f.has_class_relation():
                        to_meta = f.rel.to._meta
                        if (to_meta == self) or (
                                (opts & INCLUDE_PROXY) and
                                self.concrete_model == to_meta.concrete_model):
                            data = ((f.attname, self.model) if with_model
                                    else f.attname)
                            related_fields[f.related] = data
            if not opts & INCLUDE_HIDDEN:
                related_fields = OrderedDict(
                    (k, v) for k, v in related_fields.items()
                    if not k.field.rel.is_hidden())
            if not inversed_order:
                related_fields = self._key_related_by_name(
                    related_fields, with_model)
            fields.update(related_fields)

        # six.iteritems keeps this working on both Python 2 and Python 3.
        return tuple(six.iteritems(fields))

    def contribute_to_class(self, cls, name):
        """
        Attach this instance to ``cls`` as ``cls._meta`` and apply the
        options declared on the model's ``class Meta``.

        Raises TypeError when ``class Meta`` carries a public attribute
        that is not listed in DEFAULT_NAMES.
        """
        from django.db import connection
        from django.db.backends.utils import truncate_name

        cls._meta = self
        self.model = cls
        # First, construct the default values for these options.
        self.object_name = cls.__name__
        self.model_name = self.object_name.lower()
        self.verbose_name = camel_case_to_spaces(self.object_name)

        # Store the original user-defined values for each option,
        # for use when serializing the model definition
        self.original_attrs = {}

        # Next, apply any overridden values from 'class Meta'.
        if self.meta:
            meta_attrs = self.meta.__dict__.copy()
            for attr in self.meta.__dict__:
                # Ignore any private attributes that Django doesn't care about.
                # NOTE: We can't modify a dictionary's contents while looping
                # over it, so we loop over the *original* dictionary instead.
                if attr.startswith('_'):
                    del meta_attrs[attr]
            for attr_name in DEFAULT_NAMES:
                if attr_name in meta_attrs:
                    setattr(self, attr_name, meta_attrs.pop(attr_name))
                    self.original_attrs[attr_name] = getattr(self, attr_name)
                elif hasattr(self.meta, attr_name):
                    setattr(self, attr_name, getattr(self.meta, attr_name))
                    self.original_attrs[attr_name] = getattr(self, attr_name)

            ut = meta_attrs.pop('unique_together', self.unique_together)
            self.unique_together = normalize_together(ut)

            it = meta_attrs.pop('index_together', self.index_together)
            self.index_together = normalize_together(it)

            # verbose_name_plural is a special case because it uses a 's'
            # by default.
            if self.verbose_name_plural is None:
                self.verbose_name_plural = string_concat(self.verbose_name, 's')

            # Any leftover attributes must be invalid.
            if meta_attrs != {}:
                raise TypeError("'class Meta' got invalid attribute(s): %s" % ','.join(meta_attrs.keys()))
        else:
            self.verbose_name_plural = string_concat(self.verbose_name, 's')
        del self.meta

        # If the db_table wasn't provided, use the app_label + model_name.
        if not self.db_table:
            self.db_table = "%s_%s" % (self.app_label, self.model_name)
            self.db_table = truncate_name(self.db_table, connection.ops.max_name_length())

    def _prepare(self, model):
        """
        Resolve ``order_with_respect_to`` to a field (adding an ``_order``
        field if the model has no OrderWrt field yet) and make sure the
        model ends up with a primary key.
        """
        if self.order_with_respect_to:
            self.order_with_respect_to = self.get_field(self.order_with_respect_to)
            self.ordering = ('_order',)
            if not any(isinstance(field, OrderWrt) for field in model._meta.local_fields):
                model.add_to_class('_order', OrderWrt())
        else:
            self.order_with_respect_to = None

        if self.pk is None:
            if self.parents:
                # Promote the first parent link in lieu of adding yet another
                # field.
                field = next(six.itervalues(self.parents))
                # Look for a local field with the same name as the
                # first parent link. If a local field has already been
                # created, use it instead of promoting the parent
                already_created = [fld for fld in self.local_fields if fld.name == field.name]
                if already_created:
                    field = already_created[0]
                field.primary_key = True
                self.setup_pk(field)
            else:
                auto = AutoField(verbose_name='ID', primary_key=True,
                                 auto_created=True)
                model.add_to_class('id', auto)

    def add_field(self, field):
        """
        Register ``field`` on this model and drop caches that depend on
        the field lists.
        """
        # Insert the given field in the order in which it was created, using
        # the "creation_counter" attribute of the field.
        # Move many-to-many related fields from self.fields into
        # self.many_to_many.
        if field.rel and isinstance(field.rel, ManyToManyRel):
            self.local_many_to_many.insert(bisect(self.local_many_to_many, field), field)
            if hasattr(self, '_m2m_cache'):
                del self._m2m_cache
        else:
            self.local_fields.insert(bisect(self.local_fields, field), field)
            self.setup_pk(field)
            if hasattr(self, '_field_cache'):
                del self._field_cache
                del self._field_name_cache
            # fields, concrete_fields and local_concrete_fields are cached
            # properties computed from local_fields via get_new_fields().
            # They are dropped regardless of whether _field_cache exists,
            # because they no longer populate that cache. The attribute is
            # absent until the property is first read, hence the
            # AttributeError guard.
            for cached_name in ('fields', 'concrete_fields',
                                'local_concrete_fields'):
                try:
                    delattr(self, cached_name)
                except AttributeError:
                    pass

        if hasattr(self, '_name_map'):
            del self._name_map

    def add_virtual_field(self, field):
        """Append ``field`` to ``virtual_fields``."""
        self.virtual_fields.append(field)

    def setup_pk(self, field):
        """
        Adopt ``field`` as the primary key if none is set yet and the
        field is flagged ``primary_key``; such a field is excluded from
        serialization.
        """
        if not self.pk and field.primary_key:
            self.pk = field
            field.serialize = False

    def pk_index(self):
        """
        Returns the index of the primary key field in the self.concrete_fields
        list.
        """
        return self.concrete_fields.index(self.pk)

    def setup_proxy(self, target):
        """
        Does the internal setup so that the current model is a proxy for
        "target".
        """
        self.pk = target._meta.pk
        self.proxy_for_model = target
        self.db_table = target._meta.db_table

    def __repr__(self):
        return '<Options for %s>' % self.object_name

    def __str__(self):
        return "%s.%s" % (smart_text(self.app_label), smart_text(self.model_name))

    @property
    def verbose_name_raw(self):
        """
        There are a few places where the untranslated verbose name is needed
        (so that we get the same value regardless of currently active
        locale).
        """
        lang = get_language()
        deactivate_all()
        try:
            return force_text(self.verbose_name)
        finally:
            # Restore the previously active language even if force_text
            # raises, so a failure here cannot leave translations disabled.
            activate(lang)

    def _swapped(self):
        """
        Has this model been swapped out for another? If so, return the model
        name of the replacement; otherwise, return None.

        For historical reasons, model name lookups using get_model() are
        case insensitive, so we make sure we are case insensitive here.
        """
        if self.swappable:
            model_label = '%s.%s' % (self.app_label, self.model_name)
            swapped_for = getattr(settings, self.swappable, None)
            if swapped_for:
                try:
                    swapped_label, swapped_object = swapped_for.split('.')
                except ValueError:
                    # setting not in the format app_label.model_name
                    # raising ImproperlyConfigured here causes problems with
                    # test cleanup code - instead it is raised in get_user_model
                    # or as part of validation.
                    return swapped_for

                if '%s.%s' % (swapped_label, swapped_object.lower()) != model_label:
                    return swapped_for
        return None
    swapped = property(_swapped)

    def _map_model(self, fields):
        """
        Convert ``(name, field)`` pairs into ``(field, model)`` pairs,
        using None as the model for fields defined on this model.
        """
        res = []
        for fn, f in fields:
            model = self.get_field_details(fn)[1]
            if model == self.model:
                model = None
            res.append((f, model))
        return res

    @cached_property
    def fields(self):
        """Data fields of this model and, recursively, of its parents."""
        return [field for field_name, field
                in self.get_new_fields(types=DATA)]

    @cached_property
    def concrete_fields(self):
        """Data fields (parents included) whose ``column`` is not None."""
        return [field for field_name, field
                in self.get_new_fields(types=DATA, opts=CONCRETE)]

    @cached_property
    def local_concrete_fields(self):
        """Data fields declared on this model whose ``column`` is not None."""
        return [field for field_name, field
                in self.get_new_fields(types=DATA, opts=CONCRETE | LOCAL_ONLY)]

    def get_fields_with_model(self):
        """
        Returns a sequence of (field, model) pairs for all fields. The "model"
        element is None for fields on the current model. Mostly of use when
        constructing queries so that we know which model a field belongs to.
        """
        return self._map_model(self.get_new_fields(types=DATA))

    def get_concrete_fields_with_model(self):
        """
        Return the ``(field, model)`` pairs of ``get_fields_with_model``
        whose field has a non-None ``column``.
        """
        return [(field, model) for field, model in self.get_fields_with_model() if
                field.column is not None]

    def _fill_fields_cache(self):
        """
        Build ``_field_cache`` / ``_field_name_cache`` from the parents'
        fields followed by the local fields.

        NOTE(review): nothing in this file calls this any more.
        """
        cache = []
        for parent in self.parents:
            for field, model in parent._meta.get_fields_with_model():
                if model:
                    cache.append((field, model))
                else:
                    cache.append((field, parent))
        cache.extend((f, None) for f in self.local_fields)
        self._field_cache = tuple(cache)
        self._field_name_cache = [x for x, _ in cache]

    def _many_to_many(self):
        """Many-to-many fields of this model and of its parents."""
        return [field for field_name, field
                in self.get_new_fields(types=M2M)]
    many_to_many = property(_many_to_many)

    def get_m2m_with_model(self):
        """
        The many-to-many version of get_fields_with_model().
        """
        return self._map_model(self.get_new_fields(types=M2M))

    def _fill_m2m_cache(self):
        """
        Build ``_m2m_cache`` mapping each m2m field to its defining model
        (None for local fields).

        NOTE(review): nothing in this file calls this any more.
        """
        cache = OrderedDict()
        for parent in self.parents:
            for field, model in parent._meta.get_m2m_with_model():
                if model:
                    cache[field] = model
                else:
                    cache[field] = parent
        for field in self.local_many_to_many:
            cache[field] = None
        self._m2m_cache = cache

    def get_field(self, name, many_to_many=True):
        """
        Returns the requested field by name. Raises FieldDoesNotExist on error.
        """
        to_search = (self.fields + self.many_to_many) if many_to_many else self.fields
        for f in to_search:
            if f.name == name:
                return f
        raise FieldDoesNotExist('%s has no field named %r' % (self.object_name, name))

    def get_field_by_name(self, name):
        """
        Returns the (field_object, model, direct, m2m), where field_object is
        the Field instance for the given name, model is the model containing
        this field (None for local fields), direct is True if the field exists
        on this model, and m2m is True for many-to-many relations. When
        'direct' is False, 'field_object' is the corresponding RelatedObject
        for this field (since the field doesn't have an instance associated
        with it).

        Uses a cache internally, so after the first access, this is very fast.

        Raises FieldDoesNotExist when ``name`` is unknown.
        """
        try:
            try:
                return self._name_map[name]
            except AttributeError:
                # The name map has not been built (or was invalidated).
                cache = self.init_name_map()
                return cache[name]
        except KeyError:
            raise FieldDoesNotExist('%s has no field named %r'
                                    % (self.object_name, name))

    def get_all_field_names(self):
        """
        Returns a list of all field names that are possible for this model
        (including reverse relation names). This is used for pretty printing
        debugging output (a list of choices), so any internal-only field names
        are not included.
        """
        try:
            cache = self._name_map
        except AttributeError:
            cache = self.init_name_map()
        names = sorted(cache.keys())
        # Internal-only names end with "+" (symmetrical m2m related names being
        # the main example). Trim them.
        return [val for val in names if not val.endswith('+')]

    def init_name_map(self):
        """
        Initialises the field name -> field object mapping.

        The map is returned in every case, but it is only stored on the
        instance once the app registry is ready.
        """
        cache = {}
        # We intentionally handle related m2m objects first so that symmetrical
        # m2m accessor names can be overridden, if necessary.
        for f, model in self.get_all_related_m2m_objects_with_model():
            cache[f.field.related_query_name()] = (f, model, False, True)
        for f, model in self.get_all_related_objects_with_model():
            cache[f.field.related_query_name()] = (f, model, False, False)
        for f, model in self.get_m2m_with_model():
            cache[f.name] = cache[f.attname] = (f, model, True, True)
        for f, model in self.get_fields_with_model():
            cache[f.name] = cache[f.attname] = (f, model, True, False)
        for f in self.virtual_fields:
            cache[f.name] = (f, None if f.model == self.model else f.model, True, False)
        if apps.ready:
            self._name_map = cache
        return cache

    def get_all_related_objects(self, local_only=False, include_hidden=False,
                                include_proxy_eq=False):
        """
        Return only the related objects from
        ``get_all_related_objects_with_model``, with the same filters.
        """
        return [k for k, v in self.get_all_related_objects_with_model(
                local_only=local_only, include_hidden=include_hidden,
                include_proxy_eq=include_proxy_eq)]

    def get_all_related_objects_with_model(self, local_only=False,
                                           include_hidden=False,
                                           include_proxy_eq=False):
        """
        Returns a list of (related-object, model) pairs. Similar to
        get_fields_with_model().

        ``local_only`` keeps only pairs whose model is falsy;
        ``include_hidden`` keeps relations whose ``rel.is_hidden()`` is
        true; ``include_proxy_eq`` reads from the proxy-aware cache.
        """
        try:
            self._related_objects_cache
        except AttributeError:
            self._fill_related_objects_cache()
        predicates = []
        if local_only:
            predicates.append(lambda k, v: not v)
        if not include_hidden:
            predicates.append(lambda k, v: not k.field.rel.is_hidden())
        cache = (self._related_objects_proxy_cache if include_proxy_eq
                 else self._related_objects_cache)
        return [t for t in cache.items() if all(p(*t) for p in predicates)]

    def _fill_related_objects_cache(self):
        """
        Populate ``_related_objects_cache`` and
        ``_related_objects_proxy_cache`` from the parents' related objects
        and from every non-swapped model in the registry.
        """
        cache = OrderedDict()
        parent_list = self.get_parent_list()
        for parent in self.parents:
            # For each parent, recursively call this fn
            for obj, model in parent._meta.get_all_related_objects_with_model(include_hidden=True):
                # If model is invalid, continue
                if (obj.field.creation_counter < 0 or obj.field.rel.parent_link) and obj.model not in parent_list:
                    continue
                # Add model to cache
                if not model:
                    cache[obj] = parent
                else:
                    cache[obj] = model
        # Collect also objects which are in relation to some proxy child/parent of self.
        proxy_cache = cache.copy()
        for klass in self.apps.get_models(include_auto_created=True):
            if not klass._meta.swapped:
                for f in klass._meta.local_fields + klass._meta.virtual_fields:
                    if (hasattr(f, 'rel') and f.rel and not isinstance(f.rel.to, six.string_types)
                            and f.generate_reverse_relation):
                        # If its on the same model
                        if self == f.rel.to._meta:
                            cache[f.related] = None
                            proxy_cache[f.related] = None
                        # If its on another model
                        elif self.concrete_model == f.rel.to._meta.concrete_model:
                            proxy_cache[f.related] = None
        # Only return direct related links
        self._related_objects_cache = cache
        self._related_objects_proxy_cache = proxy_cache

    def get_all_related_many_to_many_objects(self, local_only=False):
        """
        Return the related m2m objects; with ``local_only`` only those
        whose cached model is falsy.
        """
        try:
            cache = self._related_many_to_many_cache
        except AttributeError:
            cache = self._fill_related_many_to_many_cache()
        if local_only:
            return [k for k, v in cache.items() if not v]
        return list(cache)

    def get_all_related_m2m_objects_with_model(self):
        """
        Returns a list of (related-m2m-object, model) pairs. Similar to
        get_fields_with_model().
        """
        try:
            cache = self._related_many_to_many_cache
        except AttributeError:
            cache = self._fill_related_many_to_many_cache()
        return list(six.iteritems(cache))

    def _fill_related_many_to_many_cache(self):
        """
        Build and return the related m2m mapping; it is stored on the
        instance only once the app registry is ready.
        """
        cache = OrderedDict()
        parent_list = self.get_parent_list()
        for parent in self.parents:
            for obj, model in parent._meta.get_all_related_m2m_objects_with_model():
                if obj.field.creation_counter < 0 and obj.model not in parent_list:
                    continue
                if not model:
                    cache[obj] = parent
                else:
                    cache[obj] = model
        for klass in self.apps.get_models():
            if not klass._meta.swapped:
                for f in klass._meta.local_many_to_many:
                    if (f.rel
                            and not isinstance(f.rel.to, six.string_types)
                            and self == f.rel.to._meta):
                        cache[f.related] = None
        if apps.ready:
            self._related_many_to_many_cache = cache
        return cache

    def get_base_chain(self, model):
        """
        Returns a list of parent classes leading to 'model' (ordered from
        closest to most distant ancestor). This has to handle the case where
        'model' is a grandparent or even more distant relation.

        Returns None when 'model' is not an ancestor.
        """
        if not self.parents:
            return None
        if model in self.parents:
            return [model]
        for parent in self.parents:
            res = parent._meta.get_base_chain(model)
            if res:
                res.insert(0, parent)
                return res
        return None

    def get_parent_list(self):
        """
        Returns a set of all the ancestors of this model. Useful for
        determining if something is an ancestor, regardless of lineage.
        """
        result = set()
        for parent in self.parents:
            result.add(parent)
            result.update(parent._meta.get_parent_list())
        return result

    def get_ancestor_link(self, ancestor):
        """
        Returns the field on the current model which points to the given
        "ancestor". This is possibly an indirect link (a pointer to a parent
        model, which points, eventually, to the ancestor). Used when
        constructing table joins for model inheritance.

        Returns None if the model isn't an ancestor of this one.
        """
        if ancestor in self.parents:
            return self.parents[ancestor]
        for parent in self.parents:
            # Tries to get a link field from the immediate parent
            parent_link = parent._meta.get_ancestor_link(ancestor)
            if parent_link:
                # In case of a proxied model, the first link
                # of the chain to the ancestor is that parent
                # links
                return self.parents[parent] or parent_link
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment