Created
September 23, 2025 16:16
-
-
Save bitner/4c9bae9caabbacc0ab886895b88a85c5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [ | |
| # "fastapi", | |
| # "cql2", | |
| # "ciso8601", | |
| # "pydantic>=2.0.0", | |
| # "uvicorn[standard]", | |
| # ] | |
| # /// | |
| from fastapi import FastAPI, Query, Depends | |
| from fastapi.exceptions import RequestValidationError | |
| from starlette.requests import Request | |
| from typing import Annotated, Optional, Union, List, Dict, Any | |
| from functools import reduce | |
| from pydantic import Field | |
| from pydantic.functional_validators import BeforeValidator, AfterValidator | |
| from cql2 import Expr | |
| from ciso8601 import parse_rfc3339 | |
| import uvicorn | |
| app = FastAPI() | |
| RESERVED_PARAMS = [ | |
| "f", | |
| "ids", | |
| "datetime", | |
| "bbox", | |
| "properties", | |
| "filter", | |
| "filter-lang", | |
| "geom-column", | |
| "datetime-column", | |
| "limit", | |
| "offset", | |
| "bbox-only", | |
| "simplify", | |
| "sortby", | |
| ] | |
| # These will come from introspecting the database schema | |
| # For now, we hardcode a sample list | |
| WHITELIST_TABLES = ["roads", "buildings", "lakes", "rivers"] | |
| WHITELIST_COLUMNS = { | |
| "id": {"type": "text"}, | |
| "geometry": {"type": "geometry"}, | |
| "timestamp": {"type": "timestamptz"}, | |
| "name": {"type": "text"}, | |
| "num": {"type": "int"}, | |
| } | |
| WHITELIST_FUNCTIONS = [ | |
| "strip_accents", | |
| "upper", | |
| "lower", | |
| "st_intersects", | |
| "st_equals", | |
| "st_within", | |
| "st_contains", | |
| "st_crosses", | |
| "st_overlaps", | |
| "st_touches", | |
| "st_disjoint", | |
| "st_dwithin", | |
| "st_area", | |
| "st_length", | |
| "st_distance", | |
| "st_centroid", | |
| "st_simplify", | |
| ] | |
| Latitude = Annotated[float, Field(ge=-90, le=90)] | |
| Longitude = Annotated[float, Field(ge=-180, le=180)] | |
| def bbox_before_validator(v: Union[str, list[float]]) -> list[float]: | |
| if isinstance(v, str): | |
| v = [float(x.strip()) for x in v.split(",")] | |
| elif len(v) == 1 and isinstance(v[0], str): | |
| v = [float(x.strip()) for x in v[0].split(",")] | |
| return v | |
| def bbox_after_validator(v: list[float]) -> list[float]: | |
| # Validate for 4-element and 6-element bboxes | |
| if len(v) == 4: | |
| if not (v[0] <= v[2] and v[1] <= v[3]): | |
| raise ValueError( | |
| "Invalid bbox: minx must be < maxx and miny must be < maxy for 4-element bbox" | |
| ) | |
| elif len(v) == 6: | |
| if not (v[0] <= v[3] and v[1] <= v[4]): | |
| raise ValueError( | |
| "Invalid bbox: minx must be < maxx and miny must be < maxy for 6-element bbox" | |
| ) | |
| return v | |
| BBox = Annotated[ | |
| Union[ | |
| tuple[Longitude, Latitude, Longitude, Latitude], | |
| tuple[Longitude, Latitude, float, Longitude, Latitude, float], | |
| ], | |
| BeforeValidator(bbox_before_validator), | |
| AfterValidator(bbox_after_validator), | |
| ] | |
| def bbox_expr(bbox: Union[BBox, None], geom_field: str = "geometry") -> Optional[Expr]: | |
| print("bbox_expr:", bbox) | |
| if bbox is None: | |
| return None | |
| return Expr(f"S_INTERSECTS({geom_field}, BBOX({', '.join(map(str, bbox))}))") | |
| def datetime_before_validator(v: Union[str, None]) -> Optional[Union[str, list[str]]]: | |
| if v is None: | |
| return None | |
| if isinstance(v, str): | |
| # Accept ISO 8601 strings, possibly range separated by '/' | |
| parts = v.split("/") | |
| for part in parts: | |
| if part and part not in ("..", ""): | |
| try: | |
| parse_rfc3339(part) | |
| except Exception: | |
| raise ValueError(f"Invalid datetime string: {part}") | |
| if len(parts) == 2: | |
| start, end = parts | |
| if start and end: | |
| start_dt = parse_rfc3339(start) | |
| end_dt = parse_rfc3339(end) | |
| if start_dt > end_dt: | |
| raise ValueError("Invalid datetime range: start must be <= end") | |
| if start_dt == end_dt: | |
| return start # Collapse to single datetime | |
| return parts if len(parts) > 1 else parts[0] | |
| raise ValueError(f"Invalid datetime value: {v}") | |
| DateTime = Annotated[ | |
| Optional[Union[str, list[str]]], BeforeValidator(datetime_before_validator) | |
| ] | |
| def datetime_expr( | |
| dt: Union[DateTime, None], dt_field: str = "datetime" | |
| ) -> Optional[Expr]: | |
| print("datetime_expr:", dt) | |
| if dt is None: | |
| return None | |
| if isinstance(dt, list): | |
| # Range: [start, end] | |
| start, end = (dt + [None, None])[:2] | |
| exprs = [] | |
| if start: | |
| exprs.append(Expr(f"{dt_field}>=TIMESTAMP('{start}')")) | |
| if end: | |
| exprs.append(Expr(f"{dt_field}<=TIMESTAMP('{end}')")) | |
| if exprs: | |
| # Combine with AND if both | |
| if len(exprs) == 2: | |
| return exprs[0] + exprs[1] | |
| return exprs[0] | |
| return None | |
| else: | |
| # Single datetime | |
| return Expr(f"{dt_field}=TIMESTAMP('{dt}')") | |
| def comma_separated_before_validator( | |
| v: Union[str, List[str], None], | |
| ) -> Optional[List[str]]: | |
| print(v) | |
| if v is None: | |
| return None | |
| if isinstance(v, list) and len(v) == 1: | |
| v = v[0] | |
| if isinstance(v, str): | |
| v = v.split(",") | |
| v = [v.strip() for v in v] | |
| # Remove duplicates, preserve order | |
| return list(dict.fromkeys(v)) | |
| CommaSeparated = Annotated[ | |
| Optional[List[str]], BeforeValidator(comma_separated_before_validator) | |
| ] | |
| def ids_expr(ids: Union[CommaSeparated, None], id_field: str = "id") -> Optional[Expr]: | |
| print("ids_expr:", ids) | |
| if ids is None: | |
| return None | |
| if len(ids) == 1: | |
| return Expr(f"{id_field}='{ids[0]}'") | |
| else: | |
| id_list = ", ".join(f"'{id_}'" for id_ in ids) | |
| return Expr(f"{id_field} IN ({id_list})") | |
| class Params: | |
| def __init__( | |
| self, | |
| request: Request, | |
| collection: Annotated[ | |
| str, Field(description="Collection ID from path parameter") | |
| ], | |
| bbox: Annotated[ | |
| Union[BBox, None], Field(description="Bounding box coordinates") | |
| ] = None, | |
| datetime: Annotated[ | |
| Union[DateTime, None], Query(description="Datetime or range") | |
| ] = None, | |
| ids: Annotated[ | |
| CommaSeparated, | |
| Field(description="Comma separated list of IDs from query parameter"), | |
| ] = None, | |
| id: Annotated[ | |
| Optional[str], Field(description="Single ID from path parameter") | |
| ] = None, | |
| filter: Annotated[ | |
| Union[str, Dict[str, Any], None], Field(description="Filter expression") | |
| ] = None, | |
| ): | |
| self.request = request | |
| if collection.lower() not in WHITELIST_TABLES: | |
| raise RequestValidationError(f"Collection '{collection}' is not recognized.") | |
| self.collection = collection | |
| self.bbox = bbox | |
| self.datetime = datetime | |
| self.ids = ids | |
| self.id = id | |
| self.filter = filter | |
| def to_expr(self) -> Optional[Expr]: | |
| exprs: list[Optional[Expr]] = [] | |
| exprs.append(Expr(f"collection='{self.collection}'")) | |
| if self.id: | |
| exprs.append(Expr(f"id='{self.id}'")) | |
| if self.filter: | |
| exprs.append(Expr(self.filter)) | |
| exprs.append(bbox_expr(self.bbox)) | |
| exprs.append(datetime_expr(self.datetime)) | |
| exprs.append(ids_expr(self.ids)) | |
| for k, v in self.request.query_params.items(): | |
| if k.lower() not in RESERVED_PARAMS: | |
| if k.lower() in WHITELIST_COLUMNS: | |
| exprs.append(Expr(f"{k}='{v}'")) | |
| else: | |
| raise RequestValidationError(f"Property '{k}' is not in this collection.") | |
| valid_exprs: list[Expr] = [e for e in exprs if e is not None] | |
| if len(valid_exprs) == 0: | |
| return None | |
| if len(valid_exprs) == 1: | |
| return valid_exprs[0] | |
| return reduce(lambda x, y: x + y, valid_exprs) | |
| @app.get("/") | |
| async def root(): | |
| return {"message": "add a collection to the path to test."} | |
| @app.get("/favicon.ico", include_in_schema=False) | |
| async def favicon(): | |
| return None | |
| @app.get("/{collection}/{id}") | |
| @app.get("/{collection}") | |
| async def bboxtest(params: Annotated[Params, Depends()]): | |
| e = params.to_expr() | |
| if e: | |
| e = e.reduce() | |
| print(e.to_sql()) | |
| return { | |
| "sql": e.to_sql(), | |
| "cql2": e.to_json() | |
| } | |
| return None | |
| if __name__ == "__main__": | |
| uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment