Skip to content

Instantly share code, notes, and snippets.

@ostronom
Created March 11, 2011 19:01
Show Gist options
  • Save ostronom/866378 to your computer and use it in GitHub Desktop.
Save ostronom/866378 to your computer and use it in GitHub Desktop.
from django.db.models import TextField, ForeignKey
from django.db.models.signals import m2m_changed, post_save, class_prepared
class ComputedField(TextField):
    """Non-editable text column whose stored value is produced by a callable.

    Extra keyword arguments (on top of the usual ``TextField`` ones):

    computation
        Required callable. It is called with a model instance and its return
        value is written to this column.
    uses_m2m
        Optional collection of many-to-many field names on the same model.
        When given, the value is recomputed when one of those relations
        changes instead of on every save of the model.
    reverse_recompute
        When true (and ``uses_m2m`` is given), the value is also recomputed
        when an already existing object on the other side of one of those
        relations is saved.

    The first ``ComputedField`` contributed to a model acts as coordinator:
    it collects every computed field of that model and owns the signal
    receivers that trigger the recomputations.
    """

    def __init__(self, *args, **kwargs):
        """Validate and strip the custom options, then build the TextField.

        Raises AssertionError when ``computation`` is missing or not callable.
        """
        assert 'computation' in kwargs, \
            "ComputedField requires a 'computation' keyword argument"
        assert callable(kwargs['computation']), \
            "ComputedField 'computation' must be callable"
        # The value is always derived, so it is never edited by hand and may
        # be empty until the first computation has run.
        kwargs['editable'] = False
        kwargs['blank'] = True
        self.computation = kwargs.pop('computation')
        self.uses_m2m = kwargs.pop('uses_m2m', None)
        self.reverse_recompute = kwargs.pop('reverse_recompute', False)
        # Filled by model_prepared() on the coordinating field only:
        self.simple = []  # fields recomputed on every save of the model
        self.m2m = []     # (field, [m2m relation, ...]) pairs
        super(ComputedField, self).__init__(*args, **kwargs)

    def run_computation(self, instance, field):
        """Store ``field.computation(instance)`` in the row of ``instance``.

        The value is written with a queryset ``update()`` rather than
        ``instance.save()``, so no ``post_save`` signal is sent and the
        receivers below are not triggered again.
        """
        value = field.computation(instance)
        self.model.objects.filter(pk=instance.pk).update(
            **{field.attname: value})

    def m2m_saved(self, *args, **kwargs):
        """``m2m_changed`` receiver: recompute fields depending on the relation.

        Only the ``post_*`` actions are handled so the computation sees the
        final state of the relation.
        """
        if kwargs['action'] not in ('post_add', 'post_clear', 'post_remove'):
            return
        if kwargs.get('reverse'):
            # The relation was modified from the other side: 'instance' is an
            # object of the related model and 'pk_set' holds the primary keys
            # of this model's rows that were added or removed.  pk_set is
            # None for post_clear, in which case the affected rows are no
            # longer known and nothing can be recomputed.
            pk_set = kwargs.get('pk_set')
            if not pk_set:
                return
            instances = list(self.model.objects.filter(pk__in=pk_set))
        else:
            instances = [kwargs['instance']]
        sender = kwargs['sender']
        for field, rels in self.m2m:
            if any(sender == rel.through for rel in rels):
                for instance in instances:
                    self.run_computation(instance, field)

    def m2m_reverse_saved(self, *args, **kwargs):
        """``post_save`` receiver for models on the far side of a tracked m2m.

        Recomputes the dependent fields of every row of this model that is
        linked to the saved object through the relation's ``through`` model.
        """
        if kwargs['created']:
            # Nothing to do for a freshly created object.
            return
        sender = kwargs['sender']
        saved = kwargs['instance']
        for field, rels in self.m2m:
            for rel in rels:
                if sender != rel.to:
                    continue
                # Look up the foreign keys of the through model that point
                # at each side of the relation.
                to_self, to_sender = None, None
                for fk in rel.through._meta.local_fields:
                    if not isinstance(fk, ForeignKey):
                        continue
                    if fk.rel.to == self.model:
                        to_self = fk
                    if fk.rel.to == sender:
                        to_sender = fk
                assert to_self is not None
                assert to_sender is not None
                links = rel.through.objects.filter(**{to_sender.name: saved.pk})
                affected_pks = links.values_list(to_self.name)
                for instance in self.model.objects.filter(pk__in=affected_pks):
                    self.run_computation(instance, field)

    def model_saved(self, *args, **kwargs):
        """``post_save`` receiver: recompute every field without ``uses_m2m``."""
        instance = kwargs['instance']
        for field in self.simple:
            self.run_computation(instance, field)

    def model_prepared(self, *args, **kwargs):
        """``class_prepared`` receiver: sort the fields and connect signals.

        Runs on the coordinating field. Fields with ``uses_m2m`` are wired to
        ``m2m_changed`` (and optionally to ``post_save`` of the related
        model); all other fields are recomputed on ``post_save`` of the model
        itself.
        """
        model_name = self.model._meta.object_name
        for field in getattr(kwargs['sender'], '__computed_field_mixin'):
            if field.uses_m2m is None:
                self.simple.append(field)
                continue
            mpath = '%s_%s' % (model_name, field.attname)
            rels = []
            for m2m in self.model._meta.many_to_many:
                if m2m.name not in field.uses_m2m:
                    continue
                m2m_changed.connect(
                    self.m2m_saved, sender=m2m.rel.through,
                    dispatch_uid='ComputedField_%s' % mpath)
                # The flag belongs to the field being wired, which is not
                # necessarily the coordinating field (self).
                if field.reverse_recompute:
                    post_save.connect(
                        self.m2m_reverse_saved, sender=m2m.rel.to,
                        dispatch_uid='ComputedField_reverse_%s' % mpath)
                rels.append(m2m.rel)
            self.m2m.append((field, rels))
        if self.simple:
            # One receiver per model serves all simple fields, so the uid is
            # derived from the model alone; it must not depend on a variable
            # that only exists when some field uses m2m relations.
            post_save.connect(
                self.model_saved, sender=self.model,
                dispatch_uid='ComputedField_simple_%s' % model_name)

    def contribute_to_class(self, cls, name):
        """Register this field on the model's list of computed fields.

        The first computed field of a model creates the list and connects
        ``model_prepared`` to ``class_prepared``; later fields are only
        appended to the list.
        """
        super(ComputedField, self).contribute_to_class(cls, name)
        if hasattr(self.model, '__computed_field_mixin'):
            getattr(self.model, '__computed_field_mixin').append(self)
        else:
            setattr(self.model, '__computed_field_mixin', [self])
            class_prepared.connect(self.model_prepared, sender=self.model)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment