Skip to content

Instantly share code, notes, and snippets.

@vincentsarago
Last active October 12, 2022 15:57
Show Gist options
  • Save vincentsarago/6ab3be1f2d11961c7a43c3ddbfb762c3 to your computer and use it in GitHub Desktop.
Save vincentsarago/6ab3be1f2d11961c7a43c3ddbfb762c3 to your computer and use it in GitHub Desktop.
import urllib
import math
from enum import Enum
from types import DynamicClassAttribute
from typing import List, Optional, Tuple
from fastapi import FastAPI, Query, Path
from pydantic import BaseModel, Field, root_validator
from rio_tiler.io import COGReader
from rio_tiler.profiles import img_profiles
from rasterio import windows
from starlette.responses import Response
from starlette.requests import Request
from starlette.middleware.cors import CORSMiddleware
###################################################################
# FROM TITILER
###################################################################
class XMLResponse(Response):
"""XML Response"""
media_type = "application/xml"
class SchemeEnum(str, Enum):
"""TileJSON scheme choice."""
xyz = "xyz"
tms = "tms"
class TileJSON(BaseModel):
"""
TileJSON model.
Based on https://github.com/mapbox/tilejson-spec/tree/master/2.2.0
"""
tilejson: str = "2.2.0"
name: Optional[str]
description: Optional[str]
version: str = "1.0.0"
attribution: Optional[str]
template: Optional[str]
legend: Optional[str]
scheme: SchemeEnum = SchemeEnum.xyz
tiles: List[str]
grids: Optional[List[str]]
data: Optional[List[str]]
minzoom: int = Field(0, ge=0, le=30)
maxzoom: int = Field(30, ge=0, le=30)
bounds: List[float] = [-180, -90, 180, 90]
center: Optional[Tuple[float, float, int]]
@root_validator
def compute_center(cls, values):
"""Compute center if it does not exist."""
bounds = values["bounds"]
if not values.get("center"):
values["center"] = (
(bounds[0] + bounds[2]) / 2,
(bounds[1] + bounds[3]) / 2,
values["minzoom"],
)
return values
class Config:
"""TileJSON model configuration."""
use_enum_values = True
class MediaType(str, Enum):
"""Responses Media types formerly known as MIME types."""
tif = "image/tiff; application=geotiff"
jp2 = "image/jp2"
png = "image/png"
pngraw = "image/png"
jpeg = "image/jpeg"
jpg = "image/jpg"
webp = "image/webp"
npy = "application/x-binary"
xml = "application/xml"
json = "application/json"
geojson = "application/geo+json"
html = "text/html"
text = "text/plain"
pbf = "application/x-protobuf"
mvt = "application/x-protobuf"
class ImageDriver(str, Enum):
"""Supported output GDAL drivers."""
jpeg = "JPEG"
jpg = "JPEG"
png = "PNG"
pngraw = "PNG"
tif = "GTiff"
webp = "WEBP"
jp2 = "JP2OpenJPEG"
npy = "NPY"
class ImageType(str, Enum):
"""Available Output image type."""
png = "png"
npy = "npy"
tif = "tif"
jpeg = "jpeg"
jpg = "jpg"
jp2 = "jp2"
webp = "webp"
pngraw = "pngraw"
@DynamicClassAttribute
def profile(self):
"""Return rio-tiler image default profile."""
return img_profiles.get(self._name_, {})
@DynamicClassAttribute
def driver(self):
"""Return rio-tiler image default profile."""
return ImageDriver[self._name_].value
@DynamicClassAttribute
def mediatype(self):
"""Return image media type."""
return MediaType[self._name_].value
###################################################################
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["GET"],
allow_headers=["*"],
)
@app.get("/deepzoom.dzi", response_class=XMLResponse)
def deepzoom(
url: str = Query(description="URL"),
format: ImageType = Query(ImageType.png, description="Image Format"),
):
"""DeepZoom metadata."""
with COGReader(url) as dst:
info = dst.info()
return f"""<?xml version="1.0" encoding="UTF-8"?>
<Image xmlns="http://schemas.microsoft.com/deepzoom/2008"
Format="{format.name}"
Overlap="0"
TileSize="256"
>
<Size
Height="{info.height}"
Width="{info.width}"
/>
</Image>"""
@app.get("/deepzoom/{z}/{x}_{y}.{format}", response_class=Response)
def deepzoom_tile(
level: int = Path(None, alias="z"),
column: int = Path(None, alias="x"),
row: int = Path(None, alias="y"),
format: ImageType = Path(ImageType.png, description="Image Format"),
url: str = Query(description="URL"),
):
"""DeepZoom tile."""
tile_size = 256
with COGReader(url) as dst:
max_dimension = max(dst.dataset.width, dst.dataset.height)
max_level = int(math.ceil(math.log(max_dimension, 2)))
assert 0 <= level and level < max_level + 1, 'Invalid pyramid level'
# Overview Size
level_width = dst.dataset.width // 2 ** (max_level - level)
level_height = dst.dataset.height // 2 ** (max_level - level)
# Nb tiles for Ovr
level_x_tile = int(math.ceil(level_width / tile_size))
level_y_tile = int(math.ceil(level_height / tile_size))
# Output Size
width = min(tile_size, level_width - (tile_size * (level_x_tile - 1))) if column == level_x_tile - 1 else tile_size
height = min(tile_size, level_height - (tile_size * (level_y_tile - 1))) if row == level_y_tile - 1 else tile_size
# BBox
x_origin = (dst.dataset.width / level_width) * tile_size * column
y_origin = (dst.dataset.height / level_height) * tile_size * row
x_max = min(dst.dataset.width, x_origin + dst.dataset.width / level_width * width)
y_max = min(dst.dataset.height, y_origin + dst.dataset.height / level_height * height)
w = windows.from_bounds(
x_origin,
y_max,
x_max,
y_origin,
transform=dst.dataset.transform,
)
img = dst.read(window=w, width=width, height=height)
content = img.render(img_format=format.driver, **format.profile)
return Response(content, media_type=format.mediatype)
@app.get("/info")
def info(url: str = Query(description="URL")):
with COGReader(url) as dst:
return dst.info().dict(exclude_none=True)
@app.get(
"/tilejson.json",
response_model=TileJSON,
responses={200: {"description": "Return a tilejson"}},
response_model_exclude_none=True,
)
def tilejson(
request: Request,
url: str = Query(description="URL"),
):
route_params = {
"z": "{z}",
"x": "{x}",
"y": "{y}",
}
tiles_url = request.url_for("tile", **route_params)
qs = [
(key, value)
for (key, value) in request.query_params._list
]
if qs:
tiles_url += f"?{urllib.parse.urlencode(qs)}"
with COGReader(url) as dst:
return {
"bounds": dst.geographic_bounds,
"minzoom": dst.minzoom,
"maxzoom": dst.maxzoom,
"tiles": [tiles_url],
}
@app.get("/tiles/{z}/{x}/{y}", response_class=Response)
def tile(
z: int,
x: int,
y: int,
url: str = Query(description="URL"),
):
"""Tile in Local TMS."""
with COGReader(url) as dst:
img = dst.tile(x, y, z)
format = ImageType.jpeg if img.mask.all() else ImageType.png
content = img.render(img_format=format.driver, **format.profile)
return Response(content, media_type=format.mediatype)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment