Last active
December 13, 2018 15:05
-
-
Save mark-mishyn/80a285e9cda70b9d21df106982b181c2 to your computer and use it in GitHub Desktop.
History mixin for Django REST Framework and django-reversion
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.core.exceptions import FieldDoesNotExist | |
from django_extensions.db.fields import json | |
from rest_framework.decorators import list_route, detail_route | |
from reversion.models import Version | |
from common.pagination import SmallPagination, HistoryDiffPagination | |
from history.serializers import HistoryUserSerializer | |
from history.views import get_serialized_history | |
class HistoryDiffPaginator(Paginator):
    """Paginator whose pages carry one extra item so versions can be diffed.

    One diff entry is built from two consecutive versions, so the number of
    diff entries is the number of versions minus one, and every page slice
    includes one additional trailing version.
    """

    @cached_property
    def count(self):
        """Return the parent's count minus one, or 0 when the parent count is 0."""
        total_versions = super().count
        return total_versions - 1 if total_versions else 0

    def page(self, number):
        """Return the page for ``number``, sliced one item past the usual end.

        Works like the parent paginator except that the upper slice bound is
        extended by one: to show 10 diff entries per page, 11 versions are
        needed.
        """
        page_number = self.validate_number(number)
        start = (page_number - 1) * self.per_page
        stop = start + self.per_page
        if stop + self.orphans >= self.count:
            stop = self.count
        # ``stop + 1`` pulls in the extra version that closes the last pair.
        return self._get_page(self.object_list[start:stop + 1], page_number, self)
class HistoryDiffPagination(PageNumberPagination):
    """Page-number pagination for diff lists, backed by HistoryDiffPaginator."""

    # Paginator that slices one extra version per page so diffs can be paired.
    django_paginator_class = HistoryDiffPaginator

    # Page-size settings: default size, client-facing query parameter, upper cap.
    page_size = 25
    page_size_query_param = 'page_size'
    max_page_size = 500
def pk_to_str(model, pk):
    """Return a display value for the ``model`` row identified by ``pk``.

    Looks the row up via ``model.objects.filter(pk=pk).first()``. When a
    (truthy) row is found, its ``name`` attribute is returned if it has one,
    otherwise ``str(row)``. When nothing is found, ``pk`` itself is returned
    unchanged.
    """
    found = model.objects.filter(pk=pk).first()
    if found:
        return found.name if hasattr(found, 'name') else str(found)
    return pk
# Bookkeeping fields that change on every save and are left out of the diff.
_IGNORED_DIFF_FIELDS = ('updated_at', 'updated_by')


def _version_fields(version):
    """Return the ``fields`` mapping stored in a version's serialized data."""
    return json.loads(version.serialized_data)[0]['fields']


def _related_names(related_model, pks):
    """Render related primary keys as a comma-separated display string."""
    # pk_to_str returns the raw pk when the related row no longer exists, so
    # every item is coerced to str before joining. A missing value (None, e.g.
    # the field is absent from one of the versions) counts as an empty relation.
    return ', '.join(str(pk_to_str(related_model, pk)) for pk in pks or [])


def _field_change(model, field_name, prev_value, new_value):
    """Build one ``diff_data`` entry for a field whose value changed."""
    try:
        model_field = model._meta.get_field(field_name)
    except FieldDoesNotExist:  # related DB column was removed
        verbose_name = field_name
    else:
        verbose_name = model_field.verbose_name
        field_type = model_field.get_internal_type()
        if field_type == 'ForeignKey':
            # ``related_model`` replaces ``rel.to``, which newer Django
            # versions no longer provide.
            related_model = model_field.related_model
            if new_value:
                new_value = pk_to_str(related_model, new_value)
            if prev_value:
                prev_value = pk_to_str(related_model, prev_value)
        elif field_type == 'ManyToManyField':
            related_model = model_field.related_model
            new_value = _related_names(related_model, new_value)
            prev_value = _related_names(related_model, prev_value)
    return {
        'field': field_name,
        'field_verbose_name': verbose_name,
        'previous': prev_value,
        'new': new_value,
    }


def get_diff_list(versions_list, instance) -> list:
    """Build a list of field-level diffs between consecutive versions.

    Versions MUST be ordered: order_by('revision__date_created')!

    Each pair of neighbouring versions produces one entry holding the
    revision user, both version pks, the creation date of the newer revision
    and ``diff_data``: one item per changed field with its previous and new
    value. ForeignKey and ManyToManyField values are rendered through
    ``pk_to_str``. Only fields present in the newer version are compared, and
    the fields in ``_IGNORED_DIFF_FIELDS`` are skipped.

    Args:
        versions_list: ordered iterable of version objects exposing
            ``serialized_data``, ``pk`` and ``revision``.
        instance: model instance the versions belong to; used to resolve
            field metadata.

    Returns:
        A list of diff dicts; empty when fewer than two versions are given.
    """
    model = instance._meta.model
    versions = list(versions_list)
    if len(versions) < 2:
        return []

    # Parse every version once; each snapshot is used by up to two pairs.
    snapshots = [(version, _version_fields(version)) for version in versions]

    diff_list = []
    for (prev_version, prev_data), (new_version, new_data) in zip(snapshots, snapshots[1:]):
        diff_data = []
        for field_name, new_value in new_data.items():
            if field_name in _IGNORED_DIFF_FIELDS:
                continue
            prev_value = prev_data.get(field_name)
            if new_value != prev_value:
                diff_data.append(_field_change(model, field_name, prev_value, new_value))
        diff_list.append({
            'revision_user': HistoryUserSerializer(new_version.revision.user).data,
            'previous_version': prev_version.pk,
            'new_version': new_version.pk,
            'date_created': new_version.revision.date_created,
            'diff_data': diff_data,
        })
    return diff_list
class HistoryMixin:
    """Viewset mixin adding django-reversion history endpoints.

    The methods rely on the host class providing ``get_object()`` and a
    ``queryset`` attribute.
    """

    @detail_route(methods=['get'])
    def object_history(self, request, pk=None, **kwargs):
        """Return the paginated, serialized version history of one object."""
        instance = self.get_object()
        # A bare select_related() before the explicit one was redundant: the
        # explicit field list replaces it.
        history = (
            Version.objects.get_for_object(instance)
            .prefetch_related('object')
            .select_related('revision__user')
        )
        paginator = SmallPagination()
        result_page = paginator.paginate_queryset(get_serialized_history(history), request)
        return paginator.get_paginated_response(result_page)

    @list_route(methods=['get'])
    def history(self, request, **kwargs):
        """Return the paginated, serialized history of the viewset's model.

        The result is the serialized versions for the model followed by the
        serialized versions returned by ``get_deleted``.
        """
        model = self.queryset.model
        serialized_history = get_serialized_history(Version.objects.get_for_model(model))
        # NOTE(review): get_deleted() results may overlap with get_for_model();
        # confirm whether duplicate entries are intended here.
        deleted_versions = Version.objects.get_deleted(model)
        serialized_history.extend(get_serialized_history(deleted_versions))
        paginator = SmallPagination()
        result_page = paginator.paginate_queryset(serialized_history, request)
        return paginator.get_paginated_response(result_page)

    @detail_route(methods=['get'])
    def diff_list(self, request, pk=None, **kwargs):
        """Return a paginated list of field diffs between consecutive versions.

        Accepts ``**kwargs`` like the other actions so extra URL keyword
        arguments do not raise ``TypeError``.
        """
        instance = self.get_object()
        # get_diff_list requires versions ordered by revision creation date.
        history = (
            Version.objects.get_for_object(instance)
            .prefetch_related('object')
            .select_related('revision__user')
            .order_by('revision__date_created')
        )
        paginator = HistoryDiffPagination()
        page = paginator.paginate_queryset(history, request)
        return paginator.get_paginated_response(get_diff_list(page, instance=instance))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment