Skip to content

Instantly share code, notes, and snippets.

@sdtaylor
Created May 25, 2022 17:55
Show Gist options
  • Save sdtaylor/067ede6a060ddbde86df1f19af2ec9d6 to your computer and use it in GitHub Desktop.
Save sdtaylor/067ede6a060ddbde86df1f19af2ec9d6 to your computer and use it in GitHub Desktop.
eolearn_example
# Built-in modules
import os
import datetime
import itertools
from glob import glob
from aenum import MultiValueEnum
# Basics of Python data handling and visualization
import numpy as np
np.random.seed(42)
import geopandas as gpd
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap, BoundaryNorm
from shapely.geometry import Polygon, box
from tqdm.auto import tqdm
import bottleneck as bn
# Machine learning
import lightgbm as lgb
import joblib
from sklearn import metrics
from sklearn import preprocessing
# Imports from eo-learn and sentinelhub-py
from eolearn.core import (
EOTask,
EOPatch,
EOWorkflow,
linearly_connect_tasks,
FeatureType,
OverwritePermission,
LoadTask,
SaveTask,
EOExecutor,
MergeFeatureTask,
)
from eolearn.io import SentinelHubInputTask, VectorImportTask, ExportToTiffTask
from eolearn.geometry import VectorToRasterTask, ErosionTask
from eolearn.features import LinearInterpolationTask, SimpleFilterTask, NormalizedDifferenceIndexTask
from eolearn.ml_tools import FractionSamplingTask
from sentinelhub import UtmZoneSplitter, DataCollection
# Example from https://eo-learn.readthedocs.io/en/latest/examples/land-cover-map/SI_LULC_pipeline.html
#--------------------------------------------
# Folder where data for running the notebook is stored
DATA_FOLDER = os.path.join(".", "data")
# Locations for collected data and intermediate results
EOPATCH_FOLDER = os.path.join(DATA_FOLDER, "eopatches")
EOPATCH_TIF_FOLDER = os.path.join(DATA_FOLDER, "eopatches_tif")
EOPATCH_SAMPLES_FOLDER = os.path.join(DATA_FOLDER, "eopatches_sampled")
RESULTS_FOLDER = os.path.join(DATA_FOLDER, "results")
for folder in (EOPATCH_FOLDER, EOPATCH_TIF_FOLDER, EOPATCH_SAMPLES_FOLDER, RESULTS_FOLDER):
os.makedirs(folder, exist_ok=True)
#--------------------------------------------
class SentinelHubValidDataTask(EOTask):
"""
Combine Sen2Cor's cloud probability map with `IS_DATA` to define a `VALID_DATA_SH` mask
The SentinelHub's cloud mask is asumed to be found in eopatch.mask['CLM']
"""
def __init__(self, output_feature, max_pixel_cloud_probability=0.05):
self.output_feature = output_feature
self.max_pixel_cloud_probability = max_pixel_cloud_probability
def execute(self, eopatch):
clp = eopatch.data['CLP'] / 255
#eopatch[self.output_feature] = eopatch.mask["IS_DATA"].astype(bool) & (~eopatch.mask["CLM"].astype(bool))
eopatch[self.output_feature] = eopatch.mask["IS_DATA"].astype(bool) & (clp<=self.max_pixel_cloud_probability)
return eopatch
class AddValidCountTask(EOTask):
"""
The task counts number of valid observations in time-series and stores the results in the timeless mask.
"""
def __init__(self, count_what, feature_name):
self.what = count_what
self.name = feature_name
def execute(self, eopatch):
eopatch[FeatureType.MASK_TIMELESS, self.name] = np.count_nonzero(eopatch.mask[self.what], axis=0)
return eopatch
class AddCloudFreeAllBandMosaic(EOTask):
"""
Across all BANDS, get the most recent cloud free/valid data pixel.
Based off SentinelHubValidDataTask which creates the IS_VALID mask.
Uses the bottleneck.push function to fill nan values
"""
def __init__(self, feature_name):
self.name = feature_name
def execute(self, eopatch):
# broadcast valid pixels across all bands
all_band_valid_pixels = np.broadcast_to(eopatch.mask['IS_VALID'], eopatch.data['BANDS'].shape)
band_data = eopatch.data['BANDS'].copy()
band_data[~all_band_valid_pixels] = np.nan
# Cloud free mosaic from multiple image dates.
# cloud/nodata pixels are nan. bn.push will fill nans with
# prior values (working from index=0 forward along axis 0)
# band_data shape here is (time, height, width, band), so after pushing
# non-nan values forward along axis 0, the final timestep should have
# mostly valid pixels, or potentially nans if *every* date was cloudy.
band_data = bn.push(band_data, axis=0)
eopatch[FeatureType.DATA_TIMELESS, self.name] = band_data[-1]
return eopatch
#--------------------------------------------
# square regions around mato groso
region_shape = box(minx=-56.4, miny=-13.4, maxx=-53.1, maxy=-11.0)
region_shape_crs = 'EPSG:4326'
# Create a splitter to obtain a list of bboxes with 5km sides
bbox_splitter = UtmZoneSplitter([region_shape], region_shape_crs, 5000)
bbox_list = np.array(bbox_splitter.get_bbox_list())
info_list = np.array(bbox_splitter.get_info_list())
# Prepare info of selected EOPatches
geometry = [bbox.transform(crs=region_shape_crs).geometry for bbox in bbox_list]
idxs = [info["index"] for info in info_list]
idxs_x = [info["index_x"] for info in info_list]
idxs_y = [info["index_y"] for info in info_list]
bbox_gdf = gpd.GeoDataFrame({"index": idxs, "index_x": idxs_x, "index_y": idxs_y}, crs=region_shape_crs, geometry=geometry)
bbox_gdf.to_file('./patches_shapes.geojson',driver='GeoJSON')
# a 2x2 patch selected by hand
patch_IDs = [1606, 1607, 1660, 1661]
#--------------------------------------------
# BAND DATA
# Add a request for S2 bands.
# Here we also do a simple filter of cloudy scenes (on tile level).
# The s2cloudless masks and probabilities are requested via additional data.
band_names = ["B03", "B04", "B08"]
add_data = SentinelHubInputTask(
bands_feature=(FeatureType.DATA, "BANDS"),
bands=band_names,
resolution=10,
maxcc=0.8,
time_difference=datetime.timedelta(minutes=120),
data_collection=DataCollection.SENTINEL2_L1C,
additional_data=[(FeatureType.MASK, "dataMask", "IS_DATA"), (FeatureType.MASK, "CLM"), (FeatureType.DATA, "CLP")],
max_threads=5,
)
# CALCULATING NEW FEATURES
# NDVI: (B08 - B04)/(B08 + B04)
# NDWI: (B03 - B08)/(B03 + B08)
# NDBI: (B11 - B08)/(B11 + B08)
ndvi = NormalizedDifferenceIndexTask(
(FeatureType.DATA, "BANDS"), (FeatureType.DATA, "NDVI"), [band_names.index("B08"), band_names.index("B04")]
)
# VALIDITY MASK
# Validate pixels using SentinelHub's cloud detection mask and region of acquisition
add_sh_validmask = SentinelHubValidDataTask(output_feature = (FeatureType.MASK, "IS_VALID"),
max_pixel_cloud_probability = 0.05)
# COUNTING VALID PIXELS
# Count the number of valid observations per pixel using valid data mask
add_mosaic = AddCloudFreeAllBandMosaic("BANDS_MOSAIC")
# SAVING TO OUTPUT (if needed)
save = SaveTask(EOPATCH_FOLDER, overwrite_permission=OverwritePermission.OVERWRITE_PATCH)
# Define the workflow
workflow_nodes = linearly_connect_tasks(
add_data, ndvi, add_sh_validmask, add_mosaic, save
)
workflow = EOWorkflow(workflow_nodes)
# Let's visualize it
#workflow.dependency_graph()
# Time interval for the SH request
time_interval = ["2019-05-01", "2019-05-31"]
# Define additional parameters of the workflow
input_node = workflow_nodes[0]
save_node = workflow_nodes[-1]
execution_args = []
for idx, bbox in enumerate(bbox_list[patch_IDs]):
execution_args.append(
{
input_node: {"bbox": bbox, "time_interval": time_interval},
save_node: {"eopatch_folder": f"eopatch_{idx}"},
}
)
# Execute the workflow
executor = EOExecutor(workflow, execution_args, save_logs=True)
executor.run(workers=4)
executor.make_report()
failed_ids = executor.get_failed_executions()
if failed_ids:
raise RuntimeError(
f"Execution failed EOPatches with IDs:\n{failed_ids}\n"
f"For more info check report at {executor.get_report_path()}"
)
#------------------------------------------------
save_tif = ExportToTiffTask(feature = (FeatureType.DATA_TIMELESS, 'BANDS_MOSAIC'),
folder = EOPATCH_TIF_FOLDER)
patch_folders = glob(EOPATCH_FOLDER + '/eopatch_*')
for patch_i, patch_f in enumerate(patch_folders):
patch = EOPatch.load(patch_f)
tif_filename = 'patch_{}.tif'.format(patch_i)
save_tif.execute(patch, filename=tif_filename)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment