Created
March 11, 2011 19:01
-
-
Save ostronom/866378 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db.models import TextField, ForeignKey | |
from django.db.models.signals import m2m_changed, post_save, class_prepared | |
class ComputedField(TextField): | |
def __init__(self, *args, **kwargs): | |
assert 'computation' in kwargs | |
assert callable(kwargs['computation']) | |
kwargs['editable'] = False | |
kwargs['blank'] = True | |
self.computation = kwargs.pop('computation') | |
self.uses_m2m = kwargs.pop('uses_m2m', None) | |
self.reverse_recompute = kwargs.pop('reverse_recompute', False) | |
self.simple = [] | |
self.m2m = [] | |
super(ComputedField, self).__init__(*args, **kwargs) | |
def run_computation(self, instance, field): | |
self.model.objects.filter(pk = instance.pk).update(**{field.attname: field.computation(instance)}) | |
def m2m_saved(self, *args, **kwargs): | |
if kwargs['action'] in ['post_add', 'post_clear', 'post_remove']: | |
for field, rels in self.m2m: | |
if kwargs['sender'] in (x.through for x in rels): | |
self.run_computation(kwargs['instance'], field) | |
def m2m_reverse_saved(self, *args, **kwargs): | |
if kwargs['created']: | |
return | |
for field, throughs in self.m2m: | |
for x in filter(lambda w: kwargs['sender'] == w.to, throughs): | |
""" Lookup for fk's to m2m sides """ | |
to_self, to_sender = None, None | |
for fk in filter(lambda w: isinstance(w, ForeignKey), x.through._meta.local_fields): | |
if fk.rel.to == self.model: | |
to_self = fk | |
if fk.rel.to == kwargs['sender']: | |
to_sender = fk | |
assert to_self is not None | |
assert to_sender is not None | |
affected_pks = x.through.objects.filter(**{to_sender.name: kwargs['instance'].pk}).values_list(to_self.name) | |
for instance in self.model.objects.filter(pk__in = affected_pks): | |
self.run_computation(instance, field) | |
def model_saved(self, *args, **kwargs): | |
for field in self.simple: | |
self.run_computation(kwargs['instance'], field) | |
def model_prepared(self, *args, **kwargs): | |
for field in getattr(kwargs['sender'], '__computed_field_mixin'): | |
if field.uses_m2m is not None: | |
mpath = '%s_%s' % (self.model._meta.object_name, field.attname) | |
rels = [] | |
for m2m in filter(lambda w: w.name in field.uses_m2m, self.model._meta.many_to_many): | |
m2m_changed.connect(self.m2m_saved, sender = m2m.rel.through, dispatch_uid = 'ComputedField_%s' % mpath) | |
if self.reverse_recompute: | |
post_save.connect(self.m2m_reverse_saved, sender = m2m.rel.to, dispatch_uid = 'ComputedField_reverse_%s' % mpath) | |
rels.append(m2m.rel) | |
self.m2m.append( (field, rels) ) | |
else: | |
self.simple.append(field) | |
if self.simple: | |
post_save.connect(self.model_saved, sender = self.model, dispatch_uid = 'ComputedField_simple_%s' % mpath) | |
def contribute_to_class(self, cls, name): | |
super(ComputedField, self).contribute_to_class(cls, name) | |
if hasattr(self.model, '__computed_field_mixin'): | |
getattr(self.model, '__computed_field_mixin').append(self) | |
else: | |
setattr(self.model, '__computed_field_mixin', [self]) | |
class_prepared.connect(self.model_prepared, sender = self.model) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment