Created
October 2, 2018 08:59
-
-
Save codeboy/d39412419fbe94566a75e5dc96a6e77c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import os | |
import coreapi | |
import coreschema | |
import re | |
import requests | |
from urllib.parse import urlencode | |
from django.contrib.auth.forms import PasswordResetForm | |
from django.contrib.auth.tokens import PasswordResetTokenGenerator | |
from django.contrib.sites.shortcuts import get_current_site | |
from django.core.files.base import ContentFile | |
from django.core.validators import EmailValidator | |
from django.db.models import Q | |
from django.shortcuts import render, get_object_or_404 | |
from django.contrib.auth import authenticate, login | |
from django.template.loader import render_to_string | |
from django.urls import reverse | |
from django.utils.encoding import force_text, force_bytes | |
from django.utils.http import urlsafe_base64_encode | |
from django.utils.translation import ugettext_lazy as _ | |
# Create your views here. | |
from rest_framework import permissions | |
from rest_framework import viewsets | |
from rest_framework import parsers | |
from rest_framework import mixins | |
from rest_framework.decorators import list_route, detail_route | |
from rest_framework.fields import SkipField | |
from rest_framework.filters import BaseFilterBackend | |
from rest_framework.generics import ListAPIView, CreateAPIView, GenericAPIView, \ | |
UpdateAPIView, RetrieveAPIView, RetrieveUpdateAPIView | |
from rest_framework.pagination import LimitOffsetPagination | |
from rest_framework.response import Response | |
from rest_framework import status | |
from rest_framework import serializers | |
from rest_framework.views import APIView | |
from urllib.parse import urlparse | |
from common import utils as common_utils | |
from common.vt_mail import mail_send | |
from . import models as core_models | |
class ExLimitOffsetPagination(LimitOffsetPagination): | |
max_limit = 20 | |
default_limit = 10 | |
def _paginate_cls(i_max_limit=20, i_default_limit=10): | |
class _LimitOffsetPagination(LimitOffsetPagination): | |
max_limit = i_max_limit | |
default_limit = i_default_limit | |
return _LimitOffsetPagination | |
class BaseFilter(BaseFilterBackend): | |
action_map = {} | |
def get_schema_fields(self, view): | |
return self.action_map.get(view.action, []) | |
class MeasureViewSet(viewsets.ReadOnlyModelViewSet): | |
""" | |
Measure - единица измерения | |
""" | |
permission_classes = [permissions.IsAuthenticated] | |
pagination_class = _paginate_cls(1000, 1000) | |
def get_queryset(self): | |
return core_models.Measure.objects.all() | |
def get_serializer_class(self): | |
class _Serializer(serializers.ModelSerializer): | |
class Meta: | |
model = core_models.Measure | |
fields = ['id', 'full_title', 'short_title'] | |
return _Serializer | |
class MaterialViewSet(viewsets.ReadOnlyModelViewSet): | |
""" | |
Material - Материалы | |
""" | |
permission_classes = [permissions.IsAuthenticated] | |
pagination_class = _paginate_cls(1000, 1000) | |
def get_queryset(self): | |
return core_models.Material.objects.all() | |
def get_serializer_class(self): | |
class _Serializer(serializers.ModelSerializer): | |
class Meta: | |
model = core_models.Material | |
fields = ['id', 'title'] | |
return _Serializer | |
class BuildingObjectViewSet(viewsets.ModelViewSet): | |
""" | |
BuildingObject - строительный объект | |
""" | |
permission_classes = [permissions.IsAuthenticated] | |
pagination_class = ExLimitOffsetPagination | |
def get_queryset(self): | |
# return core_models.BuildingObject.objects.filter(user=self.request.user) | |
return core_models.BuildingObject.objects.all() | |
def get_serializer_class(self): | |
class _Serializer(serializers.ModelSerializer): | |
class Meta: | |
model = core_models.BuildingObject | |
# exclude = ['user', 'order', 'created_at', 'updated_at', 'state'] | |
fields = ['id', 'name', 'user'] | |
return _Serializer | |
class ConstructionObjectViewSet(viewsets.ModelViewSet): | |
""" | |
ConstructionObject - Наименование конструкции | |
""" | |
permission_classes = [permissions.IsAuthenticated] | |
pagination_class = ExLimitOffsetPagination | |
def get_queryset(self): | |
return core_models.ConstructionObject.objects.all() | |
def get_serializer_class(self): | |
class _Serializer(serializers.ModelSerializer): | |
class Meta: | |
model = core_models.ConstructionObject | |
fields = ['id', 'building_object', 'title'] | |
return _Serializer | |
class ObjectWorkViewSet(viewsets.ModelViewSet): | |
""" | |
ObjectWork - работы на объекте | |
""" | |
permission_classes = [permissions.IsAuthenticated] | |
pagination_class = ExLimitOffsetPagination | |
def get_queryset(self): | |
return core_models.ObjectWork.objects.all() | |
def get_serializer_class(self): | |
class _Serializer(serializers.ModelSerializer): | |
class Meta: | |
model = core_models.ObjectWork | |
fields = ['id', 'name', 'status'] | |
return _Serializer | |
class WorkMaterialViewSet(viewsets.ModelViewSet): | |
""" | |
WorkMaterial - материалы для работ | |
""" | |
permission_classes = [permissions.IsAuthenticated] | |
pagination_class = ExLimitOffsetPagination | |
def get_queryset(self): | |
return core_models.WorkMaterial.objects.all() | |
def get_serializer_class(self): | |
class _Serializer(serializers.ModelSerializer): | |
class Meta: | |
model = core_models.WorkMaterial | |
fields = ['id', 'work', 'material', 'planed_value'] | |
return _Serializer | |
class ProducedWorkViewSet(viewsets.ModelViewSet): | |
""" | |
WorkMaterial - материалы для работ | |
""" | |
permission_classes = [permissions.IsAuthenticated] | |
pagination_class = ExLimitOffsetPagination | |
def get_queryset(self): | |
return core_models.ProducedWork.objects.all() | |
def get_serializer_class(self): | |
class _Serializer(serializers.ModelSerializer): | |
class Meta: | |
model = core_models.ProducedWork | |
fields = ['id', 'work', 'done_at', 'material', 'value'] | |
return _Serializer | |
from thirdparty.cutstock.cuttingstock.GreedySolver import * | |
#todo: rework response for statuses and content | |
class CutCalculateViewSet(viewsets.GenericViewSet): | |
permission_classes = [permissions.IsAuthenticated] | |
def get_serializer_class(self): | |
class _Serializer(serializers.Serializer): | |
cut_pieces = serializers.ListField(help_text='list like - [1,2,3]') | |
cut_length = serializers.IntegerField() | |
return _Serializer | |
@list_route(['post']) | |
def cut_log(self, request, *args, **kwargs): | |
r_status = 'ok' | |
# print(request.data) | |
srz = self.get_serializer(data=request.data) | |
# srz.is_valid(raise_exception=True) | |
srz_valid = srz.is_valid() | |
cut_pieces = request.data.get('cut_pieces', False) | |
cut_length = request.data.get('cut_length', False) | |
if srz_valid and cut_pieces and cut_length: | |
result, count_logs = self._calculate(cut_pieces, cut_length) | |
r_content = { | |
'cut_pieces':cut_pieces, | |
'cut_length':cut_length, | |
'result': result, | |
'count_logs': count_logs | |
} | |
else: | |
r_status = 'error' | |
r_content = srz.errors | |
r_data = {'status': r_status, 'content': r_content} | |
return Response(r_data) | |
# todo: add type checking for integer | |
def _calculate(self, cut_pieces, cut_length): | |
cut_input = dict() | |
# перебираем кусочки в словаре и создаём другой для расчётов | |
for piece in cut_pieces: | |
quantity = int(piece['quantity']) | |
size = int(piece['size']) | |
if not cut_input.get(size, False): # проверка на уже существующий элемент todo: rework in frontend | |
if quantity >= 1 and size >= 1: # не ноль | |
cut_input[size] = quantity | |
cut_length = int(cut_length) | |
a = GreedySolver(cut_input, cut_length) | |
cutPatterns = a.getResult() | |
count_logs = len(cutPatterns) | |
new_cut = list() | |
new_count = dict() | |
for combination in cutPatterns: | |
comb = combination.size_qty | |
if comb in new_cut: | |
new_count[new_cut.index(comb)]['count'] += 1 | |
else: | |
new_cut.append(comb) | |
new_count[new_cut.index(comb)] = { | |
'count': 1, | |
'waste': cut_length - combination.getCombinationSize(), | |
'combination':combination.printCombi() | |
} | |
final_cut = list() | |
for k,v in new_count.items(): | |
final_cut.append({ | |
'combination': new_cut[k], | |
'combination_clean': v['combination'], | |
'count': v['count'], | |
'waste': v['waste'] | |
}) | |
return final_cut, count_logs | |
################################################################### | |
class Base64FileField(serializers.FileField): | |
def to_internal_value(self, data): | |
if isinstance(data, str) and 'base64' in data[0:100]: | |
# base64 encoded image - decode | |
# our custom format file:1.jpg;data: | |
format, filestr = data.split( | |
';base64,') # format ~= file:;Mime type;encoding, b64code | |
filename = format.split(';')[0].split(':')[-1] # guess file name | |
data = ContentFile(base64.b64decode(filestr), name=filename) | |
return super(Base64FileField, self).to_internal_value(data) | |
class Base64ImageField(serializers.ImageField): | |
def to_internal_value(self, data): | |
if isinstance(data, str) and data.startswith('data:image'): | |
# base64 encoded image - decode | |
format, imgstr = data.split(';base64,') # format ~= data:image/X, | |
ext = format.split('/')[-1] # guess file extension | |
data = ContentFile(base64.b64decode(imgstr), name='temp.' + ext) | |
return super(Base64ImageField, self).to_internal_value(data) | |
class ExCharField(serializers.CharField): | |
def get_attribute(self, instance): | |
raise SkipField() | |
class ExEmailField(serializers.EmailField): | |
def get_attribute(self, instance): | |
raise SkipField() | |
class ExBooleanField(serializers.BooleanField): | |
def get_attribute(self, instance): | |
raise SkipField() | |
class BaseDictViewSet(viewsets.ReadOnlyModelViewSet): | |
class BaseDictBackend(BaseFilter): | |
action_map = { | |
'list': [ | |
coreapi.Field( | |
name='term', | |
location='path', | |
required=False, | |
type='string' | |
) | |
] | |
} | |
def filter_queryset(self, request, queryset, view): | |
term = request.query_params.get('term') | |
if term: | |
return queryset.filter(title__icontains=term) | |
return queryset | |
permission_classes = [permissions.AllowAny] | |
pagination_class = _paginate_cls(1000, 1000) | |
filter_backends = [BaseDictBackend] | |
def get_serializer_class(self): | |
self_model = self.queryset.model | |
class _Serializer(serializers.ModelSerializer): | |
class Meta: | |
fields = ['id', 'title'] | |
model = self_model | |
return _Serializer | |
class UserCreateAPIView(CreateAPIView): | |
permission_classes = [permissions.AllowAny] | |
def get_serializer_class(self): | |
request = self.request | |
class _Serializer(serializers.ModelSerializer): | |
password = ExCharField() | |
password_repeat = ExCharField() | |
sms = ExCharField() | |
is_agreement = ExBooleanField() | |
def validate(self, attrs): | |
if core_models.User.objects.filter(email=attrs['email']): | |
raise serializers.ValidationError( | |
{'email': [_('User with same email exists')]}) | |
if core_models.User.objects.filter(phone=attrs['phone']): | |
raise serializers.ValidationError( | |
{'phone': [_('User with same phone exists')]}) | |
if attrs['password'] != attrs['password_repeat']: | |
raise serializers.ValidationError({ | |
'password': [_('Passwords not equal')], | |
'password_repeat': [_('Passwords not equal')] | |
}) | |
if attrs['sms'] != request.session.get('sms'): | |
raise serializers.ValidationError({ | |
'sms': [_('Wrong sms')] | |
}) | |
return attrs | |
def create(self, validated_data): | |
user = core_models.User.objects.create( | |
email=validated_data['email'], | |
first_name=validated_data['first_name'], | |
last_name=validated_data['last_name'], | |
phone=validated_data['phone'] | |
) | |
user.set_password(validated_data['password']) | |
user.save() | |
user_auth = authenticate( | |
username=user.email, | |
password=validated_data['password'], | |
) | |
login(request, user_auth) | |
return user | |
class Meta: | |
model = core_models.User | |
fields = ['first_name', | |
'last_name', | |
'middle_name', | |
'is_agreement', | |
'sms', | |
'email', | |
'phone', | |
'password', | |
'password_repeat'] | |
write_only_fields = ['password', 'password_repeat'] | |
read_only_fields = ['id'] | |
return _Serializer | |
class ProfileUpdateAPIView(RetrieveUpdateAPIView): | |
# class UserUpdateAPIView(RetrieveAPIView): | |
permission_classes = [permissions.IsAuthenticated] | |
pagination_class = None | |
def get_object(self): | |
return self.request.user | |
# | |
# def get_queryset(self): | |
# return core_models.User.objects.filter(pk=self.request.user.pk) | |
def get_serializer_class(self): | |
data = self.request.data | |
required = {'required': True, 'allow_blank': False} | |
ekw = {'first_name': required, 'last_name': required} | |
user = self.request.user | |
if data and data['is_breeder']: | |
for field_name in ['address', 'office', 'education']: | |
ekw[field_name] = required | |
ekw['language'] = {'required': True, 'allow_null': False} | |
ekw['country'] = {'required': True, 'allow_null': False} | |
ekw['birth'] = {'required': True, 'allow_null': False} | |
if self.request.method == 'PUT' and data.get('image') and data[ | |
'image'].startswith('http') and user.image: | |
data['image'] = user.image | |
class _Serializer(serializers.ModelSerializer): | |
image = Base64ImageField(allow_null=True) | |
parser_classes = [parsers.FormParser, parsers.MultiPartParser] | |
class Meta: | |
model = core_models.User | |
fields = ['first_name', 'last_name', 'middle_name', | |
'image', | |
'country', 'city', 'address', 'office', 'birth', | |
'about', 'language', 'education', 'course', 'link'] | |
extra_kwargs = ekw | |
return _Serializer | |
class LoginAPIView(APIView): | |
permission_classes = [permissions.AllowAny] | |
class _Serializer(serializers.Serializer): | |
username = serializers.CharField() | |
password = serializers.CharField() | |
def validate(self, attrs): | |
user = core_models.User.objects.filter(Q(email=attrs['username']) | |
| Q( | |
phone=attrs['username'])).first() | |
err = { | |
'username': [_('wrong password or email or phone')], | |
'password': [_('wrong password or email or phone')] | |
} | |
if not user: | |
raise serializers.ValidationError(err) | |
user_auth = authenticate( | |
username=user.email, | |
password=attrs['password'], | |
) | |
if not user_auth: | |
raise serializers.ValidationError(err) | |
attrs['user_auth'] = user_auth | |
return attrs | |
def post(self, request, *args, **kwargs): | |
serializer = LoginAPIView._Serializer(data=request.data) | |
serializer.is_valid(raise_exception=True) | |
validated_data = serializer.validated_data | |
login(request, validated_data['user_auth']) | |
return Response({'success': True}) | |
class AuthManagerViewSet(viewsets.GenericViewSet): | |
# filter_backends = [PetViewSetBackend] | |
email_template_name = 'apps/activation/mails/password_reset.j2', | |
permission_classes = [permissions.AllowAny] | |
# def _msg_email_get(self, request, **kwargs): | |
# from django.utils.http import urlsafe_base64_encode | |
# from django.contrib.auth.tokens import PasswordResetTokenGenerator | |
# from django.conf import settings | |
# | |
# current_site = get_current_site(request) | |
# site_name = current_site.name | |
# domain = current_site.domain | |
# context = { 'domain': domain, | |
# 'site_name': site_name, | |
# 'uid': urlsafe_base64_encode(force_bytes(self.user.pk)), | |
# 'user': self.user, | |
# 'token': PasswordResetTokenGenerator().make_token(self.user), | |
# 'protocol': 'https' if kwargs.get('use_https') else 'http'} | |
# return render_to_string(self.email_template_name, context) | |
# | |
# def _email_send(self, request): | |
# from django.core.mail import EmailMultiAlternatives | |
# email_message = EmailMultiAlternatives(_('password recovery'), | |
# self._msg_email_get(request), getattr(settings, 'DEFAULT_FROM_EMAIL'), [self.user.email]) | |
# html_email = loader.render_to_string(html_email_template_name, context) | |
# email_message.attach_alternative(html_email, 'text/html') | |
# | |
# email_message.send() | |
def get_serializer_class(self): | |
if self.action == 'reset_password': | |
class _Serializer(serializers.Serializer): | |
username = serializers.CharField() | |
def validate(self, attrs): | |
self.is_phone = False | |
# EmailValidator(message=self.error_messages['invalid']) | |
# +7950 0376541 (11) | |
username = attrs['username'].strip('+') | |
if '@' in username: | |
try: | |
EmailValidator(message=_('invalid email'))(username) | |
self.user = core_models.User.objects.get( | |
email=username) | |
except Exception as e: | |
raise serializers.ValidationError( | |
{'username': [_('invalid email')]}) | |
# if not core_models.User.objects.filter() | |
else: | |
self.is_phone = True | |
try: | |
re.match('\d{11}', username).group(0) | |
self.user = core_models.User.objects.get( | |
phone=username) | |
except Exception as e: | |
raise serializers.ValidationError( | |
{'username': [_('invalid phone')]}) | |
return attrs | |
return _Serializer | |
return None | |
@list_route(['post']) | |
def reset_password(self, request, *args, **kwargs): | |
srz = self.get_serializer(data=request.data) | |
srz.is_valid(raise_exception=True) | |
if srz.is_phone: | |
# TODO: maybe add phone rcovery | |
pass | |
site = get_current_site(request) | |
token = PasswordResetTokenGenerator().make_token(srz.user) | |
url = 'http://{}{}'.format(site.domain, | |
reverse('activation:password_reset_confirm', | |
kwargs={'uidb64': urlsafe_base64_encode( | |
force_bytes(srz.user.pk)), 'token': token})) | |
html = render_to_string(self.email_template_name, { | |
'user': srz.user, | |
'url': url, | |
'site': site | |
}) | |
try: | |
mail_send(srz.user.email, message_html=html, | |
subject=_('password reset')) | |
return Response({'success': True, 'email': srz.user.email}) | |
except Exception as e: | |
return Response({'success': False, 'error': str(e)}) | |
return Response({'hello': 'world'}) | |
class CountryViewSet(BaseDictViewSet): | |
queryset = core_models.Country.objects.all() | |
class CityViewSet(viewsets.ReadOnlyModelViewSet): | |
queryset = core_models.City.objects.all() | |
permission_classes = [permissions.AllowAny] | |
pagination_class = _paginate_cls(1000, 1000) | |
def get_serializer_class(self): | |
self_model = self.queryset.model | |
class _Serializer(serializers.ModelSerializer): | |
class Meta: | |
fields = ['id', 'title'] | |
model = self_model | |
return _Serializer | |
class CityDictBackend(BaseFilter): | |
action_map = { | |
'list': [ | |
coreapi.Field( | |
name='term', | |
location='path', | |
required=False, | |
type='string' | |
), | |
coreapi.Field( | |
name='country_id', | |
location='path', | |
required=False, | |
type='string' | |
) | |
], | |
} | |
def filter_queryset(self, request, queryset, view): | |
term = request.query_params.get('term') | |
country_id = request.query_params.get('country_id') | |
if term: | |
queryset = queryset.filter(title__icontains=term) | |
if country_id: | |
queryset = queryset.filter(country_id=country_id) | |
return queryset | |
filter_backends = [CityDictBackend] | |
@list_route(['post']) | |
def city_set(self, request, *args, **kwargs): | |
city = get_object_or_404(core_models.City, | |
pk=request.data.get('city_id')) | |
if request.user.is_authenticated(): | |
request.user.city = city | |
request.user.country_id = city.country_id | |
request.user.save() | |
else: | |
request.session['current_city_id'] = city.pk | |
return Response({'city': city.title, 'country': city.country.title}) | |
class LanguageViewSet(BaseDictViewSet): | |
queryset = core_models.Language.objects.all() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment