Last active
February 2, 2023 16:30
-
-
Save yellowcap/4a0c5f23b5c6997202a5a1df7ea4b3e3 to your computer and use it in GitHub Desktop.
Sentinel-2 RGB image overlapping with GeoJSON feature served through fastapi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
response = requests.post( | |
"http://127.0.0.1:8000/rgb/?start=2022-01-01&end=2022-01-31&mode=ndvi", | |
json={ | |
"bbox": [128.774676, -22.256217, 128.789557, -22.241022], | |
"geometry": { | |
"coordinates": [ | |
[ | |
[128.77626, -22.24742], | |
[128.783557, -22.241022], | |
[128.789557, -22.244932], | |
[128.786677, -22.253995], | |
[128.780149, -22.256217], | |
[128.774676, -22.251907], | |
[128.77626, -22.24742], | |
] | |
], | |
"type": "Polygon", | |
}, | |
"properties": None, | |
"type": "Feature", | |
}, | |
) | |
if response.status_code != 200: | |
print(response.json()) | |
else: | |
with open("fastapi-test.png", "wb") as f: | |
f.write(response.content) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
from datetime import date | |
from enum import Enum | |
from math import ceil | |
from typing import Union | |
from rio_tiler.colormap import cmap | |
import numpy as np | |
import pyproj | |
from fastapi import FastAPI, Query, Response | |
from geojson_pydantic import Feature | |
from geojson_pydantic.types import BBox | |
from PIL import Image, ImageOps | |
from rasterio import Affine | |
from rasterio.features import bounds, rasterize | |
from rio_tiler.io import COGReader | |
from satsearch import Search | |
from shapely import Geometry | |
from shapely.geometry import shape | |
from shapely.ops import transform | |
RGB_BANDS = ["red", "green", "blue"] | |
NDVI_BANDS = ["red", "nir"] | |
SCALE = 10 | |
app = FastAPI() | |
def transform_geometry(geom: Feature, epsg: int) -> Geometry: | |
""" | |
Reproject a GeoJson Pydantic Feature and return it as Shapely Geometry. | |
""" | |
project = pyproj.Transformer.from_proj( | |
pyproj.Proj(init="epsg:4326"), | |
pyproj.Proj(init=f"epsg:{epsg}"), | |
) | |
if geom.geometry is None: | |
raise ValueError("Can not transform an empty geometry") | |
return transform( | |
project.transform, | |
shape(geom.geometry.dict()), | |
) | |
def compute_transform(geom: Geometry, scale: int) -> tuple[Affine, int, int]: | |
""" | |
Compute the geotransform and image size for the rasterized geometry. The | |
input scale is expected to be in the coordinate system units of the epsg | |
provided. | |
""" | |
extent = bounds(geom) | |
width = ceil((extent[2] - extent[0]) / scale) | |
height = ceil((extent[3] - extent[1]) / scale) | |
geotransform = Affine(scale, 0, extent[0], 0, -scale, extent[3]) | |
print(f"Computed width {width} and height {height}") | |
return geotransform, width, height | |
def get_pixels( | |
href: str, bbox: Union[BBox, None], width: int, height: int | |
) -> np.ndarray: | |
""" | |
Retrieve pixel values for a raster over a bounding box at the | |
given resolution. | |
""" | |
print(f"Getting data for {href}") | |
with COGReader(href) as raster: | |
return raster.part(bbox, width=width, height=height) | |
class BandCombo(str, Enum): | |
rgb = "rgb" | |
ndvi = "ndvi" | |
def bands(self): | |
if self == "rgb": | |
return RGB_BANDS | |
else: | |
return NDVI_BANDS | |
def get_array(aoi: Feature, start: date, end: date, mode: BandCombo) -> np.ndarray: | |
""" | |
Get | |
""" | |
search = Search( | |
url="https://earth-search.aws.element84.com/v0", | |
bbox=aoi.bbox, | |
datetime=f"{start}T00:00:00Z/{end}T23:59:59Z", | |
query={"eo:cloud_cover": {"lt": 90}}, | |
collections=["sentinel-s2-l2a-cogs"], | |
) | |
items = [item for item in search.items()] | |
print(f"Search found {len(items)} scenes") | |
items.sort(key=lambda x: x.properties["eo:cloud_cover"]) | |
item = items[0] | |
print(item) | |
epsg = item.properties["proj:epsg"] | |
aoi_transformed = transform_geometry(aoi, epsg) | |
geotransform, width, height = compute_transform(aoi_transformed, SCALE) | |
aoi_rasterized = rasterize( | |
[aoi_transformed], | |
out_shape=(height, width), | |
transform=geotransform, | |
all_touched=True, | |
fill=0, | |
default_value=1, | |
) | |
hrefs = [item.asset(band)["href"] for band in mode.bands()] | |
X = [] | |
for href in hrefs: | |
pixels = get_pixels(href, aoi.bbox, width, height) | |
pixels_data = pixels.data[0] | |
pixels_data_masked = pixels_data * aoi_rasterized | |
X.append(pixels_data_masked) | |
return np.array(X, dtype="float32") | |
@app.post("/rgb/", response_class=Response) | |
async def rgb(aoi: Feature, start: date, end: date, mode: BandCombo): | |
""" | |
Return an RGB image over the input geometry based on imagery from | |
within the given time interval. | |
""" | |
X = get_array(aoi, start, end, mode) | |
if mode == BandCombo.rgb: | |
X = np.multiply(np.clip(X / 3000, 0, 1), 255) | |
X = X.swapaxes(0, 1).swapaxes(1, 2) | |
img = Image.fromarray(X.astype("uint8"), mode="RGB") | |
else: | |
ndvi = (X[1] - X[0]) / (X[1] + X[0]) | |
ndvi_scaled = 255* (np.nan_to_num(ndvi, nan=-1) + 1) / 2 | |
img = Image.fromarray(ndvi_scaled.astype("uint8")) | |
with io.BytesIO() as output: | |
img.save(output, format="PNG") | |
data = output.getvalue() | |
return Response(content=data, media_type="image/png") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment