Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jerinisready/0210466b94823bcf4292effcd46c9123 to your computer and use it in GitHub Desktop.
Save jerinisready/0210466b94823bcf4292effcd46c9123 to your computer and use it in GitHub Desktop.
Django ORM Oscar Postgres Product listing filtering search and sort views in DRF!
@api_view()
def filter_options(request, pk):
    """Describe the varying attributes of a product class for the frontend.

    Looks up every ``ProductAttribute`` with ``is_varying=True`` that belongs
    to the product class ``pk`` and, for each attribute that has at least one
    stored value, emits its ``code``, its ``name`` (as ``label``) and its
    de-duplicated values (each passed through ``__`` and then handed to
    ``to_client_dict``) under ``val``.

    Returns a ``Response`` of the form ``{'results': [...]}``.
    """
    varying_attributes = ProductAttribute.objects.filter(
        is_varying=True, product_class__id=pk
    ).prefetch_related('productattributevalue_set')

    results = []
    for attribute in varying_attributes:
        # An attribute without any value gives the frontend nothing to filter on.
        if not attribute.productattributevalue_set.exists():
            continue
        distinct_values = {
            __(value) for value in attribute.productattributevalue_set.all()
        }
        results.append({
            'code': attribute.code,
            'label': attribute.name,
            'val': to_client_dict(distinct_values),
        })
    return Response({'results': results})
from django.conf import settings
from django.db.models import Q, F, Count, Max, Min, Case, When, CharField
from oscar.core.loading import get_model
from rest_framework.generics import get_object_or_404
from apps.catalogue.models import Category, Product
from django.contrib.postgres.search import SearchQuery, SearchVector, SearchRank
from django.contrib.postgres.search import TrigramSimilarity
from lib.product_utils.search import _trigram_search, _simple_search, _similarity_with_rank_search, _similarity_search
# ProductClass model of the 'catalogue' app, resolved by name through
# Oscar's get_model loader.
ProductClass = get_model('catalogue', 'ProductClass')
def category_filter(queryset, category_slug, return_as_tuple=False):
    """Restrict ``queryset`` to products filed under a category subtree.

    The category is fetched by ``category_slug`` with ``get_object_or_404``.
    Products are kept when they are linked to any category whose ``path``
    starts with the path of that category.

    Returns the filtered queryset, or the two-element list
    ``[filtered_queryset, category]`` when ``return_as_tuple`` is true.
    """
    category = get_object_or_404(Category, slug=category_slug)
    subtree = Category.objects.filter(path__startswith=category.path)
    filtered = queryset.filter(productcategory__category__in=subtree)
    if return_as_tuple:
        return [filtered, category]
    return filtered
def brand_filter(queryset, brand_ids):
    """Keep only the rows of ``queryset`` whose ``brand`` is in ``brand_ids``."""
    lookup = {'brand__in': brand_ids}
    return queryset.filter(**lookup)
def apply_filter(queryset, _filter, null_value_compatability='__'):
    """Apply a frontend filter string to ``queryset``.

    ``_filter`` is a ``::``-separated list of ``key:value`` pairs, e.g.::

        weight:25,30,35::minprice:25::maxprice:45::available_only:1::color:Red,Black,Blue::ram:4 GB,8 GB

    which is parsed into::

        weight__in     : ['25', '30', '35']
        minprice       : '25'
        maxprice       : '45'
        available_only : '1'
        color__in      : ['Red', 'Black', 'Blue']
        ram__in        : ['4 GB', '8 GB']

    Parsing rules:
      * a pair without ``:`` is ignored;
      * a pair ending in ``:<null_value_compatability>`` is ignored (the
        frontend sends that placeholder for "no value");
      * a value containing ``,`` becomes a list and the key gets an ``__in``
        suffix; every value is stripped of surrounding whitespace.

    Reserved keys (removed before the attribute lookup):
      * ``minprice`` / ``maxprice`` bound ``effective_price``; they are
        compared numerically and swapped when given in the wrong order.
        A bound that is not a finite number is ignored.
      * ``available_only`` (any value other than empty, ``0`` or ``false``)
        keeps only rows whose ``effective_price`` is not null.

    Everything left over is passed to ``queryset.filter_by_attributes``.

    Returns the filtered queryset.
    """
    from decimal import Decimal, InvalidOperation

    def parse_price(raw):
        """Return ``raw`` as a finite Decimal, or None when missing or not a number."""
        if raw is None:
            return None
        try:
            price = Decimal(raw)
        except (InvalidOperation, TypeError, ValueError):
            return None
        return price if price.is_finite() else None

    null_suffix = f':{null_value_compatability}'
    filter_params = {}
    for pair in (_filter or '').split('::'):
        if ':' not in pair or pair.endswith(null_suffix):
            continue
        key, raw_value = pair.split(':', 1)
        key = key.strip()
        if ',' in raw_value:
            filter_params[key + '__in'] = [item.strip() for item in raw_value.split(',')]
        else:
            filter_params[key] = raw_value.strip()

    # Reserved keys must never reach filter_by_attributes, so always pop them.
    price_from = parse_price(filter_params.pop('minprice', None))
    price_to = parse_price(filter_params.pop('maxprice', None))
    available_only = filter_params.pop('available_only', None)
    exclude_out_of_stock = (
        bool(available_only) and str(available_only).lower() not in ('0', 'false')
    )

    # Compare against None so that a bound of 0 is still honoured.
    if price_from is not None and price_to is not None:
        low, high = sorted((price_from, price_to))
        queryset = queryset.filter(effective_price__range=(low, high))
    elif price_from is not None:
        queryset = queryset.filter(effective_price__gte=price_from)
    elif price_to is not None:
        queryset = queryset.filter(effective_price__lte=price_to)

    if exclude_out_of_stock:
        queryset = queryset.filter(effective_price__isnull=False)

    return queryset.filter_by_attributes(**filter_params)
def apply_search(queryset, search: str, mode: str = '_trigram', extends: bool = True):
    """Run ``search`` against ``queryset`` using the strategy named by ``mode``.

    mode:
        '_trigram' (default)  -> _trigram_search
        '_simple'             -> _simple_search
        '_similarity_rank'    -> _similarity_with_rank_search
        '_similarity'         -> _similarity_search

    ``extends`` is forwarded unchanged to the selected search function.

    Raises ``Exception('Invalid Search Mode')`` for any other ``mode``.
    """
    if mode == '_trigram':  # default
        return _trigram_search(queryset, search, extends=extends)
    if mode == '_simple':
        return _simple_search(queryset, search, extends=extends)
    if mode == '_similarity_rank':
        return _similarity_with_rank_search(queryset, search, extends=extends)
    if mode == '_similarity':
        return _similarity_search(queryset, search, extends=extends)
    raise Exception('Invalid Search Mode')
def apply_sort(queryset, sort=None):
    """Order ``queryset`` by the fields in ``sort``; return it untouched when ``sort`` is None."""
    return queryset if sort is None else queryset.order_by(*sort)
def recommended_class(queryset):
    """Pick the product class that dominates ``queryset`` and report its price span.

    Each row is attributed to its own ``product_class`` or, when that is
    empty, to its parent's. The class counts are tallied in Python from a
    single ``values()`` query.

    When strictly more than three quarters of the rows share one class (or
    whenever ``settings.DEBUG`` is on), returns::

        {'id': <class id>, 'max_price': ..., 'min_price': ...}

    where the prices are aggregated over all products of that class with a
    non-null ``effective_price``. Otherwise returns None.
    """
    rows = queryset.values('id', 'product_class', 'parent__product_class')
    tally = {}
    top_class, top_count = None, 0
    for row in rows:
        class_id = row['product_class'] or row['parent__product_class']
        tally[class_id] = tally.get(class_id, 0) + 1
        # ">=" means that on a tie the class seen most recently wins.
        if tally[class_id] >= top_count:
            top_class, top_count = class_id, tally[class_id]

    total = len(rows)
    has_majority = bool(total) and top_count * 1.0 > total * 3 / 4
    if not (has_majority or settings.DEBUG):
        return None

    price_span = Product.objects.filter(
        effective_price__isnull=False, product_class_id=top_class,
    ).aggregate(
        max_price=Max('effective_price'),
        min_price=Min('effective_price'),
    )
    return {'id': top_class, **price_span}
# Resolve the search-handler factory from 'catalogue.search_handlers' by name.
get_product_search_handler_class = get_class(
'catalogue.search_handlers', 'get_product_search_handler_class')
# Identity function wrapped around the human-readable labels below.
# NOTE(review): presumably a stand-in for a translation helper -- confirm.
_ = lambda x: x
# Category model resolved by name through get_model; this rebinds any earlier
# `Category` name in this module.
Category = get_model('catalogue', 'Category')
# Sorting: public keys accepted in the `sort` query parameter.
RELEVANCY = "relevancy"
TOP_RATED = "rating"
NEWEST = "newest"
PRICE_HIGH_TO_LOW = "price-desc"
PRICE_LOW_TO_HIGH = "price-asc"
TITLE_A_TO_Z = "title-asc"
TITLE_Z_TO_A = "title-desc"
# (key, label) pairs for every sort option.
SORT_BY_CHOICES = [
(RELEVANCY, _("Relevancy")), (TOP_RATED, _("Customer rating")), (NEWEST, _("Newest")),
(PRICE_HIGH_TO_LOW, _("Price high to low")), (PRICE_LOW_TO_HIGH, _("Price low to high")),
(TITLE_A_TO_Z, _("Title A to Z")), (TITLE_Z_TO_A, _("Title Z to A")),
]
# Sort key -> ordering expression (a leading '-' means descending).
# RELEVANCY has no entry here.
SORT_BY_MAP = {
TOP_RATED: '-rating', NEWEST: '-date_created', PRICE_HIGH_TO_LOW: '-effective_price',
PRICE_LOW_TO_HIGH: 'effective_price', TITLE_A_TO_Z: 'title', TITLE_Z_TO_A: '-title',
}
# Filtering: (key, label) pairs for filter options.
# NOTE(review): not referenced by the code visible in this file -- confirm usage.
FILTER_BY_CHOICES = [
('exclude_out_of_stock', _("Exclude Out Of Stock")),
('price__range', _("Price Range")),
('width', _("Width")),
('height', _("Height")),
('material', _('Material')),
]
@api_view()
def product_list(request, category='all', **kwargs):
    """
    PRODUCT LISTING API (powering /c/all/ and /c/<category_slug>/).

    Query parameters:
        q              = " A search term "
        product_range  = '<product-range-id>'
        offer_category = code of an offer banner whose offer is open
        sort           = comma separated, any of ['relevancy', 'rating', 'newest',
                         'price-desc', 'price-asc', 'title-asc', 'title-desc']
        filter         = minprice:25::maxprice:45::available_only:1::color:Red,Black,Blue::weight:25,30,35::ram:4 GB,8 GB
        page           = page number (defaults to 1)
        page_size      = items per page (defaults to settings.DEFAULT_PAGE_SIZE)

    Where minprice, maxprice and available_only are common for all.
    other dynamic parameters are available at reverse('wnc-filter-options', kwarg={'pk': '<ProductClass: id>'})

    The base queryset is chosen from, in order of precedence: ``product_range``,
    ``offer_category``, the ``category`` slug, or all browsable products.
    Filter, search and sort are then applied in that order.

    Unfiltered requests for the first pages at the default page size are
    served through ``cache_library`` with a 180 second TTL.
    """
    def _to_int(raw, default, minimum=None):
        """Parse ``raw`` as int; use ``default`` when it is missing, malformed or below ``minimum``."""
        try:
            number = int(raw)
        except (TypeError, ValueError):
            return default
        if minimum is not None and number < minimum:
            return default
        return number

    queryset = Product.browsable.browsable()
    _search = request.GET.get('q')
    _sort = request.GET.get('sort')
    _filter = request.GET.get('filter')
    _offer_category = request.GET.get('offer_category')
    _product_range = request.GET.get('product_range')
    # Malformed paging parameters fall back to defaults instead of raising.
    # Out-of-range page numbers are still left to the paginator below.
    page_number = _to_int(request.GET.get('page'), 1)
    page_size = _to_int(
        request.GET.get('page_size'), int(settings.DEFAULT_PAGE_SIZE), minimum=1)

    if _product_range:
        product_range = get_object_or_404(Range, pk=_product_range)
        queryset = product_range.all_products().filter(is_public=True)
    elif _offer_category:
        offer_banner_object = get_object_or_404(
            OfferBanner, code=_offer_category, offer__status=ConditionalOffer.OPEN)
        queryset = offer_banner_object.products().filter(is_public=True)
    elif category != 'all':
        queryset = category_filter(queryset=queryset, category_slug=category)

    if _filter:
        queryset = apply_filter(queryset=queryset, _filter=_filter)

    if _search:
        queryset = apply_search(queryset=queryset, search=_search)

    sort_fields = []
    if _sort:
        sort_fields = [SORT_BY_MAP[key] for key in _sort.split(',') if key in SORT_BY_MAP]
        # order_by() with no fields would clear the existing ordering (for
        # example the relevance ordering set by the search), so only re-order
        # when at least one known sort key was given.
        if sort_fields:
            queryset = apply_sort(queryset=queryset, sort=sort_fields)

    def _inner():
        """Paginate the final queryset and build the response payload."""
        paginator = Paginator(queryset, page_size)
        empty_list = False
        try:
            number = paginator.validate_number(page_number)
        except PageNotAnInteger:
            number = 1
        except EmptyPage:
            # Out of range: report the last page's metadata with no results.
            number = paginator.num_pages
            empty_list = True
        page_obj = paginator.get_page(number)
        if empty_list:
            product_data = []
        else:
            product_data = get_optimized_product_dict(
                qs=page_obj.object_list, request=request).values()
        return list_api_formatter(
            request, page_obj=page_obj, results=product_data, product_class=None)

    # Only plain listings (no search/filter/sort/offer/range) of the first
    # pages at the default page size are cached.
    cacheable = (
        page_size == settings.DEFAULT_PAGE_SIZE
        and page_number <= 4
        and not any([_search, _filter, sort_fields, _offer_category, _product_range])
    )
    if cacheable:
        c_key = cache_key.product_list__key.format(page_number, page_size, category)
        out = cache_library(c_key, cb=_inner, ttl=180)
    else:
        out = _inner()
    return Response(out)
from django.db.models import F
from apps.catalogue.models import Category, Product
# add 'django.contrib.postgres' to INSTALLED_APPS
from django.contrib.postgres.search import SearchQuery, SearchVector, SearchRank
from django.contrib.postgres.search import TrigramSimilarity
def _trigram_search(queryset, search, extends=True):
    """Rank ``queryset`` by trigram similarity between ``title`` and ``search``.

    Rows are annotated with ``similarity``, rows with a similarity of 0 are
    dropped, and the result is ordered from most to least similar.

    ``extends`` is accepted so that all search functions share one signature;
    it is not used here.
    """
    return queryset.annotate(
        similarity=TrigramSimilarity('title', search),
    ).filter(
        similarity__gt=0,
    ).order_by('-similarity')
def _similarity_with_rank_search(queryset, search, extends=False):
    """Rank ``queryset`` with ``SearchRank`` against the ``search`` field.

    The query is the whole ``search`` string OR-ed with one ``SearchQuery``
    per space-separated piece of it. Rows are annotated with ``rank``, rows
    with a rank of 0 or less are dropped, and the result is ordered by
    descending rank.

    NOTE(review): relies on the model exposing a ``search`` field
    (presumably a search vector column) -- confirm against the model.
    ``extends`` is accepted for signature parity and is not used here.
    """
    combined = SearchQuery(search)
    for word in search.split(' '):
        combined |= SearchQuery(word)
    ranked = queryset.annotate(rank=SearchRank(F('search'), combined))
    matching = ranked.filter(rank__gt=0)
    return matching.order_by('-rank')
def _similarity_search(queryset, search, extends=True):
    """Keep the rows of ``queryset`` whose ``search`` field matches the query.

    The query is the whole ``search`` string OR-ed with one ``SearchQuery``
    per space-separated piece of it. No ranking or ordering is applied.

    NOTE(review): relies on the model exposing a ``search`` field
    (presumably a search vector column) -- confirm against the model.
    ``extends`` is accepted for signature parity and is not used here.
    """
    query = SearchQuery(search)
    for word in search.split(' '):
        query |= SearchQuery(word)
    return queryset.filter(search=query)
def _simple_search(queryset, search, extends=True):
    """Keep the rows of ``queryset`` whose ``title`` matches ``search``.

    Uses the ``title__search`` lookup, which needs 'django.contrib.postgres'
    in INSTALLED_APPS. The search is applied to the queryset that was passed
    in, so earlier restrictions (category, filters, ...) are preserved and a
    queryset is returned, like the other search functions.

    ``extends`` is accepted for signature parity and is not used here.
    """
    return queryset.filter(title__search=search)
def apply_search(queryset, search: str, mode: str = '_trigram', extends: bool = True):
    """Dispatch ``search`` to one of the search functions, chosen by ``mode``.

    mode:
        '_trigram' (default)  -> _trigram_search
        '_simple'             -> _simple_search
        '_similarity_rank'    -> _similarity_with_rank_search
        '_similarity'         -> _similarity_search

    ``extends`` is passed through to the chosen function as a keyword.

    Raises ``Exception('Invalid Search Mode')`` when ``mode`` is none of the above.
    """
    known_modes = ('_trigram', '_simple', '_similarity_rank', '_similarity')
    if mode not in known_modes:
        raise Exception('Invalid Search Mode')

    if mode == '_simple':
        chosen = _simple_search
    elif mode == '_similarity_rank':
        chosen = _similarity_with_rank_search
    elif mode == '_similarity':
        chosen = _similarity_search
    else:  # '_trigram', the default
        chosen = _trigram_search
    return chosen(queryset, search, extends=extends)
def apply_sort(queryset, sort=None):
    """Return ``queryset`` ordered by the fields in ``sort``.

    When ``sort`` is None the queryset is returned as it came in.
    """
    if sort is None:
        return queryset
    ordering = tuple(sort)
    return queryset.order_by(*ordering)
@api_view()
def product_suggestions(request, **kwargs):
    """Suggest up to 10 products (title and slug) for the search term ``q``.

    Response body::

        {'results': [{'title': ..., 'slug': ...}, ...], 'class': <recommended_class result>}

    The status is 200 when at least one suggestion was found and 400
    otherwise, including when ``q`` is missing or empty.
    """
    _max_size = 10
    queryset = Product.browsable.browsable()
    _search = request.GET.get('q')

    # Nothing to search for: same empty payload and 400 as a search with no hits.
    if not _search:
        return Response({'results': [], 'class': None}, status=400)

    matches = apply_search(queryset=queryset, search=_search)
    rc = recommended_class(matches)
    suggestions = matches.values('title', 'slug')[:_max_size]
    status_code = 200 if len(suggestions) else 400
    return Response({'results': suggestions, 'class': rc}, status=status_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment