Last active
January 24, 2024 13:45
-
-
Save jgomezdans/c245d3f3c98243a8b5ce422a5ca1c148 to your computer and use it in GitHub Desktop.
Sentinel1 from Planetary Computer: Angle of incidence
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from io import StringIO | |
from typing import Optional, List | |
import numpy as np | |
import xml.etree.ElementTree as ET | |
import pandas as pd | |
import requests | |
import xarray as xr | |
from scipy.interpolate import griddata | |
import pyproj | |
import stackstac | |
class CatalogItem:
    # Placeholder for the real STAC item type (e.g. pystac.Item), used
    # here only for type annotations. The real object is expected to
    # expose at least an ``id`` string and an ``assets`` mapping whose
    # values carry an ``href`` attribute — TODO confirm against caller.
    pass
def get_schema_product_urls(item: CatalogItem, catalog) -> List[str]:
    """
    Return the product-schema XML URLs (VV first, then VH) for an item.

    Works around STAC objects whose assets point at the RFI metadata
    files rather than the main annotation files.

    Parameters:
    - item (CatalogItem): An object representing a catalog item.
    - catalog: A STAC catalogue.

    Returns:
    - List[str]: A list of URL strings, one per polarization.

    Raises:
    - ValueError: If the matching GRD item cannot be found.
    """
    # The RTC item id maps onto its GRD counterpart once the suffix goes.
    grd_id = item.id.replace("_rtc", "")
    grd_item = catalog.get_collection("sentinel-1-grd").get_item(grd_id)
    if grd_item is None:
        raise ValueError("Catalog item not found.")
    urls = []
    for pol in ("vv", "vh"):
        href = grd_item.assets[f"schema-product-{pol}"].href
        # Redirect RFI annotation paths to the main annotation file;
        # str.replace is a no-op when the RFI path is not present.
        urls.append(href.replace(f"/rfi/rfi-iw-{pol}.xml", f"/iw-{pol}.xml"))
    return urls
def _float_or_none(text: Optional[str]) -> Optional[float]:
    # Grid values arrive as XML text; downstream interpolation needs
    # numbers, so convert here (missing/empty elements become None).
    if text is None or not text.strip():
        return None
    return float(text)


def extract_geolocation_data(xml_source: str) -> Optional[pd.DataFrame]:
    """
    Extracts geolocation data from an XML source (file or URL) and returns it
    as a pandas DataFrame.

    This function looks for 'geolocationGridPoint' elements in the XML source
    and extracts the 'latitude', 'longitude', 'incidenceAngle', and
    'elevationAngle' from each element. Values are converted to float
    (previously they were left as raw strings, which broke the numeric
    interpolation downstream).

    Parameters:
    - xml_source (str): The path to the XML file or a URL to the XML content.

    Returns:
    - pd.DataFrame: A DataFrame containing the extracted data, or None if no
      grid points are found.

    Raises:
    - FileNotFoundError: If the specified file does not exist and the source
      is not a valid URL.
    - ET.ParseError: If there is an error in parsing the XML content.
    - requests.RequestException: If downloading from the URL fails.
    """
    try:
        # URLs are fetched into memory; anything else is treated as a path.
        if xml_source.startswith(("http://", "https://")):
            response = requests.get(xml_source)
            response.raise_for_status()  # Raises HTTPError for bad requests
            tree = ET.parse(StringIO(response.text))
        else:
            tree = ET.parse(xml_source)
        root = tree.getroot()
        grid_points = root.findall(".//geolocationGridPoint")
        # Sanity check: ensure that geolocationGridPoint elements exist.
        if not grid_points:
            print("No geolocationGridPoint elements found in the XML source.")
            return None
        fields = ("latitude", "longitude", "incidenceAngle", "elevationAngle")
        data = [
            {name: _float_or_none(point.findtext(name)) for name in fields}
            for point in grid_points
        ]
        return pd.DataFrame(data)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"File not found: {xml_source}") from e
    except ET.ParseError as e:
        raise ET.ParseError(
            f"Error parsing XML content from source: {xml_source}"
        ) from e
    except requests.RequestException as e:
        raise requests.RequestException(
            f"Error downloading content from URL: {xml_source}"
        ) from e
def interpolate_to_raster(
    data_array: xr.DataArray,
    df: pd.DataFrame,
    dst_crs: str,
    src_crs: str = "EPSG:4326",
    field: str = "incidenceAngle",
) -> np.ndarray:
    """
    Perform bilinear interpolation on a DataFrame with lat/long coordinates
    to create a 2D raster matching the shape of an xarray DataArray with
    an arbitrary geographical projection.

    :param data_array: xarray DataArray whose ``x``/``y`` coords define the
        target grid, with a known CRS.
    :param df: DataFrame containing 'longitude', 'latitude' and ``field``
        columns.
    :param dst_crs: Destination CRS of the DataArray.
    :param src_crs: Source CRS in PROJ format (e.g., 'EPSG:4326' for lat/long).
    :param field: Name of the DataFrame column to interpolate.
    :return: 2D numpy array with interpolated values (NaN outside the convex
        hull of the input points, per griddata's 'linear' behaviour).
    """
    # Initialize coordinate transformation (always_xy: lon/lat ordering).
    transformer = pyproj.Transformer.from_crs(src_crs, dst_crs, always_xy=True)
    # Coerce to float up front: the geolocation grid may arrive as strings.
    lons = np.asarray(df["longitude"], dtype=float)
    lats = np.asarray(df["latitude"], dtype=float)
    # pyproj transforms whole arrays in one call -- no per-point Python loop.
    xs, ys = transformer.transform(lons, lats)
    points = np.column_stack([xs, ys])
    values = np.asarray(df[field], dtype=float)
    grid_x, grid_y = np.meshgrid(data_array.coords["x"], data_array.coords["y"])
    # Perform bilinear interpolation onto the target grid.
    return griddata(points, values, (grid_x, grid_y), method="linear")
def convert_array_stack_to_xarray(
    ds: xr.Dataset, new_band1: xr.DataArray, new_band2: xr.DataArray
) -> xr.Dataset:
    """
    Converts two (time, y, x) arrays to xarray DataArrays and integrates
    them into an existing xarray Dataset along the ``band`` dimension.

    Parameters:
    - ds (xr.Dataset): The existing xarray Dataset.
    - new_band1 (xr.DataArray): The VV incidence-angle stack (time, y, x).
    - new_band2 (xr.DataArray): The VH incidence-angle stack (time, y, x).

    Returns:
    - xr.Dataset: The updated Dataset with bands 'incidenceAngleVV' and
      'incidenceAngleVH' added.
    """
    # Extract the non-band specific coordinates from existing data
    non_band_coords = {
        k: v for k, v in ds.coords.items() if "band" not in v.dims
    }
    # Convert new_band1 and new_band2 to xarray DataArrays with
    # dimensions and coordinates
    new_band1_da = xr.DataArray(
        new_band1, dims=["time", "y", "x"], coords=non_band_coords
    )
    new_band2_da = xr.DataArray(
        new_band2, dims=["time", "y", "x"], coords=non_band_coords
    )
    # Assign metadata to new DataArrays
    for k in ["title", "description"]:
        new_band1_da.coords[k] = "Incidence angle [deg] VV"
        new_band2_da.coords[k] = "Incidence angle [deg] VH"
    # Add a new band dimension to these DataArrays and update the dataset.
    # BUG FIX: the band labels were swapped relative to the titles above
    # (band 1 carries the VV data, band 2 the VH data).
    new_band1_da = new_band1_da.expand_dims({"band": [2]})
    new_band1_da.coords["band"] = ["incidenceAngleVV"]
    new_band2_da = new_band2_da.expand_dims({"band": [3]})
    new_band2_da.coords["band"] = ["incidenceAngleVH"]
    # Combine the original and new DataArrays into a single dataset
    return xr.concat([ds, new_band1_da, new_band2_da], dim="band")
def process_angles(catalog, items: List[CatalogItem], ds: xr.Dataset) -> xr.Dataset:
    """
    Compute per-scene incidence-angle rasters for a list of catalog items
    and merge them into an xarray Dataset.

    Parameters:
    - catalog: A STAC catalog object.
    - items (List[CatalogItem]): A list of catalog items to process.
    - ds (xr.Dataset): The initial xarray Dataset.

    Returns:
    - xr.Dataset: The updated xarray Dataset with processed angle data.
    """
    rasters = {"vv": [], "vh": []}
    missing_metadata = []
    for idx, item in enumerate(items):
        try:
            urls = get_schema_product_urls(item, catalog)
        except ValueError:
            # Scene has no locatable GRD metadata: remember it and move on.
            print("No metadata found for item.")
            missing_metadata.append(idx)
            continue
        for polarization, url in zip(["vv", "vh"], urls):
            df = extract_geolocation_data(url)
            rasters[polarization].append(
                interpolate_to_raster(ds, df, dst_crs=ds.crs)
            )
    # Keep only the items for which metadata was actually found.
    valid_items = [
        item for idx, item in enumerate(items) if idx not in missing_metadata
    ]
    raster_vv = np.array(rasters["vv"])
    raster_vh = np.array(rasters["vh"])
    # Restack the valid items onto the same grid, then attach the angles.
    ds_new = stackstac.stack(
        valid_items, bounds_latlon=ds.attrs["bounds_latlon"], epsg=ds.crs
    )
    combined_array = convert_array_stack_to_xarray(
        ds_new, raster_vv, raster_vh
    )
    return combined_array
This file has been truncated, but you can view the full file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from io import StringIO | |
from typing import Optional, List | |
import numpy as np | |
import xml.etree.ElementTree as ET | |
import pandas as pd | |
import requests | |
import xarray as xr | |
from scipy.interpolate import griddata | |
import pyproj | |
import stackstac | |
class CatalogItem:
    # Placeholder for the real STAC item type (e.g. pystac.Item), used
    # here only for type annotations. The real object is expected to
    # expose at least an ``id`` string and an ``assets`` mapping whose
    # values carry an ``href`` attribute — TODO confirm against caller.
    pass
def get_schema_product_urls(item: CatalogItem, catalog) -> List[str]:
    """
    Extracts and returns a list of URLs for a given catalog item.
    This function addresses issues with STAC objects pointing to incorrect
    metadata files.
    Parameters:
    - item (CatalogItem): An object representing a catalog item.
    - catalog: A STAC catalogue.
    Returns:
    - List[str]: A list of URL strings (VV first, then VH).
    Raises:
    - ValueError: If the matching GRD item is not found in the catalog.
    """
    # The RTC item id maps onto its GRD counterpart once "_rtc" is dropped.
    grd_item = catalog.get_collection("sentinel-1-grd").get_item(
        item.id.replace("_rtc", "")
    )
    if grd_item is None:
        raise ValueError("Catalog item not found.")
    urls = []
    for polarization in ["vv", "vh"]:
        href = grd_item.assets[f"schema-product-{polarization}"].href
        rfi_path = f"/rfi/rfi-iw-{polarization}.xml"
        # Point RFI annotation hrefs back at the main annotation file.
        clean_url = (
            href.replace(rfi_path, f"/iw-{polarization}.xml")
            if rfi_path in href
            else href
        )
        urls.append(clean_url)
    return urls
def extract_geolocation_data(xml_source: str) -> Optional[pd.DataFrame]: | |
""" | |
Extracts geolocation data from an XML source (file or |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment