@betolink
Created September 22, 2023 02:01
earthaccess-h5coro-coiled
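
The gist contains two scripts. The first, read_atl10.py (the module the driver imports), uses h5coro to read the strong-beam freeboard datasets from a set of ATL10 granules and consolidates them into a single GeoDataFrame. The second is the driver: it searches for granules with earthaccess and runs the reader either locally over HTTPS or on AWS (us-west-2) through a Coiled serverless function.

read_atl10.py: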
#!/usr/bin/env python
import coiled
import geopandas as gpd
import numpy as np
import pandas as pd
from rich import print as rprint
from itertools import product
from pqdm.threads import pqdm
import earthaccess
from h5coro import s3driver, webdriver
import h5coro


def get_strong_beams(f):
    """Returns the ground tracks for the strong beams based on the ICESat-2 orientation"""
    # sc_orient: 0 = backward (strong beams on the left side),
    #            1 = forward (strong beams on the right side)
    orient = f['orbit_info/sc_orient'][0]
    if orient == 0:
        return [f"gt{i}l" for i in [1, 2, 3]]
    elif orient == 1:
        return [f"gt{i}r" for i in [1, 2, 3]]
    else:
        raise KeyError("Spacecraft orientation neither forward nor backward")


def read_atl10(files, bounding_box=None, executors=4, environment="local", credentials=None):
    """Returns a consolidated GeoPandas dataframe for a set of ATL10 file pointers.

    Parameters:
        files (list[str]): granule URLs (local) or S3 paths (cloud) for ATL10, as returned
            by earthaccess data_links()
        bounding_box (str): comma-separated "min_lon,min_lat,max_lon,max_lat" used to clip the tracks
        executors (int): number of threads
        environment (str): "local" to read over HTTPS, anything else for direct S3 access
        credentials: an Earthdata bearer token (local) or a dict of AWS credentials (cloud)
    """
    if environment == "local":
        driver = webdriver.HTTPDriver
    else:
        driver = s3driver.S3Driver

    GPS_EPOCH = pd.to_datetime('1980-01-06 00:00:00')

    def read_h5coro(file):
        """Reads the datasets required for creating gridded freeboard from a single ATL10 file.

        file: a granule URL or S3 path (see read_atl10)
        returns: a geopandas dataframe with the strong-beam tracks concatenated
        """
        # Open file object
        h5 = h5coro.H5Coro(file, driver, credentials=credentials)
        # Get strong beams based on orientation
        ancillary_datasets = ["orbit_info/sc_orient", "ancillary_data/atlas_sdp_gps_epoch"]
        f = h5.readDatasets(datasets=ancillary_datasets, block=True)
        strong_beams = get_strong_beams(f)
        atlas_sdp_gps_epoch = f["ancillary_data/atlas_sdp_gps_epoch"][:]
        # Create the list of datasets to load
        datasets = ["freeboard_segment/latitude",
                    "freeboard_segment/longitude",
                    "freeboard_segment/delta_time",
                    "freeboard_segment/seg_dist_x",
                    "freeboard_segment/heights/height_segment_length_seg",
                    "freeboard_segment/beam_fb_height",
                    "freeboard_segment/heights/height_segment_type"]
        ds_list = ["/".join(p) for p in product(strong_beams, datasets)]
        # Load the datasets
        f = h5.readDatasets(datasets=ds_list, block=True)
        # rprint(f["gt2l/freeboard_segment/latitude"], type(f["gt2l/freeboard_segment/latitude"]))

        # Create a list of geopandas.DataFrames, one per strong beam
        tracks = []
        for beam in strong_beams:
            ds = {dataset.split("/")[-1]: f[dataset][:] for dataset in ds_list if dataset.startswith(beam)}
            # Convert delta_time to datetime
            ds["delta_time"] = GPS_EPOCH + pd.to_timedelta(ds["delta_time"] + atlas_sdp_gps_epoch, unit='s')
            # We don't need nanoseconds to grid daily, let alone weekly
            ds["delta_time"] = ds["delta_time"].astype('datetime64[s]')
            # Add the beam identifier
            ds["beam"] = beam
            # Set fill values to NaN, assuming 100 m as the threshold
            ds["beam_fb_height"] = np.where(ds["beam_fb_height"] > 100, np.nan, ds["beam_fb_height"])

            geometry = gpd.points_from_xy(ds["longitude"], ds["latitude"])
            del ds["longitude"]
            del ds["latitude"]

            gdf = gpd.GeoDataFrame(ds, geometry=geometry, crs="EPSG:4326")
            gdf.dropna(axis=0, inplace=True)
            if bounding_box is not None:
                bbox = [float(coord) for coord in bounding_box.split(",")]
                gdf = gdf.cx[bbox[0]:bbox[2], bbox[1]:bbox[3]]
            tracks.append(gdf)

        df = pd.concat(tracks)
        return df

    dfs = pqdm(files, read_h5coro, n_jobs=executors)
    combined = pd.concat(dfs)
    return combined
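
The driver script below does the search with earthaccess and calls read_atl10 either locally or inside a Coiled function. A hypothetical invocation (the driver's file name is not shown in the gist, so search_atl10.py is assumed here) could be:

    python search_atl10.py --bbox="-180,-78,-160,-74" --year=2019 --env=cloud --out=atl10_ross_sea

The bounding box is the Ross Sea extent from the commented-out example inside the script; the year and output name are illustrative.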
#!/usr/bin/env python
import coiled
import geopandas as gpd
import numpy as np
import pandas as pd
from rich import print as rprint
from itertools import product
import argparse
import earthaccess
from h5coro import h5coro, s3driver
from read_atl10 import read_atl10
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--bbox', help='bounding box as "min_lon,min_lat,max_lon,max_lat"')
    parser.add_argument('--year', help='year to process')
    parser.add_argument('--env', help='execute in the cloud or locally, default: local')
    parser.add_argument('--out', help='output file name')
    args = parser.parse_args()

    auth = earthaccess.login()

    # ross_sea = (-180, -78, -160, -74)
    # antarctic = (-180, -90, 180, -60)
    year = int(args.year)
    bbox = tuple([float(c) for c in args.bbox.split(",")])
print(f"Searching ATL10 data for year {year} ...")
granules = earthaccess.search_data(
short_name = 'ATL10',
version = '006',
cloud_hosted = True,
bounding_box = bbox,
temporal = (f'{args.year}-06-01',f'{args.year}-09-30'),
count=4,
debug=True
)

    if args.env == "local":
        rprint("executing locally")
        files = [g.data_links(access="out_of_region")[0] for g in granules]
        credentials = earthaccess.__auth__.token["access_token"]
        df = read_atl10(files, bounding_box=args.bbox, environment="local", credentials=credentials)
    else:
        files = [g.data_links(access="direct")[0].replace("s3://", "") for g in granules]
        aws_credentials = earthaccess.get_s3_credentials("NSIDC")
        credentials = {
            "aws_access_key_id": aws_credentials["accessKeyId"],
            "aws_secret_access_key": aws_credentials["secretAccessKey"],
            "aws_session_token": aws_credentials["sessionToken"]
        }

        @coiled.function(region="us-west-2",
                         memory="4 GB",
                         keepalive="1 HOUR")
        def cloud_runner(files, bounding_box, credentials):
            df = read_atl10(files, bounding_box=bounding_box, environment="cloud", credentials=credentials)
            return df

        df = cloud_runner(files, args.bbox, credentials=credentials)

    df.to_parquet(f"{args.out}.parquet")
    rprint(df)
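
Since read_atl10 returns a GeoDataFrame, to_parquet writes a GeoParquet file that can be loaded back with geopandas for gridding or plotting. A minimal sketch, assuming the illustrative output name used above:

    import geopandas as gpd

    gdf = gpd.read_parquet("atl10_ross_sea.parquet")
    print(gdf.head())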