Last active
October 29, 2019 20:15
-
-
Save alisonamerico/0b8054c7733d961be846ef185019834c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# models.py | |
from django.db import models | |
class Country(models.Model):
    """Unmanaged model mapped onto the existing ``COUNTRY`` table."""

    id = models.AutoField(db_column='ID_COUNTRY', primary_key=True)
    name = models.CharField(db_column='NAME', max_length=30)
    initials = models.CharField(db_column='INITIALS', max_length=4, blank=True, null=True)

    class Meta:
        # The table is owned outside Django; no migrations are generated for it.
        managed = False
        db_table = 'COUNTRY'

    def __str__(self):
        """A string representation of the model."""
        # Mirrors the other named models in this module (State, Variable, Unity).
        return self.name
class State(models.Model):
    """Unmanaged model mapped onto the ``STATE`` table; each row references a Country."""

    id = models.AutoField(primary_key=True, db_column='ID_STATE')
    country = models.ForeignKey(
        Country,
        db_column='ID_COUNTRY',
        on_delete=models.CASCADE,
    )
    name = models.CharField(max_length=100, db_column='NAME')
    initials = models.CharField(
        max_length=4,
        null=True,
        blank=True,
        db_column='INITIALS',
    )

    class Meta:
        db_table = 'STATE'
        managed = False

    def __str__(self):
        """Return the state's name."""
        return self.name
class VariableType(models.Model):
    """Unmanaged model mapped onto the existing ``VARIABLE_TYPE`` table."""

    # NOTE(review): the primary-key column is 'ID_VARIABLE', the same column
    # name Variable uses for its own key - confirm against the real schema.
    id = models.AutoField(db_column='ID_VARIABLE', primary_key=True)
    name = models.CharField(db_column='NAME', max_length=30)

    class Meta:
        # The table is owned outside Django; no migrations are generated for it.
        managed = False
        db_table = 'VARIABLE_TYPE'

    def __str__(self):
        """A string representation of the model."""
        # Mirrors the other named models in this module (State, Variable, Unity).
        return self.name
class Variable(models.Model):
    """Unmanaged model mapped onto the ``VARIABLE`` table; each row has a VariableType."""

    id = models.AutoField(primary_key=True, db_column='ID_VARIABLE')
    type = models.ForeignKey(
        'VariableType',
        db_column='TYPE_ID_VARIABLE',
        on_delete=models.CASCADE,
    )
    name = models.CharField(max_length=40, db_column='NAME')

    class Meta:
        db_table = 'VARIABLE'
        managed = False

    def __str__(self):
        """Return the variable's name."""
        return self.name
class Unity(models.Model):
    """Unmanaged model mapped onto the ``UNITY_TABLE`` table."""

    id = models.AutoField(primary_key=True, db_column='ID_UNITY')
    name = models.CharField(max_length=30, db_column='NAME')
    description = models.CharField(max_length=100, db_column='DESCRIPTION')

    class Meta:
        db_table = 'UNITY_TABLE'
        managed = False

    def __str__(self):
        """Return the unity's name."""
        return self.name
class GlobalVariable(models.Model):
    """
    Unmanaged model mapped onto the ``GLOBAL_VARIABLE`` table.

    Each row ties a Variable to a Unity and, optionally, a State, and stores
    a decimal value together with an optional month and year.
    """

    id = models.AutoField(primary_key=True, db_column='ID_GLOBAL_VARIABLE')
    variable = models.ForeignKey(
        'Variable',
        null=True,
        db_column='ID_VARIABLE',
        on_delete=models.CASCADE,
    )
    unity = models.ForeignKey(
        'Unity',
        on_delete=models.CASCADE,
        db_column='ID_UNITY',
    )
    # NOTE(review): unique=True on a ForeignKey triggers Django's fields.W342
    # system-check warning (it suggests OneToOneField). Left unchanged because
    # switching would alter the ``State.global_variable`` reverse accessor.
    state = models.ForeignKey(
        'State',
        null=True,
        unique=True,
        db_column='ID_STATE',
        on_delete=models.CASCADE,
        related_name='global_variable',
    )
    value = models.DecimalField(
        max_digits=18,
        decimal_places=9,
        null=True,
        blank=True,
        db_column='VALUE',
    )
    month = models.DecimalField(
        max_digits=2,
        decimal_places=0,
        null=True,
        blank=True,
        db_column='MONTH',
    )
    year = models.DecimalField(
        max_digits=4,
        decimal_places=0,
        null=True,
        blank=True,
        db_column='YEAR',
    )

    class Meta:
        db_table = 'GLOBAL_VARIABLE'
        managed = False
# views.py | |
from gc import get_objects

from django.db.models import F
from django.shortcuts import get_object_or_404
from rest_framework import status, viewsets
from rest_framework.exceptions import ValidationError
from rest_framework.response import Response
from rest_framework.serializers import DjangoValidationError
from rest_framework.status import (HTTP_201_CREATED, HTTP_400_BAD_REQUEST,
                                   HTTP_409_CONFLICT)

from .models import GlobalVariable, State, Unity, Variable
# from .serializers import (ConflictError, GlobalVariablesSerializer, StateSerializer)
from .serializers import ConflictError, GlobalVariablesSerializer, StateSerializer
class TaxesAndTariffsViewSet(viewsets.ModelViewSet):
    """
    ViewSet for model Global Variable, restricted to taxes and tariffs.

    The queryset keeps only rows whose variable is named PIS, COFINS,
    COTAÇÃO DO DÓLAR or TARIFA FIXA DE RATEIO, and annotates each row with
    the ``variable_name``, ``state_name`` and ``unity_type`` values that
    GlobalVariablesSerializer reads.
    """

    queryset = (
        GlobalVariable.objects
        .filter(variable__name__in=['PIS', 'COFINS', 'COTAÇÃO DO DÓLAR', 'TARIFA FIXA DE RATEIO'])
        .annotate(
            variable_name=F('variable__name'),
            state_name=F('state__name'),
            unity_type=F('unity__name'),
        )
    )
    serializer_class = GlobalVariablesSerializer

    def create(self, request, *args, **kwargs):
        """
        Bulk endpoint: update the items that carry an ``id``, create the rest.

        ``request.data`` must be a list of objects whose ``variable`` and
        ``unity`` entries are names; they are swapped for primary keys before
        validation.

        Returns:
            201 with a success message; 409 when validation reports a
            uniqueness violation; 400 for any other invalid payload.
        """
        payload = request.data
        # Anything other than a list of objects would crash on ``.get`` below.
        if not isinstance(payload, list) or not all(isinstance(item, dict) for item in payload):
            return Response({'message': 'Solicitação incorreta!'}, HTTP_400_BAD_REQUEST)

        resolved = [self.set_correct_value(item) for item in payload]
        registered_global_variables = [item for item in resolved if item.get('id') is not None]
        new_global_variables = [item for item in resolved if item.get('id') is None]

        registered_data = GlobalVariablesSerializer(data=registered_global_variables, many=True)
        # is_valid() must NOT raise here: raising made the 409/400 mapping
        # unreachable (and the positional flag is rejected by DRF >= 3.14).
        if not registered_data.is_valid():
            return self._validation_error_response(registered_data.errors)
        # The raw items are passed on purpose: ``id`` is read-only and so is
        # absent from ``validated_data``.
        # NOTE(review): the list serializer's update() selects items by
        # variable *name* ('ICMS'/'IPCA'), but set_correct_value() has already
        # replaced the name with a primary key - confirm this call persists
        # anything for this endpoint.
        registered_data.update(
            instance=GlobalVariable.objects.select_for_update().all(),
            validated_data=registered_global_variables,
        )

        data = GlobalVariablesSerializer(data=new_global_variables, many=True)
        if not data.is_valid():
            return self._validation_error_response(data.errors)
        data.save()
        return Response({'message': 'Success!'}, HTTP_201_CREATED)

    @staticmethod
    def _validation_error_response(errors):
        """Build the error response: 409 for uniqueness violations, else 400."""
        is_conflict = 'must be unique' in str(errors)
        return Response(errors, HTTP_409_CONFLICT if is_conflict else HTTP_400_BAD_REQUEST)

    def set_correct_value(self, global_variable):
        """
        Replace the ``variable`` and ``unity`` names in *global_variable* by ids.

        The dict is modified in place and also returned.

        Raises:
            ValidationError: if either key is missing or no row has that name
                (previously an unhandled KeyError/DoesNotExist, i.e. HTTP 500).
        """
        for field, model in (('variable', Variable), ('unity', Unity)):
            try:
                name = global_variable[field]
            except KeyError as error:
                raise ValidationError({field: ['This field is required.']}) from error
            try:
                global_variable[field] = model.objects.get(name__exact=name).id
            except model.DoesNotExist as error:
                raise ValidationError({field: [f'Unknown {field}: {name!r}.']}) from error
        return global_variable
class IcmsAliquotViewSet(viewsets.ModelViewSet):
    """
    ViewSet for the Global Variable rows whose variable is named ICMS.

    Rows are annotated with the ``variable_name``, ``state_name`` and
    ``unity_type`` values that GlobalVariablesSerializer reads.
    """

    queryset = (
        GlobalVariable.objects
        .filter(variable__name__exact='ICMS')
        .annotate(
            variable_name=F('variable__name'),
            state_name=F('state__name'),
            unity_type=F('unity__name'),
        )
    )
    serializer_class = GlobalVariablesSerializer

    def create(self, request, *args, **kwargs):
        """
        Bulk endpoint: update the items that carry an ``id``, create the rest.

        Returns:
            201 with a success message; 400 when the payload is not a list of
            objects or the new items fail validation; 409 when the update
            raises ConflictError.
        """
        payload = request.data
        # Anything other than a list of objects would crash on ``.get`` below.
        if not isinstance(payload, list) or not all(isinstance(item, dict) for item in payload):
            return Response({'message': 'Solicitação incorreta!'}, HTTP_400_BAD_REQUEST)

        # Split once instead of filtering the same payload twice.
        registered = [item for item in payload if item.get('id') is not None]
        new = [item for item in payload if item.get('id') is None]

        try:
            # Raw items are passed on purpose: the list serializer's update()
            # reads ``id`` from them.
            GlobalVariablesSerializer(data=registered, many=True).update(
                instance=GlobalVariable.objects.select_for_update().all(),
                validated_data=registered,
            )
            new_variables = GlobalVariablesSerializer(data=new, many=True)
            # Keyword form: the positional flag is rejected by DRF >= 3.14.
            new_variables.is_valid(raise_exception=True)
            new_variables.save()
        except ValidationError:
            return Response({'message': 'Solicitação incorreta!'}, HTTP_400_BAD_REQUEST)
        except ConflictError as conflict_error:
            return Response(conflict_error.detail, HTTP_409_CONFLICT)
        return Response({'message': 'Success!'}, HTTP_201_CREATED)
class IndexesViewSet(viewsets.ModelViewSet):
    """
    ViewSet for the Global Variable rows whose variable is IGP-M or IPCA.

    Rows are ordered newest first (year, then month, descending) and annotated
    with the ``variable_name``, ``state_name`` and ``unity_type`` values that
    GlobalVariablesSerializer reads.
    """

    queryset = (
        GlobalVariable.objects
        .filter(variable__name__in=['IGP-M', 'IPCA'])
        .order_by('-year', '-month')
        .annotate(
            variable_name=F('variable__name'),
            state_name=F('state__name'),
            unity_type=F('unity__name'),
        )
    )
    serializer_class = GlobalVariablesSerializer

    def create(self, request, *args, **kwargs):
        """
        Bulk endpoint: update the items that carry an ``id``, create the rest.

        Returns:
            201 with a success message; 400 when the payload is not a list of
            objects or the new items fail validation; 409 when the update
            raises ConflictError.
        """
        payload = request.data
        # Anything other than a list of objects would crash on ``.get`` below.
        if not isinstance(payload, list) or not all(isinstance(item, dict) for item in payload):
            return Response({'message': 'Solicitação incorreta!'}, HTTP_400_BAD_REQUEST)

        # Split once instead of filtering the same payload twice.
        registered = [item for item in payload if item.get('id') is not None]
        new = [item for item in payload if item.get('id') is None]

        try:
            # Raw items are passed on purpose: the list serializer's update()
            # reads its keys straight from them.
            # NOTE(review): that update() only handles items whose variable is
            # 'ICMS' or 'IPCA'; 'IGP-M' items appear to be ignored - confirm.
            GlobalVariablesSerializer(data=registered, many=True).update(
                instance=GlobalVariable.objects.select_for_update().all(),
                validated_data=registered,
            )
            new_variables = GlobalVariablesSerializer(data=new, many=True)
            # Keyword form: the positional flag is rejected by DRF >= 3.14.
            new_variables.is_valid(raise_exception=True)
            new_variables.save()
        except ValidationError:
            return Response({'message': 'Solicitação incorreta!'}, HTTP_400_BAD_REQUEST)
        except ConflictError as conflict_error:
            return Response(conflict_error.detail, HTTP_409_CONFLICT)
        return Response({'message': 'Success!'}, HTTP_201_CREATED)
class StateViewSet(viewsets.ModelViewSet):
    """ViewSet over every State row, serialized with StateSerializer."""

    serializer_class = StateSerializer
    queryset = State.objects.all()
# serializers.py | |
from rest_framework import serializers | |
from django.db.models import F, When, Case, Q, Value, Count | |
from django.db import transaction, DatabaseError | |
from .models import GlobalVariable, State, Variable, Unity | |
from rest_framework.exceptions import ValidationError, APIException | |
from django.utils.log import logging | |
class ConflictError(APIException):
    """
    API exception signalling a conflict with existing data (HTTP 409).

    ``status_code`` is what DRF's exception handler uses for the response
    status; without it an unhandled ConflictError was rendered as the
    APIException default (500), because the ``code`` argument is only the
    machine-readable error code attached to ``detail``.
    """

    status_code = 409
    default_detail = 'Conflict.'
    default_code = 'conflict'

    def __init__(self, detail=None, code=None):
        """
        Args:
            detail: Human-readable error description; falls back to
                ``default_detail`` when omitted.
            code: Error code stored on the detail; falls back to
                ``default_code`` when omitted.
        """
        super().__init__(detail=detail, code=code)
class GlobalVariablesListSerializer(serializers.ListSerializer):
    """List serializer that applies bulk updates to ICMS and IPCA global variables."""

    def update_icms(self, icms_variables):
        """
        Set ``state`` and ``value`` on each ICMS row, selected by ``id``.

        Args:
            icms_variables: Iterable of dicts with ``id``, ``state`` and
                ``value`` keys.

        Raises:
            ConflictError: if the requested state does not exist or is already
                linked to a different global variable.
        """
        with transaction.atomic():
            for global_variable in icms_variables:
                variable_id = global_variable['id']
                # The state is usable only when no global variable points at
                # it yet, or the one that does is the row being updated.
                target_state = (
                    State.objects
                    .filter(id=global_variable['state'])
                    .annotate(counter=Count('global_variable'))
                    .filter(Q(counter=0) | Q(global_variable__id=variable_id))
                    .first()
                )
                if target_state is None:
                    # Previously surfaced indirectly as an AttributeError on
                    # ``None.id`` that the caller converted into a conflict.
                    raise ConflictError('Já há um ICMS cadastrado para o estado informado.', 409)
                # Both branches of the former CASE expression evaluated to this
                # same state id, so a plain assignment is equivalent.
                GlobalVariable.objects.filter(id=variable_id).update(
                    state=target_state,
                    value=global_variable['value'],
                )

    def update_ipca(self, ipca_variables):
        """
        Set ``month``, ``year`` and ``value`` on the rows of each given variable.

        Args:
            ipca_variables: Iterable of dicts with ``variable``, ``month``,
                ``year`` and ``value`` keys.
        """
        with transaction.atomic():
            for global_variable in ipca_variables:
                # NOTE(review): rows are matched by variable name only, so
                # every row of that variable is overwritten - confirm this is
                # intended rather than matching on ``id``.
                GlobalVariable.objects.filter(variable__name__exact=global_variable['variable']).update(
                    month=global_variable['month'],
                    year=global_variable['year'],
                    value=global_variable['value'],
                )

    def update(self, instance, validated_data):
        """
        Apply the ICMS and IPCA updates contained in *validated_data*.

        Args:
            instance: Unused; accepted for ListSerializer compatibility.
            validated_data: Iterable of dicts; items whose ``variable`` is
                neither 'ICMS' nor 'IPCA' are ignored.

        Returns:
            None.

        Raises:
            ConflictError: on a state conflict, and - as before - for any
                other failure while updating.
        """
        items = list(validated_data)
        try:
            # One outer transaction so a failing IPCA update also rolls back
            # the ICMS changes made by the same call.
            with transaction.atomic():
                self.update_icms([item for item in items if item['variable'] == 'ICMS'])
                self.update_ipca([item for item in items if item['variable'] == 'IPCA'])
        except ConflictError:
            raise
        except Exception as some_erro:
            # Callers only handle ConflictError, so keep converting unexpected
            # failures, but log them and preserve the original cause.
            logging.getLogger(__name__).exception('Bulk update of global variables failed')
            raise ConflictError(f'Já há um ICMS cadastrado para o estado informado. {some_erro}', 409) from some_erro
# class GlobalVariablesListSerializer(serializers.ListSerializer): | |
# def create(self, validated_data): | |
# with transaction.atomic(): | |
# itens = [] | |
# for item in validated_data: | |
# obj, created = GlobalVariable.objects.update_or_create(variable=item['variable'], defaults=item) | |
# itens.append(obj) | |
# return itens | |
class GlobalVariablesSerializer(serializers.ModelSerializer):
    """
    Serializer for model Global Variable

    Besides the model fields it exposes four read-only computed fields:
    ``variable_name``, ``state_name``, ``unity_type`` and ``formated_value``.
    """

    variable_name = serializers.SerializerMethodField()
    state_name = serializers.SerializerMethodField()
    unity_type = serializers.SerializerMethodField()
    formated_value = serializers.SerializerMethodField()

    @staticmethod
    def _annotation_or_related_name(serialized_object, annotation, relation):
        """
        Return the annotated value when present, else the related object's name.

        The fallback covers instances that were not loaded through an
        annotated queryset, which would otherwise raise AttributeError.
        """
        if hasattr(serialized_object, annotation):
            return getattr(serialized_object, annotation)
        related = getattr(serialized_object, relation)
        return None if related is None else related.name

    def get_formated_value(self, serialized_object):
        """Return ``value`` with four decimal places, or None when it is unset."""
        value = serialized_object.value
        if value is None:
            # The column is nullable; formatting None would raise TypeError.
            return None
        return f'{value:.4f}'

    def get_variable_name(self, serialized_object):
        """Return the name of the row's variable."""
        return self._annotation_or_related_name(serialized_object, 'variable_name', 'variable')

    def get_state_name(self, serialized_object):
        """Return the name of the row's state, or None when it has no state."""
        return self._annotation_or_related_name(serialized_object, 'state_name', 'state')

    def get_unity_type(self, serialized_object):
        """Return the name of the row's unity."""
        return self._annotation_or_related_name(serialized_object, 'unity_type', 'unity')

    class Meta:
        list_serializer_class = GlobalVariablesListSerializer
        model = GlobalVariable
        fields = '__all__'
class StateSerializer(serializers.ModelSerializer):
    """Model serializer exposing every field of State."""

    class Meta:
        fields = '__all__'
        model = State
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment