ref: https://twitter.com/_VincentS_/status/1579727759621779456
This is an experiment!!!
The goal was to build a simple raster API to serve local tiles and DeepZoom tiles from an image using the WIP rio-tiler change cogeotiff/rio-tiler#533
ref: https://twitter.com/_VincentS_/status/1579727759621779456
This is an experiment!!!
The goal was to build a simple raster API to serve local tiles and DeepZoom tiles from an image using the WIP rio-tiler change cogeotiff/rio-tiler#533
import math
import urllib
import urllib.parse
from enum import Enum
from types import DynamicClassAttribute
from typing import List, Optional, Tuple

from fastapi import FastAPI, HTTPException, Path, Query
from pydantic import BaseModel, Field, root_validator
from rasterio import windows
from rio_tiler.io import COGReader
from rio_tiler.profiles import img_profiles
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.responses import Response
###################################################################
# FROM TITILER
###################################################################
class XMLResponse(Response):
    """Response subclass whose default media type is ``application/xml``."""

    media_type = "application/xml"
class SchemeEnum(str, Enum):
    """Tiling schemes accepted for the TileJSON ``scheme`` field."""

    xyz = "xyz"
    tms = "tms"
class TileJSON(BaseModel):
    """
    TileJSON model.

    Based on https://github.com/mapbox/tilejson-spec/tree/master/2.2.0

    Only ``tiles`` is required. When ``center`` is not supplied it is derived
    from ``bounds`` and ``minzoom`` by :meth:`compute_center`.
    """

    tilejson: str = "2.2.0"
    # Optional fields carry an explicit ``None`` default so that "not
    # required" does not depend on implicit Optional handling.
    name: Optional[str] = None
    description: Optional[str] = None
    version: str = "1.0.0"
    attribution: Optional[str] = None
    template: Optional[str] = None
    legend: Optional[str] = None
    scheme: SchemeEnum = SchemeEnum.xyz
    tiles: List[str]
    grids: Optional[List[str]] = None
    data: Optional[List[str]] = None
    minzoom: int = Field(0, ge=0, le=30)
    maxzoom: int = Field(30, ge=0, le=30)
    bounds: List[float] = [-180, -90, 180, 90]
    center: Optional[Tuple[float, float, int]] = None

    # skip_on_failure: without it this validator also runs when a field
    # (e.g. ``bounds``) failed validation and is therefore missing from
    # ``values``, turning a validation error into an uncaught KeyError.
    @root_validator(skip_on_failure=True)
    def compute_center(cls, values):
        """Compute center if it does not exist.

        The center is the midpoint of ``bounds`` paired with ``minzoom``.
        """
        bounds = values["bounds"]
        if not values.get("center"):
            values["center"] = (
                (bounds[0] + bounds[2]) / 2,
                (bounds[1] + bounds[3]) / 2,
                values["minzoom"],
            )
        return values

    class Config:
        """TileJSON model configuration."""

        use_enum_values = True
class MediaType(str, Enum):
    """Media (MIME) types used for responses, keyed by short format name.

    Names that repeat an earlier value (``pngraw``, ``mvt``) are Enum aliases
    of the first member declared with that value.
    """

    # Raster formats
    tif = "image/tiff; application=geotiff"
    jp2 = "image/jp2"
    png = "image/png"
    pngraw = "image/png"
    jpeg = "image/jpeg"
    # NOTE(review): "image/jpg" is kept as-is; the registered type for JPEG
    # is "image/jpeg" -- confirm whether clients depend on this value.
    jpg = "image/jpg"
    webp = "image/webp"
    npy = "application/x-binary"

    # Documents and text
    xml = "application/xml"
    json = "application/json"
    geojson = "application/geo+json"
    html = "text/html"
    text = "text/plain"

    # Vector tiles
    pbf = "application/x-protobuf"
    mvt = "application/x-protobuf"
class ImageDriver(str, Enum):
    """GDAL driver name for each supported output format.

    ``jpg`` and ``pngraw`` repeat the values of ``jpeg`` and ``png`` and are
    therefore Enum aliases of those members.
    """

    jpeg = "JPEG"
    jpg = "JPEG"
    png = "PNG"
    pngraw = "PNG"
    tif = "GTiff"
    webp = "WEBP"
    jp2 = "JP2OpenJPEG"
    npy = "NPY"
class ImageType(str, Enum):
    """Output image formats that can be requested from the API.

    Each member exposes three derived attributes, all looked up by the
    member's name: ``profile``, ``driver`` and ``mediatype``.
    """

    png = "png"
    npy = "npy"
    tif = "tif"
    jpeg = "jpeg"
    jpg = "jpg"
    jp2 = "jp2"
    webp = "webp"
    pngraw = "pngraw"

    @DynamicClassAttribute
    def profile(self):
        """Return the ``img_profiles`` entry for this format (``{}`` if none)."""
        return img_profiles.get(self.name, {})

    @DynamicClassAttribute
    def driver(self):
        """Return the GDAL driver name registered in ``ImageDriver``."""
        return ImageDriver[self.name].value

    @DynamicClassAttribute
    def mediatype(self):
        """Return the media type registered in ``MediaType``."""
        return MediaType[self.name].value
###################################################################
app = FastAPI()

# CORS: accept GET requests from any origin, with any request header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET"],
    allow_headers=["*"],
    allow_credentials=True,
)
@app.get("/deepzoom.dzi", response_class=XMLResponse)
def deepzoom(
    url: str = Query(description="URL"),
    format: ImageType = Query(ImageType.png, description="Image Format"),
):
    """Return the DeepZoom (DZI) XML descriptor for the dataset at ``url``.

    The descriptor advertises 256-pixel tiles with no overlap, the requested
    tile format name, and the dataset height and width reported by
    ``COGReader.info()``.
    """
    with COGReader(url) as dst:
        metadata = dst.info()

    descriptor = f"""<?xml version="1.0" encoding="UTF-8"?>
<Image xmlns="http://schemas.microsoft.com/deepzoom/2008"
Format="{format.name}"
Overlap="0"
TileSize="256"
>
<Size
Height="{metadata.height}"
Width="{metadata.width}"
/>
</Image>"""
    return descriptor
@app.get("/deepzoom/{z}/{x}_{y}.{format}", response_class=Response)
def deepzoom_tile(
    # Path parameters are always required, so they are declared with ``...``
    # rather than a default value.
    level: int = Path(..., alias="z"),
    column: int = Path(..., alias="x"),
    row: int = Path(..., alias="y"),
    format: ImageType = Path(..., description="Image Format"),
    url: str = Query(description="URL"),
):
    """Render one DeepZoom tile.

    Pyramid level ``max_level`` is the full-resolution image; each lower
    level halves the dimensions (rounded up, so no level is ever empty).
    Tiles are 256 pixels wide/high except the last column/row of a level,
    which is cropped to the remaining pixels.

    Args:
        level: Pyramid level (``z`` in the path), ``0 <= level <= max_level``.
        column: Tile column (``x`` in the path) within the level.
        row: Tile row (``y`` in the path) within the level.
        format: Output image format.
        url: Dataset URL/path opened with ``COGReader``.

    Raises:
        HTTPException: 400 when ``level`` is outside the pyramid, 404 when
            ``column``/``row`` is outside the level's tile grid.
    """
    tile_size = 256

    with COGReader(url) as dst:
        full_width = dst.dataset.width
        full_height = dst.dataset.height

        # ceil(log2(max_dimension)) computed with integers only, which avoids
        # the floating-point rounding of ``math.log(x, 2)``.
        max_dimension = max(full_width, full_height)
        max_level = (max_dimension - 1).bit_length()

        # Explicit check instead of ``assert``: asserts are stripped under
        # ``python -O`` and would surface as a 500 instead of a client error.
        if not 0 <= level <= max_level:
            raise HTTPException(status_code=400, detail="Invalid pyramid level")

        # Overview size (rounded up so that it is at least 1 pixel)
        downsample = 2 ** (max_level - level)
        level_width = int(math.ceil(full_width / downsample))
        level_height = int(math.ceil(full_height / downsample))

        # Nb tiles for Ovr
        level_x_tile = int(math.ceil(level_width / tile_size))
        level_y_tile = int(math.ceil(level_height / tile_size))

        if not (0 <= column < level_x_tile and 0 <= row < level_y_tile):
            raise HTTPException(
                status_code=404,
                detail="Tile is outside the pyramid level",
            )

        # Output Size: full tiles everywhere except the last column/row,
        # which only holds the remaining pixels of the level.
        width = min(tile_size, level_width - tile_size * column)
        height = min(tile_size, level_height - tile_size * row)

        # BBox, expressed in full-resolution pixel coordinates
        x_origin = (full_width / level_width) * tile_size * column
        y_origin = (full_height / level_height) * tile_size * row
        x_max = min(full_width, x_origin + full_width / level_width * width)
        y_max = min(full_height, y_origin + full_height / level_height * height)

        w = windows.from_bounds(
            x_origin,
            y_max,
            x_max,
            y_origin,
            transform=dst.dataset.transform,
        )

        img = dst.read(window=w, width=width, height=height)
        content = img.render(img_format=format.driver, **format.profile)

    return Response(content, media_type=format.mediatype)
@app.get("/info")
def info(url: str = Query(description="URL")):
    """Return the dataset info for ``url`` as a dict, omitting ``None`` values."""
    with COGReader(url) as dst:
        metadata = dst.info()
        return metadata.dict(exclude_none=True)
@app.get(
    "/tilejson.json",
    response_model=TileJSON,
    responses={200: {"description": "Return a tilejson"}},
    response_model_exclude_none=True,
)
def tilejson(
    request: Request,
    url: str = Query(description="URL"),
):
    """Return a TileJSON document for the dataset at ``url``.

    The tile URL template points at the ``tile`` route with literal
    ``{z}``/``{x}``/``{y}`` placeholders, and the query string of the current
    request is appended to it unchanged.
    """
    route_params = {
        "z": "{z}",
        "x": "{x}",
        "y": "{y}",
    }
    # ``url_for`` may return a URL object rather than a plain string
    # (depending on the Starlette version); normalise before appending.
    tiles_url = str(request.url_for("tile", **route_params))

    # Public accessor for the (key, value) pairs, repeated keys included.
    qs = request.query_params.multi_items()
    if qs:
        tiles_url += f"?{urllib.parse.urlencode(qs)}"

    with COGReader(url) as dst:
        return {
            "bounds": dst.geographic_bounds,
            "minzoom": dst.minzoom,
            "maxzoom": dst.maxzoom,
            "tiles": [tiles_url],
        }
@app.get("/tiles/{z}/{x}/{y}", response_class=Response)
def tile(
    z: int,
    x: int,
    y: int,
    url: str = Query(description="URL"),
):
    """Tile in Local TMS.

    The tile is rendered as JPEG when every value of its mask is truthy and
    as PNG otherwise.
    """
    with COGReader(url) as dst:
        img = dst.tile(x, y, z)

        if img.mask.all():
            output_type = ImageType.jpeg
        else:
            output_type = ImageType.png

        body = img.render(img_format=output_type.driver, **output_type.profile)
        return Response(body, media_type=output_type.mediatype)