Skip to content

Instantly share code, notes, and snippets.

@GeekTrainer
Last active May 28, 2026 00:09
Show Gist options
  • Select an option

  • Save GeekTrainer/560afbf5d86604df1795a152c4df83a2 to your computer and use it in GitHub Desktop.

Select an option

Save GeekTrainer/560afbf5d86604df1795a152c4df83a2 to your computer and use it in GitHub Desktop.
Flask + SQLAlchemy 2.0 read-endpoint pattern

Flask + SQLAlchemy 2.0 read-endpoint pattern

Small helpers for Flask + SQLAlchemy 2.x read endpoints:

  • paginate() — paginated list responses
  • get_or_404() — single-entity lookup with a JSON 404
  • A select()-based base statement per resource
  • A shared HTTPException handler so every error response is JSON

Wire it into the app factory:

from .errors import register_error_handlers
from .routes import items_bp, owners_bp

def create_app():
    """Build the Flask app, install the error handlers, and mount both blueprints."""
    flask_app = Flask(__name__)
    register_error_handlers(flask_app)
    # Blueprints are registered in the same order as before: items, then owners.
    for blueprint in (items_bp, owners_bp):
        flask_app.register_blueprint(blueprint)
    return flask_app

Example endpoints in routes.py:

  • GET /api/items — paginated list, optional filter
  • GET /api/items/<id> — single item with nested owner
  • GET /api/owners — unpaginated list (all owners in one response)
  • GET /api/owners/<id> — single owner with nested items
from flask import Flask, jsonify
from werkzeug.exceptions import HTTPException
def register_error_handlers(app: Flask) -> None:
    """Register a JSON handler on *app* for every ``HTTPException``.

    The response body has two keys: ``"error"`` (the exception's ``name``)
    and ``"message"`` (its ``description``). The status code is the
    exception's ``code``, falling back to 500 when ``code`` is falsy.
    """

    def handle_http_exception(error: HTTPException):
        body = jsonify({"error": error.name, "message": error.description})
        status = error.code or 500
        return body, status

    # Equivalent to decorating the handler with @app.errorhandler(HTTPException).
    app.errorhandler(HTTPException)(handle_http_exception)
from dataclasses import dataclass
from math import ceil
from typing import Callable, Generic, Sequence, TypeVar
from flask import abort
from sqlalchemy import Select, func
from sqlalchemy.orm import Session
T = TypeVar("T")


@dataclass(frozen=True)
class Page(Generic[T]):
    """Immutable container for one page of results and its paging metadata."""

    # Rows that belong to this page.
    items: Sequence[T]
    # Page number this object represents.
    page: int
    # Number of rows requested per page.
    page_size: int
    # Row count across all pages.
    total: int
    # Number of pages needed to hold ``total`` rows.
    total_pages: int

    def to_response(
        self,
        collection_key: str,
        serialize: Callable[[T], dict],
    ) -> dict:
        """Build a JSON-ready dict for this page.

        Args:
            collection_key: Key under which the serialized rows are stored.
            serialize: Callable applied to each row to produce its dict form.

        Returns:
            A dict with the serialized rows under ``collection_key`` and the
            paging metadata under ``"pagination"``.
        """
        serialized = list(map(serialize, self.items))
        pagination = {
            "page": self.page,
            "pageSize": self.page_size,
            "total": self.total,
            "totalPages": self.total_pages,
        }
        return {collection_key: serialized, "pagination": pagination}
def paginate(
    session: Session,
    stmt: Select,
    page: int = 1,
    page_size: int = 20,
    max_page_size: int = 100,
) -> Page:
    """Execute *stmt* for a single page and return the rows with paging metadata.

    Args:
        session: Session used to run both the count and the page query.
        stmt: Statement selecting the full, unpaginated result set. Callers
            should give it a deterministic ORDER BY so pages do not overlap.
        page: Requested page number; values below 1 are clamped to 1.
        page_size: Requested rows per page; clamped to ``1..max_page_size``.
        max_page_size: Upper bound applied to ``page_size``.

    Returns:
        A ``Page`` holding the rows for the requested page, the clamped
        ``page``/``page_size``, the total row count and the page count.
    """
    # Function-scope import: only ``Select`` and ``func`` are imported at
    # module level here, and the count query needs the ``select()`` factory.
    from sqlalchemy import select

    page = max(1, page)
    page_size = max(1, min(page_size, max_page_size))

    # Count over a subquery of the (un-ordered) statement instead of swapping
    # its column list for count(*). Replacing the columns in place discards a
    # FROM clause that was derived only from those columns (e.g. a plain
    # select(Model) with no WHERE), and it also miscounts statements that use
    # DISTINCT or GROUP BY. Wrapping the statement counts exactly the rows it
    # would return.
    countable = stmt.order_by(None).subquery()
    count_stmt = select(func.count()).select_from(countable)
    total = session.scalar(count_stmt) or 0

    rows = session.scalars(
        stmt.limit(page_size).offset((page - 1) * page_size)
    ).all()

    return Page(
        items=rows,
        page=page,
        page_size=page_size,
        total=total,
        total_pages=ceil(total / page_size),
    )
def get_or_404(session: Session, stmt: Select, message: str = "Not found"):
    """Return the single object selected by *stmt*, or abort with HTTP 404.

    Args:
        session: Session used to execute the statement.
        stmt: Statement expected to match at most one row (it is read with
            ``one_or_none()``).
        message: Description passed to ``abort`` when nothing matches.

    Returns:
        The matched object.
    """
    result = session.scalars(stmt)
    match = result.one_or_none()
    if match is not None:
        return match
    abort(404, description=message)
from flask import Blueprint, jsonify, request
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from .models import db, Item, Owner
from .query_helpers import paginate, get_or_404
# One blueprint per resource; the route handlers below attach to these.
items_bp = Blueprint("items", __name__)
owners_bp = Blueprint("owners", __name__)
def items_base_stmt():
    """Return the base ``select(Item)`` with ``Item.owner`` loaded via ``selectinload``."""
    base = select(Item)
    return base.options(selectinload(Item.owner))
def owners_base_stmt():
    """Return the base ``select(Owner)`` statement, with no filter or ordering applied."""
    base = select(Owner)
    return base
def serialize_item(item: Item) -> dict:
    """Convert *item* to a JSON-ready dict with its owner nested.

    The ``"owner"`` entry is a ``{"id", "name"}`` dict, or ``None`` when the
    item's ``owner`` attribute is falsy.
    """
    payload = {"id": item.id, "name": item.name}
    owner = item.owner
    if owner:
        payload["owner"] = {"id": owner.id, "name": owner.name}
    else:
        payload["owner"] = None
    return payload
def serialize_owner(owner: Owner, items: list[Item] | None = None) -> dict:
    """Convert *owner* to a JSON-ready dict, optionally nesting its items.

    Args:
        owner: Object providing ``id`` and ``name``.
        items: When not ``None``, each entry is emitted as ``{"id", "name"}``
            under the ``"items"`` key; when ``None`` the key is omitted.

    Returns:
        The serialized owner.
    """
    if items is None:
        return {"id": owner.id, "name": owner.name}
    return {
        "id": owner.id,
        "name": owner.name,
        "items": [{"id": entry.id, "name": entry.name} for entry in items],
    }
@items_bp.get("/api/items")
def list_items():
    """Return a paginated JSON list of items, optionally filtered by owner.

    Query parameters (each parsed as an int):
        page: Page number; defaults to 1.
        pageSize: Rows per page; defaults to 20.
        ownerId: When supplied, only items with this ``owner_id`` are returned.

    The response holds the serialized rows under ``"items"`` and the paging
    metadata under ``"pagination"``.
    """
    page = request.args.get("page", default=1, type=int)
    page_size = request.args.get("pageSize", default=20, type=int)
    owner_id = request.args.get("ownerId", type=int)

    stmt = items_base_stmt()
    if owner_id is not None:
        stmt = stmt.where(Item.owner_id == owner_id)

    # Names can repeat, and LIMIT/OFFSET over a non-unique sort key has no
    # defined order among ties, so a row could show up on two pages or on
    # none. Adding the id as a tiebreaker makes the ordering total.
    stmt = stmt.order_by(Item.name.asc(), Item.id.asc())

    page_result = paginate(db.session, stmt, page=page, page_size=page_size)
    return jsonify(page_result.to_response("items", serialize_item))
@items_bp.get("/api/items/<int:item_id>")
def get_item(item_id: int):
    """Return one item (with its nested owner) as JSON, or a 404 if no item has *item_id*."""
    lookup = items_base_stmt().where(Item.id == item_id)
    found = get_or_404(db.session, lookup, message="Item not found")
    return jsonify(serialize_item(found))
@owners_bp.get("/api/owners")
def list_owners():
    """Return every owner, ordered by name, as JSON under the ``"owners"`` key."""
    ordered = owners_base_stmt().order_by(Owner.name.asc())
    serialized = []
    for record in db.session.scalars(ordered).all():
        serialized.append(serialize_owner(record))
    return jsonify({"owners": serialized})
@owners_bp.get("/api/owners/<int:owner_id>")
def get_owner(owner_id: int):
    """Return one owner with its nested items as JSON, or a 404 if no owner has *owner_id*."""
    by_id = owners_base_stmt().where(Owner.id == owner_id)
    # Load the owner's items with the lookup so serialization can read them.
    lookup = by_id.options(selectinload(Owner.items))
    record = get_or_404(db.session, lookup, message="Owner not found")
    return jsonify(serialize_owner(record, items=record.items))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment