-
-
Save onyekaa/fa5a3c6ceb78f7008928e834716f3124 to your computer and use it in GitHub Desktop.
Django Archive Mixin - https://stackoverflow.com/questions/34380033/django-cascading-move-to-a-separate-table-instead-of-cascading-delete
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import absolute_import, print_function, unicode_literals

import logging

from django.apps import apps
from django.core.exceptions import FieldDoesNotExist
from django.db import models, transaction
from django.utils import six

try:
    # Django 1.7
    from django.contrib.admin.utils import NestedObjects
except ImportError:
    # Django < 1.7
    from django.contrib.admin.util import NestedObjects
class DeleteModelQuerySet(object):
    """Queryset mixin that archives objects instead of simply deleting them.

    The class derives from ``object``; every ``super()`` call below relies on
    a later class in the MRO providing the real ``delete()`` (presumably
    ``django.db.models.QuerySet`` -- confirm where the mixin is combined).
    Take a look at ``django.db.models.deletion`` for the collection logic.
    """

    def hard_delete(self):
        """Delete for real, skipping the archive step entirely."""
        super(DeleteModelQuerySet, self).delete()

    def delete(self):
        """Archive everything that would be deleted, then delete it.

        When the queryset's model is not archivable this is a plain delete
        and ``None`` is returned.  Otherwise every object gathered by
        ``NestedObjects`` whose model is archivable (and not excluded by
        ``exclude_models_from_archive()``) is copied through
        ``model.create_archive_object`` before the real delete runs, all
        inside one transaction.

        Returns:
            ``None`` for the plain-delete path, else a queryset on
            ``self.model.get_archive_model()`` filtered by the ids of the
            archive objects created.

        Raises:
            AssertionError: a collected model has no ``is_archivable``.
        """
        log = logging.getLogger(__name__)
        log.debug('delete')

        if not self.is_archivable():
            log.debug('deleting')
            super(DeleteModelQuerySet, self).delete()
            return None

        log.debug('archiving')
        archive_object_ids = []
        seen = []

        collector = NestedObjects(using='default')  # or specific database
        collector.collect(list(self))
        collector.sort()

        excluded_models = self.model.exclude_models_from_archive()

        with transaction.atomic():
            for model, instances in six.iteritems(collector.data):
                if model in excluded_models:
                    continue

                assert hasattr(model, "is_archivable"), (
                    "model {} doesn't know about archive".format(model)
                )

                if not model.is_archivable():
                    # just delete
                    continue

                for instance in instances:
                    if instance in seen:
                        continue
                    seen.append(instance)

                    # Mark multi-table-inheritance parents as seen so they
                    # are not archived a second time on their own.
                    for ptr in six.itervalues(instance._meta.parents):
                        if ptr:
                            seen.append(getattr(instance, ptr.name))

                    archive_object = model.create_archive_object(instance)
                    archive_object_ids.append(archive_object.id)

            # real delete
            super(DeleteModelQuerySet, self).delete()

        # NOTE(review): the ids above come from the archive models of *all*
        # collected models, yet the filter below targets only self.model's
        # archive model -- ids from other tables may match unrelated rows.
        archive_model = self.model.get_archive_model()
        return archive_model.objects.filter(id__in=archive_object_ids)

    def undelete(self):
        """Recreate live objects from this queryset, then delete its rows."""
        with transaction.atomic():
            self.unarchive()
            super(DeleteModelQuerySet, self).delete()

    def is_archivable(self):
        """Return the model's ``is_archivable()``; False means hard delete."""
        return self.model.is_archivable()

    def unarchive(self):
        """Call ``model.create_live_object`` for every object in the queryset."""
        for obj_archive in self:
            self.model.create_live_object(obj_archive)
class DeleteModelMixin(models.Model):
    """Abstract model base that copies rows to an archive model on delete.

    The archive counterpart of a model is looked up in the same app under
    the name ``<model_name>Archive``.  Subclasses must implement
    ``get_archive_field_names()``; the remaining hooks have defaults.
    """

    @classmethod
    def is_archivable(cls):
        """Return True to archive on delete; override to just delete."""
        return True

    def get_deletable_objects(self):
        """Return the ``NestedObjects`` data collected for this instance."""
        collector = NestedObjects(using='default')  # or specific database
        # collect() takes an iterable of instances; a single model instance
        # is not iterable, so it has to be wrapped in a list.
        collector.collect([self])
        collector.sort()
        return collector.data

    @classmethod
    def create_archive_object(cls, obj):
        """Create and return the archive-model copy of live object ``obj``.

        Starts from ``obj.__dict__``, drops every key that is not a model
        field, renames keys via ``convert_to_archive_dictionary`` and
        creates the archive row from what is left.
        """
        # http://stackoverflow.com/q/21925671/433570
        values = obj.__dict__.copy()
        for key in list(values):
            try:
                obj._meta.get_field(key)
            except FieldDoesNotExist:
                del values[key]

        cls.convert_to_archive_dictionary(values)
        return cls.get_archive_model().objects.create(**values)

    @classmethod
    def create_live_object(cls, obj):
        """Create and return the live-model copy of archive object ``obj``.

        Mirrors ``create_archive_object``; the ``*_archive_id`` keys of the
        fields named by ``get_twostep_field_names()`` are dropped rather
        than converted back.
        """
        values = obj.__dict__.copy()

        # A set plus a tolerant pop: the same key may qualify for removal
        # twice (two-step key that is also not a field) or be absent.
        discard = set(
            cls.convert_to_archive_field_name(name) + '_id'
            for name in cls.get_twostep_field_names()
        )
        for key in values:
            try:
                obj._meta.get_field(key)
            except FieldDoesNotExist:
                discard.add(key)
        for key in discard:
            values.pop(key, None)

        cls.convert_to_live_dictionary(values)
        return cls.get_live_model().objects.create(**values)

    @classmethod
    def get_archive_model_name(cls):
        """Return this model's name with ``Archive`` appended."""
        return '{}Archive'.format(cls._meta.model_name)

    @classmethod
    def get_live_model_name(cls):
        """Return this model's name with a trailing ``archive`` removed."""
        name = cls._meta.model_name
        suffix = "archive"
        if name.endswith(suffix):
            return name[:-len(suffix)]
        return name

    @classmethod
    def get_archive_model(cls):
        """Return the archive model class from this model's app."""
        # http://stackoverflow.com/a/26126935/433570
        return apps.get_model(
            app_label=cls._meta.app_label,
            model_name=cls.get_archive_model_name(),
        )

    @classmethod
    def get_live_model(cls):
        """Return the live model class from this model's app."""
        return apps.get_model(
            app_label=cls._meta.app_label,
            model_name=cls.get_live_model_name(),
        )

    @classmethod
    def is_archive_model(cls):
        """Return True when this model's name ends with ``archive``."""
        # _meta.model_name is lower-cased by Django, so the suffix must be
        # compared in lower case (same test as get_live_model_name).
        return cls._meta.model_name.endswith("archive")

    @classmethod
    def is_live_model(cls):
        """Return True when this is not an archive model."""
        return not cls.is_archive_model()

    def make_referers_point_to_archive(self, archive_object, seen):
        """Delete objects referring to this one and repoint their archives.

        For each candidate reverse relation (skipping hidden ``+`` accessors
        and accessors ending in ``archive``) the referring objects are
        deleted through their queryset; when they are archivable, the
        resulting archive rows get ``<field>_archive`` set to
        ``archive_object``.

        Raises:
            AssertionError: a referring queryset has no ``is_archivable``.
        """
        # Imported here because the module header does not provide it.
        from django.db.models.deletion import get_candidate_relations_to_delete

        for related in get_candidate_relations_to_delete(self._meta):
            accessor_name = related.get_accessor_name()
            if accessor_name.endswith('+') or accessor_name.lower().endswith("archive"):
                continue

            referers = None
            if related.one_to_one:
                referer = getattr(self, accessor_name, None)
                if referer:
                    referers = type(referer).objects.filter(id=referer.id)
            else:
                referers = getattr(self, accessor_name).all()

            refering_field_name = '{}_archive'.format(related.field.name)

            if referers:
                assert hasattr(referers, 'is_archivable'), (
                    "referers is not archivable: {referer_cls}".format(
                        referer_cls=referers.model
                    )
                )
                # NOTE(review): DeleteModelQuerySet.delete() as defined in
                # this file takes no ``seen`` argument -- confirm which
                # queryset implementation this call is meant for.
                archive_referers = referers.delete(seen=seen)
                if referers.is_archivable():
                    archive_referers.update(**{refering_field_name: archive_object})

    def hard_delete(self):
        """Delete this row for real via the parent class ``delete()``."""
        super(DeleteModelMixin, self).delete()

    def delete(self, *args, **kwargs):
        """Delete through the manager's queryset so archiving applies.

        Positional and keyword arguments are accepted but not used.
        """
        logging.getLogger(__name__).debug('model delete')
        self._meta.model.objects.filter(id=self.id).delete()

    def undelete(self, commit=True):
        """Run the queryset ``undelete()`` for this row; ``commit`` is unused."""
        self._meta.model.objects.filter(id=self.id).undelete()

    def unarchive(self, commit=True):
        """Run the queryset ``unarchive()`` for this row; ``commit`` is unused."""
        self._meta.model.objects.filter(id=self.id).unarchive()

    @classmethod
    def get_archive_field_names(cls):
        """Return the field names to rename when archiving; must be overridden."""
        raise NotImplementedError('get_archive_field_names() must be implemented')

    @classmethod
    def convert_to_archive_dictionary(cls, d):
        """Rename each ``<field>_id`` key of ``d`` to ``<field>_archive_id`` in place."""
        for field_name in cls.get_archive_field_names():
            live_key = '{}_id'.format(field_name)
            archive_key = cls.convert_to_archive_field_name(live_key)
            d[archive_key] = d.pop(live_key)

    @classmethod
    def convert_to_live_dictionary(cls, d):
        """Rename ``<field>_archive_id`` keys of ``d`` back to ``<field>_id`` in place.

        Fields listed by ``get_twostep_field_names()`` are left out.
        """
        field_names = set(cls.get_archive_field_names()) - set(cls.get_twostep_field_names())
        for field_name in field_names:
            live_key = '{}_id'.format(field_name)
            archive_key = cls.convert_to_archive_field_name(live_key)
            d[live_key] = d.pop(archive_key)

    @classmethod
    def convert_to_archive_field_name(cls, field_name):
        """Map ``x_id`` to ``x_archive_id`` and any other ``x`` to ``x_archive``."""
        id_suffix = '_id'
        if field_name.endswith(id_suffix):
            return '{}_archive_id'.format(field_name[:-len(id_suffix)])
        return '{}_archive'.format(field_name)

    @classmethod
    def convert_to_live_field_name(cls, field_name):
        """Map ``x_archive_id`` to ``x_id`` and ``x_archive`` to ``x``.

        Returns ``None`` for a name carrying neither suffix.
        """
        archive_id_suffix = '_archive_id'
        if field_name.endswith(archive_id_suffix):
            return '{}_id'.format(field_name[:-len(archive_id_suffix)])
        # Test for the same suffix that gets stripped, underscore included.
        archive_suffix = '_archive'
        if field_name.endswith(archive_suffix):
            return field_name[:-len(archive_suffix)]
        return None

    @classmethod
    def get_twostep_field_names(cls):
        """Return field names whose archive keys are dropped on restore.

        Defaults to none.  NOTE(review): the intended "two-step" restore
        procedure is not visible in this file -- confirm before relying on it.
        """
        return []

    @classmethod
    def exclude_models_from_archive(cls):
        """Return the models to skip while archiving; defaults to none."""
        # excluded model can be deleted if referencing to me
        # or just lives if I reference him
        return []

    class Meta:
        abstract = True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment