Skip to content

Instantly share code, notes, and snippets.

@khahux
Last active August 19, 2019 05:06
Show Gist options
  • Save khahux/55f5fd578bc7e49b4ca99aecf870cec7 to your computer and use it in GitHub Desktop.
Save khahux/55f5fd578bc7e49b4ca99aecf870cec7 to your computer and use it in GitHub Desktop.
class Cursor(object):
"""
只是翻页的结构, 原始值有三种: 0, None, 其他
若是 0, 其实是上次返回时指示已没有结果的意思, 应返回空
若是 None, 表示是第一次请求, 应使用默认参数对请求做出响应
若是其他值, 正数表示像后翻, 负数表示向前翻
"""
DIRECTION_NEXT = 'next'
DIRECTION_PREVIOUS = 'previous'
DIRECTION_NONE = None
def __init__(self, direction, position):
self.direction = direction
self.position = position
@classmethod
def parse(cls, request_handler):
cursor_value = request_handler.get_argument_int('cursor', default=None)
if cursor_value is None:
cursor_direction = cls.DIRECTION_NEXT
cursor_position = None
elif cursor_value == 0:
cursor_direction = None
cursor_position = None
else:
cursor_direction = cls.DIRECTION_NEXT if cursor_value > 0 else cls.DIRECTION_PREVIOUS
cursor_position = abs(cursor_value)
cursor = Cursor(cursor_direction, cursor_position)
return cursor
@classmethod
def get_records(cls, session, cursor, page_size, filters, record_obj):
"""
按照 cursor 获得一页结果. 返回 records, cursor_previous, cursor_next
@param session: sqlalchemy session
@param cursor: Cursor obj
@param page_size: 页大小
@param filters: 一个 list, 条件过滤器
@param record_obj: sqlalchemy 对象
"""
cursor_previous = 0
cursor_next = 0
the_records = []
if cursor.direction == cls.DIRECTION_NEXT:
# 下一页
if cursor.position:
filters.append(record_obj.id < cursor.position)
the_records = session.query(record_obj).filter(
*filters
).order_by(record_obj.id.desc())[:page_size]
if cursor.position and the_records:
# 不是请求第一页, 前页可能有结果
cursor_previous = -the_records[0].id
if len(the_records) == page_size:
# 结果满了一页了, 后页可能有结果
cursor_next = the_records[-1].id
elif cursor.direction == cls.DIRECTION_PREVIOUS:
# 上一页
filters.append(record_obj.id > cursor.position)
top_n_records = session.query(record_obj).filter(
*filters
).order_by(record_obj.id.asc()).limit(page_size).subquery()
the_records = session.query(record_obj).select_entity_from(
top_n_records
).order_by(record_obj.id.desc()).all()
if len(the_records) == page_size:
# 结果满了一页了, 前页可能有结果
cursor_previous = -the_records[0].id
if the_records:
# 后面的结果不能确定有没有...
cursor_next = the_records[-1].id
return the_records, cursor_previous, cursor_next
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment