@vincentsarago
Created August 23, 2019 14:45
"""rio_tiler.sentinel1: Sentinel-1 processing."""
import os
import re
import json
import warnings
import multiprocessing
from functools import partial
from concurrent import futures
import numpy as np
from boto3.session import Session as boto3_session
import mercantile
from affine import Affine
import rasterio
from rasterio import transform, windows
from rasterio.crs import CRS
from rasterio.vrt import WarpedVRT
from rasterio.warp import transform_bounds
from rasterio.enums import Resampling
from rio_tiler import utils
from rio_tiler.mercator import get_zooms
from rio_tiler.errors import TileOutsideBounds, InvalidBandName, InvalidSentinelSceneId, NoOverviewWarning
# ref: https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor
MAX_THREADS = int(os.environ.get("MAX_THREADS", multiprocessing.cpu_count() * 5))
REGION = os.environ.get("AWS_REGION", "eu-central-1")
SENTINEL_BUCKET = "sentinel-s1-l1c"
SENTINEL_BANDS = ["vv", "vh"]
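# Note: the sentinel-s1-l1c bucket is requester-pays, so _aws_get_object below
# sets RequestPayer="requester" by default (request_pays=True) and the caller's
# AWS account is billed for the GET.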
def _aws_get_object(bucket, key, request_pays=True):
"""AWS s3 get object content."""
session = boto3_session(region_name=REGION)
s3 = session.client("s3")
params = {"Bucket": bucket, "Key": key}
if request_pays:
params["RequestPayer"] = "requester"
response = s3.get_object(**params)
return response["Body"].read()
def _sentinel_parse_scene_id(sceneid):
"""Parse Sentinel-1 scene id.
Attributes
----------
sceneid : str
Sentinel-1 sceneid.
Returns
-------
out : dict
dictionary with metadata constructed from the sceneid.
e.g.:
_sentinel_parse_scene_id('S1A_IW_GRDH_1SDV_20180716T004042_20180716T004107_022812_02792A_FD5B')
{
"sensor": "1",
"satellite": "A",
"beam": "IW",
"product": "GRD",
"resolution": "H",
"processing_level": "1",
"product_class": "S",
"polarisation": "DV",
"startDateTime": "20180716T004042",
"stopDateTime": "20180716T004107",
"absolute_orbit": "022812",
"mission_task": "02792A",
"product_id": "FD5B",
"key": "GRD/2018/7/16/IW/DV/S1A_IW_GRDH_1SDV_20180716T004042_20180716T004107_022812_02792A_FD5B",
"scene": "S1A_IW_GRDH_1SDV_20180716T004042_20180716T004107_022812_02792A_FD5B",
}
"""
if not re.match(
"^S1[AB]_(IW)|(EW)_[A-Z]{3}[FHM]_[0-9][SA][A-Z]{2}_[0-9]{8}T[0-9]{6}_[0-9]{8}T[0-9]{6}_[0-9A-Z]{6}_[0-9A-Z]{6}_[0-9A-Z]{4}$",
sceneid,
):
raise InvalidSentinelSceneId("Could not match {}".format(sceneid))
sentinel_pattern = (
r"^S"
r"(?P<sensor>\w{1})"
r"(?P<satellite>[AB]{1})"
r"_"
r"(?P<beam>[A-Z]{2})"
r"_"
r"(?P<product>[A-Z]{3})"
r"(?P<resolution>[FHM])"
r"_"
r"(?P<processing_level>[0-9])"
r"(?P<product_class>[SA])"
r"(?P<polarisation>(SH)|(SV)|(DH)|(DV)|(HH)|(HV)|(VV)|(VH))"
r"_"
r"(?P<startDateTime>[0-9]{8}T[0-9]{6})"
r"_"
r"(?P<stopDateTime>[0-9]{8}T[0-9]{6})"
r"_"
r"(?P<absolute_orbit>[0-9]{6})"
r"_"
r"(?P<mission_task>[0-9A-Z]{6})"
r"_"
r"(?P<product_id>[0-9A-Z]{4})$"
)
meta = re.match(sentinel_pattern, sceneid, re.IGNORECASE).groupdict()
year = meta["startDateTime"][0:4]
# the bucket keys use non-zero-padded month/day (e.g. 2018/7/16), so only
# strip leading zeros; strip("0") would also drop trailing zeros (day 20 -> "2")
month = meta["startDateTime"][4:6].lstrip("0")
day = meta["startDateTime"][6:8].lstrip("0")
meta["key"] = "{}/{}/{}/{}/{}/{}/{}".format(
meta["product"], year, month, day, meta["beam"], meta["polarisation"], sceneid
)
meta["scene"] = sceneid
return meta
def _get_bounds(scene_info):
product_info = json.loads(
_aws_get_object(
SENTINEL_BUCKET, "{}/productInfo.json".format(scene_info["key"])
)
)
geom = product_info["footprint"]
xyz = list(zip(*geom["coordinates"][0]))
return min(xyz[0]), min(xyz[1]), max(xyz[0]), max(xyz[1])
def bounds(sceneid):
"""
Retrieve image bounds.
Attributes
----------
sceneid : str
Sentinel-1 sceneid.
Returns
-------
out : dict
dictionary with image bounds.
"""
scene_params = _sentinel_parse_scene_id(sceneid)
return {"sceneid": sceneid, "bounds": list(_get_bounds(scene_params))}
def _sentinel_stats(
src_path,
overview_level=None,
max_size=1024,
percentiles=(2, 98),
dst_crs=CRS({"init": "EPSG:4326"}),
histogram_bins=10,
histogram_range=None,
):
"""
Retrieve Sentinel-1 dataset statistics.
Attributes
----------
src_path : str
Sentinel-1 measurement path
overview_level : int, optional
Overview (decimation) level to fetch.
max_size: int, optional
Maximum size of dataset to retrieve
(will be used to calculate the overview level to fetch).
percentiles : tuple, optional
Percentile or sequence of percentiles to compute,
which must be between 0 and 100 inclusive (default: (2, 98)).
dst_crs: CRS or dict
Target coordinate reference system (default: EPSG:4326).
histogram_bins: int, optional
Defines the number of equal-width histogram bins (default: 10).
histogram_range: tuple or list, optional
The lower and upper range of the bins. If not provided, range is simply
the min and max of the array.
Returns
-------
out : dict
(percentiles), min, max, stdev, histogram for each band,
e.g.
{
"vv": {
'pc': [15, 121],
'min': 1,
'max': 162,
'std': 27.22067722127997,
'histogram': [
[102934, 135489, 20981, 13548, 11406, 8799, 7351, 5622, 2985, 662],
[1., 17.1, 33.2, 49.3, 65.4, 81.5, 97.6, 113.7, 129.8, 145.9, 162.]
]
}
}
"""
with rasterio.open(src_path) as src_dst:
width = src_dst.width
height = src_dst.height
levels = src_dst.overviews(1)
if len(levels):
if overview_level is not None:
decim = levels[overview_level]
else:
# determine which zoom level to read
for ii, decim in enumerate(levels):
if max(width // decim, height // decim) < max_size:
break
else:
decim = 1
warnings.warn(
"Dataset has no overviews, reading the full dataset", NoOverviewWarning
)
out_shape = (len(src_dst.indexes), height // decim, width // decim)
arr = src_dst.read(out_shape=out_shape, masked=True)
arr[arr == 0] = np.ma.masked
params = {}
if histogram_bins:
params.update(dict(bins=histogram_bins))
if histogram_range:
params.update(dict(range=histogram_range))
stats = {
src_dst.indexes[b]: utils._stats(arr[b], percentiles=percentiles, **params)
for b in range(arr.shape[0])
}
# GRD GeoTIFFs are georeferenced with ground control points: src_dst.gcps
# returns a (gcp_list, gcp_crs) tuple, so the source transform is derived
# from the GCPs rather than indexed directly
src_crs = src_dst.gcps[1]
src_transform = transform.from_gcps(src_dst.gcps[0])
vrt_params = dict(src_crs=src_crs, crs=src_crs, src_transform=src_transform)
with WarpedVRT(src_dst, **vrt_params) as vrt_dst:
bounds = transform_bounds(
*[vrt_dst.crs, dst_crs] + list(vrt_dst.bounds), densify_pts=21
)
minzoom, maxzoom = get_zooms(vrt_dst)
return {
"bounds": {
"value": bounds,
"crs": dst_crs.to_string() if isinstance(dst_crs, CRS) else dst_crs,
},
"minzoom": minzoom,
"maxzoom": maxzoom,
"statistics": stats,
}
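# A minimal usage sketch for _sentinel_stats, assuming GDAL/rasterio is
# configured for requester-pays reads (e.g. AWS_REQUEST_PAYER=requester) and
# using the measurement layout shown in _sentinel_parse_scene_id's docstring:
#
#   path = (
#       "s3://sentinel-s1-l1c/GRD/2018/7/16/IW/DV/"
#       "S1A_IW_GRDH_1SDV_20180716T004042_20180716T004107_022812_02792A_FD5B"
#       "/measurement/iw-vv.tiff"
#   )
#   _sentinel_stats(path, percentiles=(2, 98), histogram_bins=20)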
def metadata(sceneid, pmin=2, pmax=98, bands=None, **kwargs):
"""
Retrieve image bounds and band statistics.
Attributes
----------
sceneid : str
Sentinel-1 sceneid.
pmin : int, optional, (default: 2)
Histogram minimum cut.
pmax : int, optional, (default: 98)
Histogram maximum cut.
bands : tuple or str, required
Band names (e.g. "vv", "vh").
kwargs : optional
These are passed to 'rio_tiler.sentinel1._sentinel_stats'
e.g.: histogram_bins=20
Returns
-------
out : dict
Dictionary with image bounds and bands statistics.
"""
if not bands:
raise InvalidBandName("bands is required")
if not isinstance(bands, tuple):
bands = tuple((bands,))
for band in bands:
if band not in SENTINEL_BANDS:
raise InvalidBandName("{} is not a valid Sentinel band name".format(band))
scene_params = _sentinel_parse_scene_id(sceneid)
sentinel_address = "s3://{}/{}/measurement".format(
SENTINEL_BUCKET, scene_params["key"]
)
addresses = [
"{}/{}-{}.tiff".format(sentinel_address, scene_params["beam"].lower(), band)
for band in bands
]
_stats_worker = partial(
_sentinel_stats,
percentiles=(pmin, pmax),
**kwargs
)
with futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
responses = list(executor.map(_stats_worker, addresses))
info = {
"sceneid": sceneid,
"bounds": responses[0]["bounds"],
"minzoom": responses[0]["minzoom"],
"maxzoom": responses[0]["maxzoom"],
}
info["statistics"] = {
b: v
for b, d in zip(bands, responses)
for k, v in d["statistics"].items()
}
return info
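# Usage sketch for metadata(); extra kwargs are forwarded to _sentinel_stats
# (same access assumptions as above):
#
#   metadata(
#       "S1A_IW_GRDH_1SDV_20180716T004042_20180716T004107_022812_02792A_FD5B",
#       bands=("vv", "vh"),
#       histogram_bins=20,
#   )
#   # -> {"sceneid": ..., "bounds": ..., "minzoom": ..., "maxzoom": ...,
#   #     "statistics": {"vv": {...}, "vh": {...}}}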
def _tile_read(
src_dst,
bounds,
tilesize,
indexes=None,
nodata=None,
resampling_method="bilinear",
tile_edge_padding=2,
dst_crs=CRS({"init": "EPSG:3857"}),
bounds_crs=None,
):
"""
Read data and mask.
Attributes
----------
src_dst : rasterio.io.DatasetReader
rasterio.io.DatasetReader object
bounds : list
Output bounds (left, bottom, right, top) in target crs ("dst_crs").
tilesize : int
Output image size
indexes : list of ints or a single int, optional, (defaults: None)
If `indexes` is a list, the result is a 3D array, but is
a 2D array if it is a band index number.
nodata: int or float, optional (defaults: None)
resampling_method : str, optional (default: "bilinear")
Resampling algorithm.
tile_edge_padding : int, optional (default: 2)
Padding to apply to each edge of the tile when retrieving data
to assist in reducing resampling artefacts along edges.
dst_crs: CRS or str, optional
Target coordinate reference system (default "epsg:3857").
bounds_crs: CRS or str, optional
Overwrite bounds coordinate reference system (default None, equal to dst_crs).
Returns
-------
out : array, int
returns pixel value.
"""
if isinstance(indexes, int):
indexes = [indexes]
elif isinstance(indexes, tuple):
indexes = list(indexes)
if not bounds_crs:
bounds_crs = dst_crs
bounds = transform_bounds(*[bounds_crs, dst_crs] + list(bounds), densify_pts=21)
src_crs = src_dst.gcps[1]
src_transform = transform.from_gcps(src_dst.gcps[0])  # affine approximation derived from the GCPs
with WarpedVRT(src_dst, src_crs=src_crs, crs=src_crs, src_transform=src_transform) as vrt_dst:
vrt_transform, vrt_width, vrt_height = utils.get_vrt_transform(
vrt_dst, bounds, dst_crs=dst_crs
)
out_window = windows.Window(
col_off=0, row_off=0, width=vrt_width, height=vrt_height
)
if tile_edge_padding > 0 and not utils._requested_tile_aligned_with_internal_tile(
vrt_dst, bounds, tilesize
):
vrt_transform = vrt_transform * Affine.translation(
-tile_edge_padding, -tile_edge_padding
)
orig_vrt_height = vrt_height
orig_vrt_width = vrt_width
vrt_height = vrt_height + 2 * tile_edge_padding
vrt_width = vrt_width + 2 * tile_edge_padding
out_window = windows.Window(
col_off=tile_edge_padding,
row_off=tile_edge_padding,
width=orig_vrt_width,
height=orig_vrt_height,
)
vrt_params = dict(
add_alpha=True,
crs=dst_crs,
resampling=Resampling[resampling_method],
transform=vrt_transform,
width=vrt_width,
height=vrt_height
)
indexes = indexes if indexes is not None else src_dst.indexes
out_shape = (len(indexes), tilesize, tilesize)
nodata = nodata if nodata is not None else src_dst.nodata
if nodata is not None:
vrt_params.update(dict(nodata=nodata, add_alpha=False, src_nodata=nodata))
with WarpedVRT(src_dst, src_crs=src_crs, src_transform=src_transform, **vrt_params) as vrt_dst:
data = vrt_dst.read(
out_shape=out_shape,
indexes=indexes,
window=out_window,
resampling=Resampling[resampling_method],
)
mask = vrt_dst.dataset_mask(out_shape=(tilesize, tilesize), window=out_window)
return data, mask
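# _tile_read is a GCP-aware variant of the generic tile reader in
# rio_tiler.utils: Sentinel-1 GRD GeoTIFFs carry ground control points instead
# of a geotransform, so the WarpedVRT is built from src_dst.gcps before the
# requested mercator-tile window is read.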
def tile(sceneid, tile_x, tile_y, tile_z, bands=None, tilesize=256, **kwargs):
"""
Create mercator tile from Sentinel-1 data.
Attributes
----------
sceneid : str
Sentinel-1 sceneid.
tile_x : int
Mercator tile X index.
tile_y : int
Mercator tile Y index.
tile_z : int
Mercator tile ZOOM level.
bands : tuple, str, required
Band names (e.g. "vv", "vh").
tilesize : int, optional (default: 256)
Output image size.
Returns
-------
data : numpy ndarray
mask: numpy array
"""
if not bands:
raise InvalidBandName("bands is required")
if not isinstance(bands, tuple):
bands = tuple((bands,))
for band in bands:
if band not in SENTINEL_BANDS:
raise InvalidBandName("{} is not a valid Sentinel band name".format(band))
scene_params = _sentinel_parse_scene_id(sceneid)
wgs_bounds = _get_bounds(scene_params)
if not utils.tile_exists(wgs_bounds, tile_z, tile_x, tile_y):
raise TileOutsideBounds(
"Tile {}/{}/{} is outside image bounds".format(tile_z, tile_x, tile_y)
)
mercator_tile = mercantile.Tile(x=tile_x, y=tile_y, z=tile_z)
tile_bounds = mercantile.xy_bounds(mercator_tile)
sentinel_address = "s3://{}/{}/measurement".format(
SENTINEL_BUCKET, scene_params["key"]
)
addresses = [
"{}/{}-{}.tiff".format(sentinel_address, scene_params["beam"].lower(), band)
for band in bands
]
def _s1_tiler(src_path):
with rasterio.open(src_path) as src_dst:
return _tile_read(
src_dst, bounds=tile_bounds, tilesize=tilesize, nodata=0
)
with futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
data, masks = zip(*list(executor.map(_s1_tiler, addresses)))
mask = np.all(masks, axis=0).astype(np.uint8) * 255
return np.concatenate(data), mask
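# Minimal end-to-end sketch (not part of the module API): renders one mercator
# tile for the scene used in the docstrings. Assumes AWS credentials with
# requester-pays access to s3://sentinel-s1-l1c.
if __name__ == "__main__":
    scene = "S1A_IW_GRDH_1SDV_20180716T004042_20180716T004107_022812_02792A_FD5B"
    info = bounds(scene)
    print(info)
    # pick the tile at the centre of the footprint so it intersects the scene
    w, s, e, n = info["bounds"]
    t = mercantile.tile((w + e) / 2, (s + n) / 2, 10)
    data, mask = tile(scene, t.x, t.y, t.z, bands=("vv", "vh"))
    print(data.shape, mask.shape)  # (2, 256, 256) (256, 256)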