ref: https://twitter.com/_VincentS_/status/1579727759621779456
This is an experiment!!!
The goal was to build a simple raster API that serves local tiles and DeepZoom tiles from an image, using the work-in-progress rio-tiler change cogeotiff/rio-tiler#533.
ref: https://twitter.com/_VincentS_/status/1579727759621779456
This is an experiment!!!
The goal was to build a simple raster API that serves local tiles and DeepZoom tiles from an image, using the work-in-progress rio-tiler change cogeotiff/rio-tiler#533.
| import urllib | |
| import math | |
| from enum import Enum | |
| from types import DynamicClassAttribute | |
| from typing import List, Optional, Tuple | |
| from fastapi import FastAPI, Query, Path | |
| from pydantic import BaseModel, Field, root_validator | |
| from rio_tiler.io import COGReader | |
| from rio_tiler.profiles import img_profiles | |
| from rasterio import windows | |
| from starlette.responses import Response | |
| from starlette.requests import Request | |
| from starlette.middleware.cors import CORSMiddleware | |
| ################################################################### | |
| # FROM TITILER | |
| ################################################################### | |
class XMLResponse(Response):
    """Response whose media type is ``application/xml``."""

    # Media type attached to every response of this class.
    media_type = "application/xml"
class SchemeEnum(str, Enum):
    """Tile addressing schemes accepted in a TileJSON document."""

    # Members are plain strings as well as enum members (str mixin).
    xyz = "xyz"
    tms = "tms"
class TileJSON(BaseModel):
    """TileJSON document model.

    Based on https://github.com/mapbox/tilejson-spec/tree/master/2.2.0

    ``center`` is optional on input: when it is missing (or falsy) it is
    derived from ``bounds`` and ``minzoom`` by :meth:`compute_center`.
    """

    tilejson: str = "2.2.0"
    name: Optional[str]
    description: Optional[str]
    version: str = "1.0.0"
    attribution: Optional[str]
    template: Optional[str]
    legend: Optional[str]
    scheme: SchemeEnum = SchemeEnum.xyz
    tiles: List[str]
    grids: Optional[List[str]]
    data: Optional[List[str]]
    minzoom: int = Field(0, ge=0, le=30)
    maxzoom: int = Field(30, ge=0, le=30)
    # Read by compute_center as (index 0, 1, 2, 3) = two x/y corner pairs.
    bounds: List[float] = [-180, -90, 180, 90]
    center: Optional[Tuple[float, float, int]]

    @root_validator
    def compute_center(cls, values):
        """Compute center if it does not exist.

        The center is the midpoint of ``bounds`` paired with ``minzoom``.

        Raises:
            ValueError: ``center`` has to be derived but ``bounds`` holds
                fewer than four numbers (reported by pydantic as a
                validation error).
        """
        bounds = values.get("bounds")
        minzoom = values.get("minzoom")
        # A field that failed its own validation is absent from `values`.
        # Indexing it here would raise KeyError and hide the real validation
        # error, so leave the values untouched in that case.
        if bounds is None or minzoom is None:
            return values

        if not values.get("center"):
            if len(bounds) < 4:
                raise ValueError("bounds must hold 4 values to compute center")
            values["center"] = (
                (bounds[0] + bounds[2]) / 2,
                (bounds[1] + bounds[3]) / 2,
                minzoom,
            )
        return values

    class Config:
        """TileJSON model configuration."""

        use_enum_values = True
class MediaType(str, Enum):
    """Media (MIME) type strings, keyed by short format name."""

    # Raster image formats.
    tif = "image/tiff; application=geotiff"
    jp2 = "image/jp2"
    png = "image/png"
    pngraw = "image/png"  # same value as `png`, so this name is an alias of it
    jpeg = "image/jpeg"
    jpg = "image/jpg"
    webp = "image/webp"
    npy = "application/x-binary"

    # Documents and text.
    xml = "application/xml"
    json = "application/json"
    geojson = "application/geo+json"
    html = "text/html"
    text = "text/plain"

    # Protobuf payloads.
    pbf = "application/x-protobuf"
    mvt = "application/x-protobuf"  # same value as `pbf`, so an alias of it
class ImageDriver(str, Enum):
    """GDAL driver names, keyed by short output format name."""

    jpeg = "JPEG"
    jpg = "JPEG"  # same value as `jpeg`, so this name is an alias of it
    png = "PNG"
    pngraw = "PNG"  # same value as `png`, so this name is an alias of it
    tif = "GTiff"
    webp = "WEBP"
    jp2 = "JP2OpenJPEG"
    npy = "NPY"
class ImageType(str, Enum):
    """Output image formats a client can ask for.

    Each member's *name* is also the key used to look up its rendering
    profile, GDAL driver and media type.
    """

    png = "png"
    npy = "npy"
    tif = "tif"
    jpeg = "jpeg"
    jpg = "jpg"
    jp2 = "jp2"
    webp = "webp"
    pngraw = "pngraw"

    @DynamicClassAttribute
    def profile(self):
        """Return the ``img_profiles`` entry for this format, or ``{}`` if none."""
        return img_profiles.get(self.name, {})

    @DynamicClassAttribute
    def driver(self):
        """Return the GDAL driver name stored in ``ImageDriver`` for this format."""
        return ImageDriver[self.name].value

    @DynamicClassAttribute
    def mediatype(self):
        """Return the media type stored in ``MediaType`` for this format."""
        return MediaType[self.name].value
| ################################################################### | |
# Application instance. The CORS middleware accepts GET requests from any
# origin, with any request header, and with credentials allowed.
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["GET"],
    allow_credentials=True,
    allow_origins=["*"],
)
@app.get("/deepzoom.dzi", response_class=XMLResponse)
def deepzoom(
    url: str = Query(description="URL"),
    format: ImageType = Query(ImageType.png, description="Image Format"),
):
    """Return the DeepZoom (DZI) XML descriptor for the dataset at ``url``.

    The descriptor reports the requested tile format name, a fixed tile size
    of 256 with no overlap, and the dataset's height and width as given by
    the reader's ``info()``.
    """
    with COGReader(url) as src:
        metadata = src.info()

    # One entry per line of the emitted document.
    document = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<Image xmlns="http://schemas.microsoft.com/deepzoom/2008"',
        f'Format="{format.name}"',
        'Overlap="0"',
        'TileSize="256"',
        '>',
        '<Size',
        f'Height="{metadata.height}"',
        f'Width="{metadata.width}"',
        '/>',
        '</Image>',
    ]
    return "\n".join(document)
@app.get("/deepzoom/{z}/{x}_{y}.{format}", response_class=Response)
def deepzoom_tile(
    level: int = Path(..., alias="z"),
    column: int = Path(..., alias="x"),
    row: int = Path(..., alias="y"),
    format: ImageType = Path(..., description="Image Format"),
    url: str = Query(description="URL"),
):
    """Render one DeepZoom tile of the dataset at ``url``.

    Args:
        level: Pyramid level; 0 is the smallest level and
            ``ceil(log2(max(width, height)))`` is full resolution.
        column: Tile column inside the level, starting at 0.
        row: Tile row inside the level, starting at 0.
        format: Output image format.
        url: Dataset path or URL handed to ``COGReader``.

    Returns:
        A response holding the encoded tile. Tiles are 256 pixels wide and
        high, except in the last column/row of a level, where they are
        cropped to the remaining pixels.

    Raises:
        HTTPException: 404 when ``level`` is outside the pyramid or when
            ``column``/``row`` lie outside the level's tile grid.
    """
    # Function-scope import: fastapi is already a dependency of this file.
    from fastapi import HTTPException

    tile_size = 256

    with COGReader(url) as dst:
        src_width = dst.dataset.width
        src_height = dst.dataset.height

        # ceil(log2(n)) computed on integers. math.log(n, 2) can land just
        # above an exact power of two and inflate the level count by one.
        max_level = (max(src_width, src_height) - 1).bit_length()
        if not 0 <= level <= max_level:
            raise HTTPException(status_code=404, detail="Invalid pyramid level")

        # Size of this level: the source size divided by 2 ** (levels below
        # full resolution), rounded UP (`-(-a // b)` is integer ceil). This
        # keeps every level at least one pixel wide/high, so the divisions
        # below cannot hit zero.
        downscale = 2 ** (max_level - level)
        level_width = -(-src_width // downscale)
        level_height = -(-src_height // downscale)

        # Number of tiles covering the level in each direction.
        level_x_tile = -(-level_width // tile_size)
        level_y_tile = -(-level_height // tile_size)
        if not (0 <= column < level_x_tile and 0 <= row < level_y_tile):
            raise HTTPException(status_code=404, detail="Tile outside of level")

        # Output size: full tiles everywhere except the last column/row,
        # which only holds what is left of the level.
        width = tile_size
        if column == level_x_tile - 1:
            width = min(tile_size, level_width - tile_size * (level_x_tile - 1))
        height = tile_size
        if row == level_y_tile - 1:
            height = min(tile_size, level_height - tile_size * (level_y_tile - 1))

        # Extent of the tile expressed in full-resolution pixel coordinates.
        x_res = src_width / level_width
        y_res = src_height / level_height
        x_origin = x_res * tile_size * column
        y_origin = y_res * tile_size * row
        x_max = min(src_width, x_origin + x_res * width)
        y_max = min(src_height, y_origin + y_res * height)

        # NOTE(review): pixel coordinates are passed as "bounds" together with
        # the dataset transform; presumably this targets non-georeferenced
        # images handled by the work-in-progress rio-tiler reader. Confirm.
        w = windows.from_bounds(
            x_origin,
            y_max,
            x_max,
            y_origin,
            transform=dst.dataset.transform,
        )
        img = dst.read(window=w, width=width, height=height)

    content = img.render(img_format=format.driver, **format.profile)
    return Response(content, media_type=format.mediatype)
@app.get("/info")
def info(url: str = Query(description="URL")):
    """Return the reader's ``info()`` for ``url`` as a dict without ``None`` entries."""
    with COGReader(url) as src:
        metadata = src.info()
        return metadata.dict(exclude_none=True)
@app.get(
    "/tilejson.json",
    response_model=TileJSON,
    responses={200: {"description": "Return a tilejson"}},
    response_model_exclude_none=True,
)
def tilejson(
    request: Request,
    url: str = Query(description="URL"),
):
    """Return a TileJSON document describing the ``tile`` endpoint for ``url``.

    The tile URL template points at the ``tile`` route with literal
    ``{z}``/``{x}``/``{y}`` placeholders and carries over every query
    parameter of the current request. Bounds and zoom range come from the
    reader opened on ``url``.
    """
    # Function-scope import: a bare `import urllib` does not guarantee that
    # the `urllib.parse` submodule has been loaded.
    from urllib.parse import urlencode

    route_params = {
        "z": "{z}",
        "x": "{x}",
        "y": "{y}",
    }
    # Newer Starlette returns a URL object here; str() makes the
    # concatenation below work on both old and new versions.
    tiles_url = str(request.url_for("tile", **route_params))

    # Public accessor for the (key, value) pairs, repeated keys included.
    qs = request.query_params.multi_items()
    if qs:
        tiles_url += f"?{urlencode(qs)}"

    with COGReader(url) as dst:
        return {
            "bounds": dst.geographic_bounds,
            "minzoom": dst.minzoom,
            "maxzoom": dst.maxzoom,
            "tiles": [tiles_url],
        }
@app.get("/tiles/{z}/{x}/{y}", response_class=Response)
def tile(
    z: int,
    x: int,
    y: int,
    url: str = Query(description="URL"),
):
    """Tile in Local TMS.

    Reads tile ``(x, y, z)`` from the dataset at ``url`` and encodes it as
    JPEG when every value of the tile mask is truthy, otherwise as PNG.
    """
    with COGReader(url) as src:
        image = src.tile(x, y, z)

    # Presumably a fully truthy mask means no pixel needs transparency,
    # which is why JPEG is allowed in that case. Confirm against rio-tiler.
    if image.mask.all():
        output_type = ImageType.jpeg
    else:
        output_type = ImageType.png

    encoded = image.render(img_format=output_type.driver, **output_type.profile)
    return Response(encoded, media_type=output_type.mediatype)