Skip to content

Instantly share code, notes, and snippets.

@bitner
Created September 23, 2025 16:16
Show Gist options
  • Save bitner/4c9bae9caabbacc0ab886895b88a85c5 to your computer and use it in GitHub Desktop.
Save bitner/4c9bae9caabbacc0ab886895b88a85c5 to your computer and use it in GitHub Desktop.
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "fastapi",
# "cql2",
# "ciso8601",
# "pydantic>=2.0.0",
# "uvicorn[standard]",
# ]
# ///
from fastapi import FastAPI, Query, Depends
from fastapi.exceptions import RequestValidationError
from starlette.requests import Request
from typing import Annotated, Optional, Union, List, Dict, Any
from functools import reduce
from pydantic import Field
from pydantic.functional_validators import BeforeValidator, AfterValidator
from cql2 import Expr
from ciso8601 import parse_rfc3339
import uvicorn
# Application instance; the @app.get decorators further down register routes on it,
# and the __main__ guard serves it with uvicorn.
app = FastAPI()
# Query-parameter names that Params.to_expr never interprets as a
# "<column>=<value>" property filter. Any other query parameter must match a
# key of WHITELIST_COLUMNS or the request is rejected.
RESERVED_PARAMS = [
    "f",
    "ids",
    "datetime",
    "bbox",
    "properties",
    "filter",
    "filter-lang",
    "geom-column",
    "datetime-column",
    "limit",
    "offset",
    "bbox-only",
    "simplify",
    "sortby",
]
# Placeholder schema information. The intent is to obtain these by
# introspecting the database schema; until then a sample is hard-coded.

# Collection names accepted by Params (compared against the lower-cased input).
WHITELIST_TABLES = ["roads", "buildings", "lakes", "rivers"]

# Column name -> column metadata. Params.to_expr only reads the keys, to decide
# whether a non-reserved query parameter may be used as a property filter.
WHITELIST_COLUMNS = {
    "id": {"type": "text"},
    "geometry": {"type": "geometry"},
    "timestamp": {"type": "timestamptz"},
    "name": {"type": "text"},
    "num": {"type": "int"},
}
# Names of functions considered acceptable in filters.
# NOTE(review): nothing in the visible code reads this list yet — confirm
# where it is meant to be enforced.
WHITELIST_FUNCTIONS = [
    "strip_accents",
    "upper",
    "lower",
    "st_intersects",
    "st_equals",
    "st_within",
    "st_contains",
    "st_crosses",
    "st_overlaps",
    "st_touches",
    "st_disjoint",
    "st_dwithin",
    "st_area",
    "st_length",
    "st_distance",
    "st_centroid",
    "st_simplify",
]
# Range-constrained float aliases used by BBox: latitude must lie within
# [-90, 90] and longitude within [-180, 180] (both bounds inclusive).
Latitude = Annotated[float, Field(ge=-90, le=90)]
Longitude = Annotated[float, Field(ge=-180, le=180)]
def bbox_before_validator(v: Union[str, list[float]]) -> list[float]:
    """Turn a raw bbox value into a list of floats before type validation.

    Accepts either a comma-separated string (``"minx,miny,maxx,maxy"``) or a
    one-element list/tuple wrapping such a string, which is how a single query
    parameter can arrive. Anything else (an already-split sequence, ``None``,
    an unexpected type) is returned unchanged so that the downstream type
    validation reports the problem instead of this function raising a
    ``TypeError``.

    Raises:
        ValueError: if a component of the string is not a valid float.
    """
    # Unwrap ["1,2,3,4"] / ("1,2,3,4",) to the bare string.
    if isinstance(v, (list, tuple)) and len(v) == 1 and isinstance(v[0], str):
        v = v[0]
    if isinstance(v, str):
        # float() tolerates surrounding whitespace, so "1, 2" parses fine.
        return [float(part) for part in v.split(",")]
    return v
def bbox_after_validator(v: list[float]) -> list[float]:
    """Check that each minimum of a bbox does not exceed its maximum.

    A 4-element bbox is read as ``(minx, miny, maxx, maxy)`` and a 6-element
    bbox as ``(minx, miny, minz, maxx, maxy, maxz)``. Equal min and max values
    are allowed. Sequences of any other length are returned unchanged; length
    is expected to be enforced by the type this validator is attached to.

    Returns:
        The input value, unmodified.

    Raises:
        ValueError: if any minimum is greater than the matching maximum.
    """
    # The `not (a <= b)` form is kept deliberately: it also rejects NaN.
    if len(v) == 4:
        minx, miny, maxx, maxy = v
        if not (minx <= maxx and miny <= maxy):
            raise ValueError(
                "Invalid bbox: minx must be <= maxx and miny must be <= maxy for 4-element bbox"
            )
    elif len(v) == 6:
        minx, miny, minz, maxx, maxy, maxz = v
        if not (minx <= maxx and miny <= maxy and minz <= maxz):
            raise ValueError(
                "Invalid bbox: minx must be <= maxx, miny must be <= maxy "
                "and minz must be <= maxz for 6-element bbox"
            )
    return v
# Bounding-box type: either a 4-tuple (lon, lat, lon, lat) or a 6-tuple in
# which the 3rd and 6th positions are unconstrained floats.
# bbox_before_validator runs first and turns a comma-separated string into
# floats; bbox_after_validator runs on the validated value and checks min/max
# ordering.
BBox = Annotated[
Union[
tuple[Longitude, Latitude, Longitude, Latitude],
tuple[Longitude, Latitude, float, Longitude, Latitude, float],
],
BeforeValidator(bbox_before_validator),
AfterValidator(bbox_after_validator),
]
def bbox_expr(bbox: Union[BBox, None], geom_field: str = "geometry") -> Optional[Expr]:
    """Build the CQL2 spatial filter for a bounding box.

    Args:
        bbox: Validated bounding-box coordinates, or ``None`` for no filter.
        geom_field: Name of the geometry property to test against the box.

    Returns:
        An ``S_INTERSECTS(<geom_field>, BBOX(...))`` expression, or ``None``
        when no bbox was supplied. The received value is printed as a debug
        trace.
    """
    print("bbox_expr:", bbox)
    if bbox is None:
        return None
    coordinates = ", ".join(str(value) for value in bbox)
    cql_text = f"S_INTERSECTS({geom_field}, BBOX({coordinates}))"
    return Expr(cql_text)
def datetime_before_validator(v: Union[str, None]) -> Optional[Union[str, list[str]]]:
    """Validate a raw ``datetime`` value and normalise it for datetime_expr.

    Accepted forms are a single RFC 3339 datetime, or a ``start/end`` range in
    which either side may be open. An open side is written as ``..`` or left
    empty and is normalised to ``""`` in the result.

    Returns:
        ``None`` when *v* is ``None``; the datetime string for a single
        instant (also when a range has equal start and end); otherwise a
        two-element list ``[start, end]`` where an open bound is ``""``.

    Raises:
        ValueError: if *v* is not a string, has more than two ``/``-separated
            parts, is a single empty/open value, contains an unparsable
            datetime, or has a start later than its end.
    """
    if v is None:
        return None
    if not isinstance(v, str):
        raise ValueError(f"Invalid datetime value: {v}")

    parts = v.split("/")
    if len(parts) > 2:
        raise ValueError(f"Invalid datetime value: {v}")

    # Treat ".." and "" alike so later code has one open-bound marker to test.
    bounds = ["" if part == ".." else part for part in parts]

    # Parse each closed bound exactly once; open bounds become None.
    parsed = []
    for bound in bounds:
        if not bound:
            parsed.append(None)
            continue
        try:
            parsed.append(parse_rfc3339(bound))
        except ValueError as err:
            raise ValueError(f"Invalid datetime string: {bound}") from err

    if len(bounds) == 1:
        # A single value has to be an actual datetime, not an open marker.
        if parsed[0] is None:
            raise ValueError(f"Invalid datetime string: {v}")
        return bounds[0]

    start_dt, end_dt = parsed
    if start_dt is not None and end_dt is not None:
        if start_dt > end_dt:
            raise ValueError("Invalid datetime range: start must be <= end")
        if start_dt == end_dt:
            return bounds[0]  # Collapse a zero-length range to one instant.
    return bounds
# Datetime query value: a single string, a list of strings (a range), or None.
# datetime_before_validator checks and reshapes the raw value before the
# annotated type itself is validated.
DateTime = Annotated[
Optional[Union[str, list[str]]], BeforeValidator(datetime_before_validator)
]
def datetime_expr(
    dt: Union[DateTime, None], dt_field: str = "datetime"
) -> Optional[Expr]:
    """Build the CQL2 temporal filter for a datetime or datetime range.

    Args:
        dt: A single datetime string, a ``[start, end]`` list, or ``None``.
            In a list, a bound that is ``None``, ``""`` or ``".."`` is open
            and produces no constraint; elements beyond the second are
            ignored.
        dt_field: Name of the property compared against the timestamps.

    Returns:
        ``<dt_field>=TIMESTAMP(...)`` for a single datetime; for a range, the
        ``>=`` / ``<=`` constraints for the closed bounds (joined with ``+``
        when both exist); ``None`` when there is nothing to constrain. The
        received value is printed as a debug trace.
    """
    print("datetime_expr:", dt)
    if dt is None:
        return None

    def is_open(bound: Optional[str]) -> bool:
        # ".." must be treated as open here too, otherwise it would be emitted
        # as TIMESTAMP('..').
        return bound in (None, "", "..")

    if not isinstance(dt, list):
        if is_open(dt):
            return None
        return Expr(f"{dt_field}=TIMESTAMP('{dt}')")

    # Pad so that a short list still unpacks into (start, end).
    start, end = (dt + [None, None])[:2]
    clauses = []
    if not is_open(start):
        clauses.append(Expr(f"{dt_field}>=TIMESTAMP('{start}')"))
    if not is_open(end):
        clauses.append(Expr(f"{dt_field}<=TIMESTAMP('{end}')"))

    if not clauses:
        return None
    if len(clauses) == 1:
        return clauses[0]
    return clauses[0] + clauses[1]
def comma_separated_before_validator(
    v: Union[str, List[str], None],
) -> Optional[List[str]]:
    """Split a comma-separated value into a de-duplicated list of tokens.

    Accepts a single string or a list of strings; every string is split on
    commas, so ``"a,b"``, ``["a,b"]`` and ``["a", "b"]`` all give
    ``["a", "b"]``. Tokens are stripped of surrounding whitespace, empty
    tokens are dropped, and duplicates are removed while keeping first-seen
    order.

    Returns:
        The list of tokens, or ``None`` when *v* is ``None`` or contains no
        non-empty token (so callers never receive an empty list). The raw
        input is printed as a debug trace.
    """
    print(v)
    if v is None:
        return None

    raw_items = [v] if isinstance(v, str) else v
    tokens: List[str] = []
    for item in raw_items:
        tokens.extend(token.strip() for token in item.split(","))

    # dict.fromkeys removes duplicates while preserving insertion order.
    unique = list(dict.fromkeys(token for token in tokens if token))
    return unique or None
# Optional list of strings; comma_separated_before_validator pre-processes the
# raw value (string or list) before the annotated type is validated.
CommaSeparated = Annotated[
Optional[List[str]], BeforeValidator(comma_separated_before_validator)
]
def ids_expr(ids: Union[CommaSeparated, None], id_field: str = "id") -> Optional[Expr]:
    """Build the CQL2 filter that restricts results to the given IDs.

    Args:
        ids: IDs to match, or ``None`` / an empty list for no filter.
        id_field: Name of the property holding the identifier.

    Returns:
        ``<id_field>='x'`` for one ID, ``<id_field> IN ('x', 'y', ...)`` for
        several, or ``None`` when there are no IDs. The received value is
        printed as a debug trace.
    """
    print("ids_expr:", ids)
    # An empty list would otherwise produce the invalid text "id IN ()".
    if not ids:
        return None

    # IDs are request input: double any embedded single quote so a value
    # cannot terminate the string literal and inject CQL2.
    literals = ["'" + str(id_).replace("'", "''") + "'" for id_ in ids]
    if len(literals) == 1:
        return Expr(f"{id_field}={literals[0]}")
    return Expr(f"{id_field} IN ({', '.join(literals)})")
class Params:
    """Request parameters of a collection query, convertible to one CQL2 expression.

    Intended to be used as a class dependency (``Depends()``): the constructor
    arguments are filled from the path and query string, and ``to_expr``
    combines them into a single filter.
    """

    def __init__(
        self,
        request: Request,
        collection: Annotated[
            str, Field(description="Collection ID from path parameter")
        ],
        bbox: Annotated[
            Union[BBox, None], Field(description="Bounding box coordinates")
        ] = None,
        datetime: Annotated[
            Union[DateTime, None], Query(description="Datetime or range")
        ] = None,
        ids: Annotated[
            CommaSeparated,
            Field(description="Comma separated list of IDs from query parameter"),
        ] = None,
        id: Annotated[
            Optional[str], Field(description="Single ID from path parameter")
        ] = None,
        filter: Annotated[
            Union[str, Dict[str, Any], None], Field(description="Filter expression")
        ] = None,
    ):
        """Store the request parameters after checking the collection name.

        Raises:
            RequestValidationError: if the lower-cased collection name is not
                in WHITELIST_TABLES.
        """
        self.request = request
        if collection.lower() not in WHITELIST_TABLES:
            raise RequestValidationError(f"Collection '{collection}' is not recognized.")
        # NOTE(review): the check is case-insensitive but the name is stored
        # (and later emitted) as received — confirm whether it should be
        # lower-cased to match the whitelist entry.
        self.collection = collection
        self.bbox = bbox
        self.datetime = datetime
        self.ids = ids
        self.id = id
        self.filter = filter

    @staticmethod
    def _quote(value: Any) -> str:
        """Return *value* as a single-quoted CQL2 text literal, doubling embedded quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    def to_expr(self) -> Optional[Expr]:
        """Combine all supplied parameters into a single CQL2 expression.

        The collection constraint is always present. ``id``, ``filter``,
        ``bbox``, ``datetime`` and ``ids`` each add a constraint when set
        (bbox and datetime use the default field names of bbox_expr and
        datetime_expr). Every query parameter whose lower-cased name is not in
        RESERVED_PARAMS is treated as a ``<name>='<value>'`` property filter.

        Returns:
            All constraints joined with ``+``, or ``None`` if there are none.

        Raises:
            RequestValidationError: if a non-reserved query parameter is not a
                key of WHITELIST_COLUMNS.
        """
        exprs: list[Optional[Expr]] = [
            Expr(f"collection={self._quote(self.collection)}")
        ]
        if self.id:
            # Path/query input: quote it so it cannot break out of the literal.
            exprs.append(Expr(f"id={self._quote(self.id)}"))
        if self.filter:
            exprs.append(Expr(self.filter))
        exprs.append(bbox_expr(self.bbox))
        exprs.append(datetime_expr(self.datetime))
        exprs.append(ids_expr(self.ids))

        for k, v in self.request.query_params.items():
            if k.lower() in RESERVED_PARAMS:
                continue
            if k.lower() not in WHITELIST_COLUMNS:
                raise RequestValidationError(f"Property '{k}' is not in this collection.")
            # The name is whitelisted; the value is arbitrary request input
            # and must be escaped before being embedded in CQL2 text.
            exprs.append(Expr(f"{k}={self._quote(v)}"))

        valid_exprs: list[Expr] = [e for e in exprs if e is not None]
        if not valid_exprs:
            return None
        # reduce() returns a lone element unchanged, so no special case is needed.
        return reduce(lambda x, y: x + y, valid_exprs)
@app.get("/")
async def root():
    """Return a hint telling the caller to add a collection name to the path."""
    hint = {"message": "add a collection to the path to test."}
    return hint
@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
    """Answer ``/favicon.ico`` with an empty (``None``) result.

    The route is excluded from the generated schema. Presumably it exists so
    that favicon requests are not matched by the ``/{collection}`` route.
    """
    return
@app.get("/{collection}/{id}")
@app.get("/{collection}")
async def bboxtest(params: Annotated[Params, Depends()]):
    """Show the SQL and CQL2 JSON generated for the request's parameters.

    Serves both ``/{collection}`` and ``/{collection}/{id}``. The combined
    expression from ``Params.to_expr`` is reduced, its SQL form is printed as
    a debug trace, and both renderings are returned.

    Returns:
        ``{"sql": ..., "cql2": ...}``, or ``None`` when there is no expression.
    """
    expr = params.to_expr()
    # Test for None explicitly rather than relying on the object's truthiness.
    if expr is None:
        return None
    reduced = expr.reduce()
    # Render once and reuse the result for both the trace and the response.
    sql = reduced.to_sql()
    print(sql)
    return {
        "sql": sql,
        "cql2": reduced.to_json(),
    }
if __name__ == "__main__":
    from pathlib import Path

    # With reload=True uvicorn needs an import string rather than the app
    # object. Derive the module name from this file so the script also works
    # when it is not saved as "app.py".
    uvicorn.run(
        f"{Path(__file__).stem}:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment