Created
May 20, 2021 19:16
-
-
Save jerinisready/0210466b94823bcf4292effcd46c9123 to your computer and use it in GitHub Desktop.
Django ORM + Oscar + Postgres: product listing, filtering, search and sort views in DRF!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@api_view()
def filter_options(request, pk):
    """Pass parameters to create a dynamic filter in the frontend.

    Collects every ``ProductAttribute`` with ``is_varying=True`` that belongs
    to the product class whose id is ``pk`` and, for each attribute that has
    at least one stored value, reports its ``code``, its ``name`` (as
    ``label``) and its values (as ``val``).

    The values are passed through the ``__`` helper, de-duplicated by a set
    and converted with ``to_client_dict``; both helpers are defined outside
    this block.
    """
    varying_attributes = ProductAttribute.objects.filter(
        is_varying=True, product_class__id=pk
    ).prefetch_related('productattributevalue_set')

    results = []
    for attribute in varying_attributes:
        # Attributes without any stored value offer nothing to filter on.
        if not attribute.productattributevalue_set.exists():
            continue
        results.append({
            'code': attribute.code,
            'label': attribute.name,
            'val': to_client_dict(
                {__(item) for item in attribute.productattributevalue_set.all()}
            ),
        })
    return Response({'results': results})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.conf import settings | |
from django.db.models import Q, F, Count, Max, Min, Case, When, CharField | |
from oscar.core.loading import get_model | |
from rest_framework.generics import get_object_or_404 | |
from apps.catalogue.models import Category, Product | |
from django.contrib.postgres.search import SearchQuery, SearchVector, SearchRank | |
from django.contrib.postgres.search import TrigramSimilarity | |
from lib.product_utils.search import _trigram_search, _simple_search, _similarity_with_rank_search, _similarity_search | |
ProductClass = get_model('catalogue', 'ProductClass') | |
def category_filter(queryset, category_slug, return_as_tuple=False):
    """Restrict ``queryset`` to products filed under a category subtree.

    The category is looked up by ``category_slug`` with ``get_object_or_404``,
    so an unknown slug produces a 404. Products are kept when they are linked
    to any category whose ``path`` starts with the path of that category
    (the category itself and, presumably, its descendants).

    Returns the filtered queryset, or the two-item list
    ``[filtered_queryset, category]`` when ``return_as_tuple`` is truthy.
    """
    category = get_object_or_404(Category, slug=category_slug)
    subtree = Category.objects.filter(path__startswith=category.path)
    filtered = queryset.filter(productcategory__category__in=subtree)
    if return_as_tuple:
        return [filtered, category]
    return filtered
def brand_filter(queryset, brand_ids):
    """Keep only the rows of ``queryset`` whose ``brand`` is one of ``brand_ids``."""
    lookup = {'brand__in': brand_ids}
    return queryset.filter(**lookup)
def apply_filter(queryset, _filter, null_value_compatability='__'):
    """
    Narrow ``queryset`` using the compact filter string sent by the frontend.

    _filter:
        Entries are separated by ``::``. Each entry is ``key:value``; a value
        containing commas is treated as a list and its key gets an ``__in``
        suffix. Entries without ``:``, with an empty key, or whose value is
        ``null_value_compatability`` (the frontend's "no value" marker) are
        ignored.

        input = weight:25,30,35::minprice:25::maxprice:45::available_only:1::color:Red,Black,Blue::ram:4 GB,8 GB
        parsed = {
            weight__in : [25, 30, 35],
            minprice : 25,
            maxprice : 45,
            available_only : 1
            color__in : [Red, Black, Blue]
            ram__in : [4 GB, 8 GB]
        }

    Reserved keys (never forwarded as attribute filters):
        minprice / maxprice : numeric bounds applied to ``effective_price``.
            The bounds are swapped when given in the wrong order; a value
            that is not a finite number is ignored.
        available_only : any value other than ``''``, ``0`` or ``false``
            keeps only rows whose ``effective_price`` is not null.

    Every remaining key is passed to ``queryset.filter_by_attributes``.
    Returns the filtered queryset.
    """
    from decimal import Decimal, InvalidOperation

    def _to_price(raw):
        """Convert a raw price string to a finite Decimal, or None if unusable."""
        if raw is None:
            return None
        try:
            price = Decimal(raw)
        except (InvalidOperation, TypeError, ValueError):
            return None
        return price if price.is_finite() else None

    null_suffix = f':{null_value_compatability}'
    filter_params = {}
    for entry in (_filter or '').split('::'):
        if ':' not in entry or entry.endswith(null_suffix):
            continue
        key, raw_value = entry.split(':', 1)
        key = key.strip()
        raw_value = raw_value.strip()
        if not key or raw_value == null_value_compatability:
            continue
        if ',' in raw_value:
            filter_params[key + '__in'] = [part.strip() for part in raw_value.split(',')]
        else:
            filter_params[key] = raw_value

    # Always remove the reserved keys so they are never mistaken for product
    # attributes, even when their value turns out to be unusable.
    price_from = _to_price(filter_params.pop('minprice', None))
    price_to = _to_price(filter_params.pop('maxprice', None))
    available_flag = filter_params.pop('available_only', None)
    exclude_out_of_stock = (
        available_flag is not None
        and available_flag.lower() not in ('', '0', 'false')
    )

    # Compare against None (not truthiness) so a bound of 0 is still honoured.
    if price_from is not None and price_to is not None:
        low, high = sorted((price_from, price_to))
        queryset = queryset.filter(effective_price__range=(low, high))
    elif price_from is not None:
        queryset = queryset.filter(effective_price__gte=price_from)
    elif price_to is not None:
        queryset = queryset.filter(effective_price__lte=price_to)

    if exclude_out_of_stock:
        queryset = queryset.filter(effective_price__isnull=False)

    return queryset.filter_by_attributes(**filter_params)
def apply_search(queryset, search: str, mode: str = '_trigram', extends: bool = True):
    """
    Run the search helper selected by ``mode`` against ``queryset``.

    search  : the search string
    mode    : which helper to use
                * _trigram (default)    * _simple
                * _similarity_rank      * _similarity
    extends : forwarded unchanged to the selected helper

    Raises ``Exception('Invalid Search Mode')`` for any other ``mode``.
    """
    if mode == '_trigram':
        return _trigram_search(queryset, search, extends=extends)
    if mode == '_simple':
        return _simple_search(queryset, search, extends=extends)
    if mode == '_similarity_rank':
        return _similarity_with_rank_search(queryset, search, extends=extends)
    if mode == '_similarity':
        return _similarity_search(queryset, search, extends=extends)
    raise Exception('Invalid Search Mode')
def apply_sort(queryset, sort=None):
    """Order ``queryset`` by the field names in ``sort``.

    ``sort`` is an iterable of ``order_by`` arguments; when it is ``None``
    the queryset is returned untouched.
    """
    if sort is None:
        return queryset
    return queryset.order_by(*sort)
def recommended_class(queryset):
    """Report the dominant product class of ``queryset`` with its price bounds.

    Each row is counted under its own ``product_class`` or, when that is
    empty, under its parent's product class. When strictly more than three
    quarters of the rows share one class (or whenever ``settings.DEBUG`` is
    on) the result is a dict with that class ``id`` plus the ``max_price``
    and ``min_price`` of all priced products of the class. Otherwise the
    function returns ``None``.
    """
    # The counting is done in Python on a single values() query
    # (original author's note: query optimization verified).
    rows = queryset.values('id', 'product_class', 'parent__product_class')
    tally = {}
    leader = None
    leader_count = 0
    for row in rows:
        class_id = row['product_class'] or row['parent__product_class']
        tally[class_id] = tally.get(class_id, 0) + 1
        # ">=" means the class that most recently reached the top count wins ties.
        if tally[class_id] >= leader_count:
            leader = class_id
            leader_count = tally[class_id]

    total = len(rows)
    # at least 3/4th are of same class.
    dominant = bool(total) and leader_count * 1.0 > total * 3 / 4
    if dominant or settings.DEBUG:
        price_bounds = Product.objects.filter(
            effective_price__isnull=False, product_class_id=leader
        ).aggregate(
            max_price=Max('effective_price'),
            min_price=Min('effective_price'),
        )
        return {'id': leader, **price_bounds}
    return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
get_product_search_handler_class = get_class(
    'catalogue.search_handlers', 'get_product_search_handler_class'
)


def _(x):
    """Identity stand-in used where labels are marked with ``_()``."""
    return x


Category = get_model('catalogue', 'Category')

# Sorting: keys accepted in the ``sort`` query parameter.
RELEVANCY = "relevancy"
TOP_RATED = "rating"
NEWEST = "newest"
PRICE_HIGH_TO_LOW = "price-desc"
PRICE_LOW_TO_HIGH = "price-asc"
TITLE_A_TO_Z = "title-asc"
TITLE_Z_TO_A = "title-desc"

# (key, label) pairs for every sort option.
SORT_BY_CHOICES = [
    (RELEVANCY, _("Relevancy")),
    (TOP_RATED, _("Customer rating")),
    (NEWEST, _("Newest")),
    (PRICE_HIGH_TO_LOW, _("Price high to low")),
    (PRICE_LOW_TO_HIGH, _("Price low to high")),
    (TITLE_A_TO_Z, _("Title A to Z")),
    (TITLE_Z_TO_A, _("Title Z to A")),
]

# Sort key -> ``order_by`` argument. RELEVANCY has no entry here.
SORT_BY_MAP = {
    TOP_RATED: '-rating',
    NEWEST: '-date_created',
    PRICE_HIGH_TO_LOW: '-effective_price',
    PRICE_LOW_TO_HIGH: 'effective_price',
    TITLE_A_TO_Z: 'title',
    TITLE_Z_TO_A: '-title',
}

# Filtering: (key, label) pairs for the filter options.
FILTER_BY_CHOICES = [
    ('exclude_out_of_stock', _("Exclude Out Of Stock")),
    ('price__range', _("Price Range")),
    ('width', _("Width")),
    ('height', _("Height")),
    ('material', _('Material')),
]
@api_view()
def product_list(request, category='all', **kwargs):
    """
    PRODUCT LISTING API, (powering, list /c/all/, /c/<category_slug>/, )

    Query parameters:
        q = " A search term "
        product_range = '<product-range-id>'
        offer_category = '<offer-banner-code>' (only banners whose offer is open)
        sort = comma separated keys from ['relevancy', 'rating', 'newest',
               'price-desc', 'price-asc', 'title-asc', 'title-desc'].
               Keys without an entry in SORT_BY_MAP (such as 'relevancy') are
               ignored, leaving the existing ordering in place.
        filter = minprice:25::maxprice:45::available_only:1::color:Red,Black,Blue::weight:25,30,35::ram:4 GB,8 GB
                 Where minprice, maxprice and available_only are common for all.
                 other dynamic parameters are available at
                 reverse('wnc-filter-options', kwarg={'pk': '<ProductClass: id>'})
        page = page number; a non-integer value falls back to 1
        page_size = items per page; a non-integer value or a value below 1
                    falls back to settings.DEFAULT_PAGE_SIZE

    The product source is chosen in this order: ``product_range``, then
    ``offer_category``, then the ``category`` slug from the URL.

    Unfiltered, unsorted requests for pages 1 to 4 at the default page size
    are served through ``cache_library`` with a 180 second ttl.
    """
    def _int_query_param(name, default):
        """Read an integer query parameter; use ``default`` when it is not an integer."""
        try:
            return int(request.GET.get(name, default))
        except (TypeError, ValueError):
            return default

    queryset = Product.browsable.browsable()
    _search = request.GET.get('q')
    _sort = request.GET.get('sort')
    _filter = request.GET.get('filter')
    _offer_category = request.GET.get('offer_category')
    _product_range = request.GET.get('product_range')

    default_page_size = int(settings.DEFAULT_PAGE_SIZE)
    page_number = _int_query_param('page', 1)
    page_size = _int_query_param('page_size', default_page_size)
    if page_size < 1:
        # A page size below 1 is not meaningful for the paginator.
        page_size = default_page_size

    if _product_range:
        product_range = get_object_or_404(Range, pk=_product_range)
        queryset = product_range.all_products().filter(is_public=True)
    elif _offer_category:
        offer_banner_object = get_object_or_404(
            OfferBanner, code=_offer_category, offer__status=ConditionalOffer.OPEN)
        queryset = offer_banner_object.products().filter(is_public=True)
    elif category != 'all':
        queryset = category_filter(queryset=queryset, category_slug=category)

    if _filter:
        queryset = apply_filter(queryset=queryset, _filter=_filter)
    if _search:
        queryset = apply_search(queryset=queryset, search=_search)
    if _sort:
        _sort = [SORT_BY_MAP[key] for key in _sort.split(',') if key in SORT_BY_MAP]
        # Only re-order when at least one key was recognised: order_by() with
        # no arguments would clear the ordering, including the search ranking.
        if _sort:
            queryset = apply_sort(queryset=queryset, sort=_sort)

    def _inner():
        """Paginate the queryset and build the response payload."""
        paginator = Paginator(queryset, page_size)
        empty_list = False
        try:
            number = paginator.validate_number(page_number)
        except PageNotAnInteger:
            number = 1
        except EmptyPage:
            # Out-of-range page: report the last page with no results.
            number = paginator.num_pages
            empty_list = True
        page_obj = paginator.get_page(number)
        if empty_list:
            product_data = []
        else:
            product_data = get_optimized_product_dict(
                qs=page_obj.object_list, request=request).values()
        return list_api_formatter(
            request, page_obj=page_obj, results=product_data, product_class=None)

    # The lower bound keeps invalid page numbers from creating cache entries.
    cacheable = (
        page_size == settings.DEFAULT_PAGE_SIZE
        and 1 <= page_number <= 4
        and not any([_search, _filter, _sort, _offer_category, _product_range])
    )
    if cacheable:
        c_key = cache_key.product_list__key.format(page_number, page_size, category)
        out = cache_library(c_key, cb=_inner, ttl=180)
    else:
        out = _inner()
    return Response(out)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db.models import F | |
from apps.catalogue.models import Category, Product | |
# add 'django.contrib.postgres' to INSTALLED_APPS | |
from django.contrib.postgres.search import SearchQuery, SearchVector, SearchRank | |
from django.contrib.postgres.search import TrigramSimilarity | |
def _trigram_search(queryset, search, extends=True):
    """Rank ``queryset`` by trigram similarity between ``title`` and ``search``.

    Each row is annotated with ``similarity``; rows with a similarity of 0
    are dropped and the rest are ordered from most to least similar.

    ``extends`` is accepted so all search helpers share one signature; it is
    not used here.
    """
    return (
        queryset
        .annotate(similarity=TrigramSimilarity('title', search))
        .filter(similarity__gt=0)
        .order_by('-similarity')
    )
def _similarity_with_rank_search(queryset, search, extends=False):
    """Rank ``queryset`` by full-text search rank against its ``search`` field.

    The query matches the whole ``search`` string OR any of its
    space-separated words. Rows are annotated with ``rank``; rows with a
    rank of 0 are dropped and the rest are ordered by descending rank.

    ``extends`` is accepted so all search helpers share one signature; it is
    not used here.
    """
    combined_query = SearchQuery(search)
    for word in search.split(' '):
        combined_query |= SearchQuery(word)

    ranked = queryset.annotate(rank=SearchRank(F('search'), combined_query))
    matching = ranked.filter(rank__gt=0)
    return matching.order_by('-rank')
def _similarity_search(queryset, search, extends=True):
    """Filter ``queryset`` to rows whose ``search`` field matches the query.

    The query matches the whole ``search`` string OR any of its
    space-separated words. No ranking or ordering is applied.

    ``extends`` is accepted so all search helpers share one signature; it is
    not used here.
    """
    query = SearchQuery(search)
    for word in search.split(' '):
        query |= SearchQuery(word)
    return queryset.filter(search=query)
def _simple_search(queryset, search, extends=True): | |
return Product.objects.filter(title__search=search).values_list('id', flat=True) | |
def apply_search(queryset, search: str, mode: str = '_trigram', extends: bool = True):
    """
    Search ``queryset`` with the helper named by ``mode``.

    search  : the search string
    mode    : one of
                * _trigram (default)    * _simple
                * _similarity_rank      * _similarity
    extends : forwarded unchanged to the selected helper

    Raises ``Exception('Invalid Search Mode')`` when ``mode`` is none of the above.
    """
    if mode not in ('_trigram', '_simple', '_similarity_rank', '_similarity'):
        raise Exception('Invalid Search Mode')

    if mode == '_trigram':
        search_helper = _trigram_search
    elif mode == '_simple':
        search_helper = _simple_search
    elif mode == '_similarity_rank':
        search_helper = _similarity_with_rank_search
    else:
        search_helper = _similarity_search
    return search_helper(queryset, search, extends=extends)
def apply_sort(queryset, sort=None):
    """Return ``queryset`` ordered by the ``order_by`` arguments in ``sort``.

    When ``sort`` is ``None`` the queryset is handed back unchanged.
    """
    return queryset if sort is None else queryset.order_by(*sort)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@api_view()
def product_suggestions(request, **kwargs):
    """Suggest up to 10 products for the search term in the ``q`` parameter.

    The response body has two keys: ``results``, the ``title`` and ``slug``
    of the matching products, and ``class``, the value returned by
    ``recommended_class`` for the full set of matches. Without a search term
    both keep their empty defaults.

    The response status is 400 when ``results`` is empty and 200 otherwise.
    """
    browsable_products = Product.browsable.browsable()
    term = request.GET.get('q')
    limit = 10
    payload = {'results': [], 'class': None, }

    if term:
        matches = apply_search(queryset=browsable_products, search=term)
        # The class recommendation looks at every match, not just the first 10.
        payload['class'] = recommended_class(matches)
        payload['results'] = matches.values('title', 'slug')[:limit]

    status_code = 200 if len(payload['results']) else 400
    return Response(payload, status=status_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment