Created
April 23, 2011 15:51
-
-
Save mishudark/938717 to your computer and use it in GitHub Desktop.
Paginator compatible with pymongo and mongoengine.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pagination.py
class Paginator(object):
    """Split a countable, sliceable query into fixed-size, 1-based pages.

    The query object only needs two capabilities, both used below: a
    zero-argument ``count()`` method and slice support (``query[a:b]``).
    """

    def __init__(self, query, per_page):
        """Initialize a paginator.

        Arguments:
        - query -- queryset from pymongo or mongoengine
        - per_page -- number of items per page

        The total item count is read once, here, via ``query.count()``;
        ``page_count`` is derived from it and is not refreshed afterwards.

        Usage: ::

            latest_blog_posts = BlogPost.objects()
            paginator = Paginator(latest_blog_posts, 10)
            # get first page of blog posts
            page = paginator.page(1)
            for post in page.object_list:
                # do something with BlogPost instance
        """
        from math import ceil

        self.query = query
        self.per_page = per_page
        self._count = query.count()
        # Round up so a partially filled last page still counts as a page.
        self.page_count = int(ceil(float(self._count) / per_page))

    def page_range(self):
        """Return a 1-based list of page numbers valid for this ``Paginator``.

        The result is always a real ``list`` (``range`` alone is lazy on
        Python 3 and does not compare equal to a list).
        """
        return list(range(1, self.page_count + 1))

    def page(self, page_number):
        """1-based access to pages.

        Arguments:
        - page_number -- an int, or anything ``int()`` can convert

        Returns a ``Page`` holding the materialized items of that page.

        Raises ``Exception`` when ``page_number`` cannot be converted to an
        integer, is less than 1, or is greater than ``page_count``.
        """
        if not isinstance(page_number, int):
            try:
                page_number = int(page_number)
            except (TypeError, ValueError, OverflowError):
                # Only conversion failures are translated; interrupts and
                # other unrelated errors are no longer swallowed.
                raise Exception("Page numbers must be integers.")
        if page_number < 1:
            raise Exception("%s is less than 1." % page_number)
        if page_number > self.page_count:
            raise Exception("Page %s contains no results."
                            % page_number)

        # apply pagination offsets and limits
        start_offset = self.per_page * (page_number - 1)
        end_offset = start_offset + self.per_page
        object_list = list(self.query[start_offset:end_offset])

        # Page's constructor takes the object list FIRST, then the page
        # number, the page count and the total item count.
        return Page(object_list, page_number, self.page_count, self._count)
class Page(object):
    """One page of results, together with its position among all pages."""

    def __init__(self, object_list, page_number, page_count, all_objects_count):
        """Store the page contents and its bookkeeping numbers.

        Arguments:
        - object_list -- the items that belong to this page
        - page_number -- 1-based number of this page
        - page_count -- total number of pages
        - all_objects_count -- total number of items across all pages
        """
        self.all_objects_count = all_objects_count
        self.page_count = page_count
        self.page_number = page_number
        self.object_list = object_list

    def __repr__(self):
        position = (self.page_number, self.page_count)
        return u"<Page %s of %s>" % position

    def has_previous(self):
        """Tell whether a page exists before this one."""
        return self.page_number > 1

    def has_next(self):
        """Tell whether a page exists after this one."""
        return self.page_number < self.page_count

    def has_other_pages(self):
        """Tell whether this is not the only page."""
        if self.has_next():
            return True
        return self.has_previous()

    def next_page_number(self):
        """Return this page's number plus one (no upper bound check)."""
        following = self.page_number + 1
        return following

    def previous_page_number(self):
        """Return this page's number minus one (no lower bound check)."""
        preceding = self.page_number - 1
        return preceding
# pagination_tests.py
import unittest | |
from mongoengine.document import Document | |
from mongoengine import fields, connect | |
from pagination import Paginator | |
class PaginationTest(unittest.TestCase):
    """Exercise ``Paginator`` and ``Page`` against a mongoengine collection.

    Every test runs on five ``BlogPost`` documents stored in the
    ``me_pagination_tests`` database, paginated two per page.
    NOTE(review): ``connect`` is given only a database name, so this
    presumably needs a MongoDB server on mongoengine's default host/port --
    confirm for your environment.
    """

    def setUp(self):
        """Connect, define the document class and insert five posts."""
        connect(db='me_pagination_tests')

        class BlogPost(Document):
            title = fields.StringField()

        self.BlogPost = BlogPost

        # Start from an empty collection so documents left behind by an
        # aborted earlier run cannot skew the expected counts.
        BlogPost.drop_collection()

        for number in range(1, 6):
            BlogPost(title="Post #%d" % number).save()

    def test_paginator(self):
        """Ensure paginator page counts and ranges are correct.
        """
        query = self.BlogPost.objects()
        paginator = Paginator(query, 2)

        # test count and range
        self.assertEqual(paginator.page_count, 3)
        # list() keeps the comparison valid whether page_range() hands back
        # a list or a lazy range object.
        self.assertEqual(list(paginator.page_range()), [1, 2, 3])

    def test_page(self):
        """Ensure a ``Page`` returned by a ``Paginator`` is correct.
        """
        # get some pages
        first_page = Paginator(self.BlogPost.objects(), 2).page(1)
        second_page = Paginator(self.BlogPost.objects(), 2).page(2)
        third_page = Paginator(self.BlogPost.objects(), 2).page(3)

        # validate object list's length
        # NOTE(review): the first-page length check was disabled in the
        # original; the reason is not recorded -- re-enable once confirmed.
        # self.assertEqual(len(first_page.object_list), 2)
        self.assertEqual(len(second_page.object_list), 2)
        self.assertEqual(len(third_page.object_list), 1)

        # validate first page's directions
        self.assertEqual(first_page.has_next(), True)
        self.assertEqual(first_page.has_previous(), False)
        self.assertEqual(first_page.has_other_pages(), True)
        self.assertEqual(first_page.next_page_number(), 2)
        self.assertEqual(first_page.previous_page_number(), 0)

        # validate second page
        self.assertEqual(second_page.has_next(), True)
        self.assertEqual(second_page.has_previous(), True)
        self.assertEqual(second_page.has_other_pages(), True)
        self.assertEqual(second_page.next_page_number(), 3)
        self.assertEqual(second_page.previous_page_number(), 1)

        # validate third page
        self.assertEqual(third_page.has_next(), False)
        self.assertEqual(third_page.has_previous(), True)
        self.assertEqual(third_page.has_other_pages(), True)
        self.assertEqual(third_page.next_page_number(), 4)
        self.assertEqual(third_page.previous_page_number(), 2)

    def tearDown(self):
        """Remove the test collection so each test starts clean."""
        self.BlogPost.drop_collection()
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment