Last active
July 4, 2023 15:21
-
-
Save deepanshumehtaa/87ba0e44d6eb6f63ece7af88aeb088f8 to your computer and use it in GitHub Desktop.
Custom Pagination
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

from django.db import connection
from rest_framework.response import Response


class CustomPagination(object):
    """Page-number pagination over the rows returned by a raw SQL query.

    author: Deepanshu Mehta

    how to use:
        page_obj = CustomPagination(request, page_size=50)
        return page_obj.get_paginated_response(YOUR_RAW_QUERY, page_number)
    """

    def __init__(self, request, page_size=50):
        """Store the page size and the absolute URI of the current request.

        Args:
            request: object exposing ``build_absolute_uri()``; the returned
                URI is the template for the ``next``/``previous`` links.
            page_size: number of rows per page; must be at least 1.

        Raises:
            ValueError: if ``page_size`` is smaller than 1.
        """
        if page_size < 1:
            # A zero/negative size would otherwise surface later as a
            # ZeroDivisionError or as meaningless slices.
            raise ValueError("page_size must be a positive integer")
        self.page_size = page_size
        self.header = []  # column names of the most recently executed query
        self.request = request
        self.uri = self.request.build_absolute_uri()

    def jasonify(self, raw_value):
        """Turn row tuples into dicts keyed by the column names in ``self.header``."""
        return [dict(zip(self.header, row)) for row in raw_value]

    def _slice_page(self, rows, page_number):
        """Return the rows of ``page_number`` (1-based); empty when out of range."""
        start_index = (page_number - 1) * self.page_size
        if start_index < 0:
            # Pages below 1 do not exist; a negative start would otherwise
            # slice from the tail of the result set.
            return rows[:0]
        return rows[start_index:start_index + self.page_size]

    def get_paginated_data(self, query, page_number=1, params=None):
        """Execute ``query`` and return ``(total_items, rows_of_requested_page)``.

        The complete result set is fetched and then sliced in memory, so
        ``total_items`` is the number of rows the query produced.

        Args:
            query: raw SQL text. It is executed as given, so it must not be
                assembled from untrusted input; pass values via ``params``.
            page_number: 1-based page index.
            params: optional bound parameters forwarded to ``cursor.execute``.

        Raises:
            ValueError: if ``query`` is empty.
        """
        if not query:
            raise ValueError("Raw Query not provided to Pagination API")

        with connection.cursor() as cursor:
            cursor.execute(query, params)
            # The first item of each description entry is the column name.
            self.header = [column[0] for column in cursor.description]
            results = cursor.fetchall()

        return len(results), self._slice_page(results, page_number)

    def get_paginated_response(self, query, page_number=1, params=None):
        """Run ``query`` and wrap the requested page in a ``Response`` (status 200).

        The payload carries ``total_items``, ``page``, ``total_pages``,
        ``page_size``, ``next``, ``previous`` and ``results``.
        """
        total_items, paginated_data = self.get_paginated_data(
            query, page_number, params
        )
        # An empty result set is still reported as a single (empty) page.
        total_pages = 1 if total_items == 0 else math.ceil(total_items / self.page_size)

        response_data = {
            'total_items': total_items,
            'page': page_number,
            'total_pages': total_pages,
            'page_size': self.page_size,
            'next': self.get_next_page(page_number, total_pages),
            'previous': self.get_prv_page(page_number),
            'results': self.jasonify(paginated_data),
        }
        return Response(response_data, 200)

    def get_next_page(self, page_number, total_pages):
        """Return the URI of the following page, or ``None`` on the last page."""
        nxt = page_number + 1
        if nxt > total_pages:
            return None
        return self.reconstruct_uri(nxt)

    def get_prv_page(self, page_number):
        """Return the URI of the preceding page, or ``None`` on the first page."""
        prv = page_number - 1
        if prv < 1:
            return None
        return self.reconstruct_uri(prv)

    def reconstruct_uri(self, new_page):
        """Return the request URI with its ``page`` query parameter set to ``new_page``.

        Other query parameters are kept in their original order. ``page`` is
        appended when the request did not carry it (including when the URI has
        no query string at all), and repeated ``page`` parameters collapse
        into one.
        """
        parts = urlsplit(self.uri)
        updated = []
        page_written = False
        # keep_blank_values preserves parameters such as "flag" or "q=".
        for key, value in parse_qsl(parts.query, keep_blank_values=True):
            if key == 'page':
                if page_written:
                    continue
                value = new_page
                page_written = True
            updated.append((key, value))
        if not page_written:
            updated.append(('page', new_page))
        return urlunsplit(parts._replace(query=urlencode(updated)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment