Last active
March 12, 2024 21:25
-
-
Save sixy6e/3a0d857de5185a1736d6defb1fc01218 to your computer and use it in GitHub Desktop.
density-ingest
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sample code for tiledb data ingestion to be used as input into cellular density calculations.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ausseabed-pl019-ingested-data/L2/0331_NETasmania_Bergersen_10G/ga_ausseabed_419ae2f5-48c1-4d81-bd1f-18bf80e658cd_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/0337_NETasmania_Challenge072012/ga_ausseabed_db85ad19-235e-4ec9-a0c7-d5dbc4cb635b_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2015_T01-em122/ga_ausseabed_0a505d7e-8e6d-4dfc-9560-8d0833508f29_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2015_T01-em710/ga_ausseabed_25625f0b-ea69-46f5-bceb-edfc6791374e_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2015_T02-em122/ga_ausseabed_ac199f29-20eb-4e9c-956f-c562e069f3c1_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2016_E02-em122/ga_ausseabed_24b4168e-05ae-4061-abb1-da0b6876305b_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2016_T02-em122/ga_ausseabed_cfcffeff-abec-4736-b380-a19c189ef52f_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2016_T02-em710/ga_ausseabed_83ebeb43-4a11-4c85-9596-e2e2867b53f7_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2017_V03-em122/ga_ausseabed_38e02f71-29c5-454c-9788-6c45d06d21bd_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2017_V03-em710/ga_ausseabed_b3d4643d-6b32-4e99-94bf-b5eacd26ed99_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2018_T01-em122/ga_ausseabed_89348d5b-19f0-4b1b-b946-90cdfe73bdfd_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2018_T02-em122/ga_ausseabed_06470ef3-7a88-4097-89a1-ee3cea817241_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2018_T02-em710/ga_ausseabed_e229a7ca-d65b-4bb7-9bd8-22274f07890f_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2018_V04-em122/ga_ausseabed_2ee400ee-d8ce-4fda-a15c-e494d0facb9a_stac-metadata.geojson
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import numpy | |
import tiledb | |
import boto3 | |
import click | |
import structlog | |
from shapely import geometry | |
# Structured JSON logging: ISO timestamps, sorted keys for stable output.
structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="ISO"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(sort_keys=True),
    ],
)
LOG = structlog.get_logger()

# Region of interest: a closed lon/lat box off north-east Tasmania.
# Presumably EPSG:4326 — the companion plotting script uses crs="epsg:4326".
ROI = geometry.shape(
    {
        "coordinates": (
            (
                (148.526, -40.153),
                (149.483, -40.153),
                (149.483, -40.716),
                (148.526, -40.716),
                (148.526, -40.153),
            ),
        ),
        "type": "Polygon",
    }
)
# Text file listing one STAC-metadata S3 key per line.
STAC_LISTING = "datasets.txt"
# Destination sparse TileDB array for the merged ROI point cloud.
OUT_URI = "s3://ardc-gmrt-test-data/density-mapping/gmrt-roi-point-cloud.tiledb"
def read_stac_doc(uri, ctx):
    """Load a STAC metadata document (JSON) from *uri* via the TileDB VFS."""
    vfs = tiledb.vfs.VFS(ctx=ctx)
    with vfs.open(uri) as stream:
        return json.load(stream)
def context(profile):
    """Build a TileDB context using credentials from the given AWS profile.

    The "pl019-data" profile gets keys only; any other profile also gets
    the Sydney region and a session token.
    """
    creds = boto3.Session(profile_name=profile).get_credentials()
    # NOTE: passing None through to tiledb.Config turns into the string
    # 'None', which is not a valid region — so region/token are only set
    # when actually known (see the commented history in the original).
    settings = {
        "vfs.s3.aws_access_key_id": creds.access_key,
        "vfs.s3.aws_secret_access_key": creds.secret_key,
    }
    if profile != "pl019-data":
        settings["vfs.s3.region"] = "ap-southeast-2"
        settings["vfs.s3.aws_session_token"] = creds.token
    return tiledb.Ctx(config=tiledb.Config(settings))
def lonlat_domain():
    """Set array lon/lat domain: two unbounded float64 dims, X and Y."""
    zstd = tiledb.FilterList([tiledb.ZstdFilter(level=16)])
    dims = [
        tiledb.Dim(
            name,
            domain=(None, None),
            tile=1000,
            dtype=numpy.float64,
            filters=zstd,
        )
        for name in ("X", "Y")
    ]
    return tiledb.Domain(*dims)
def xyz_schema(ctx=None):
    """Sparse point-cloud schema: (X, Y) domain with a float32 Z attribute."""
    z_attr = tiledb.Attr(
        "Z", dtype=numpy.float32, filters=[tiledb.ZstdFilter(level=16)]
    )
    return tiledb.ArraySchema(
        domain=lonlat_domain(),  # only 2 dims for the GMRT project
        sparse=True,
        attrs=[z_attr],
        cell_order="hilbert",
        tile_order="row-major",
        capacity=100_000,
        allows_duplicates=True,
        ctx=ctx,
    )
def create_mbes_array(array_uri, schema, ctx=None):
    """Create the TileDB array, replacing any existing array at *array_uri*."""
    tiledb.Array.create(array_uri, schema, ctx=ctx, overwrite=True)
def append_ping_dataframe(dataframe, array_uri, ctx=None, chunks=100000, timestamp=None):
    """
    Append the ping dataframe read from a GSF file.
    Only to be used with sparse arrays.

    Rows are written in chunks of *chunks* to bound memory use.
    """
    total_rows = dataframe.shape[0]
    for start in range(0, total_rows, chunks):
        chunk = dataframe[start : start + chunks]
        tiledb.dataframe_.from_pandas(
            array_uri,
            chunk,
            mode="append",
            sparse=True,
            ctx=ctx,
            timestamp=timestamp,
        )
@click.command()
@click.option("--profile-name", type=str, required=True, help="AWS profile to use for writing")
def main(profile_name):
    """
    Ingest the ROI subset of each survey's point cloud into one sparse
    TileDB array, writing each survey at its own TileDB timestamp.
    """
    # one STAC-metadata S3 key per line (see datasets.txt)
    with open(STAC_LISTING, "r") as src:
        stac_docs = [d.strip() for d in src.readlines()]
    # separate contexts: source data is read with the pl019-data profile,
    # the output array is written with the user-supplied profile
    pl019_ctx = context("pl019-data")
    ctx = context(profile_name)
    vfs = tiledb.vfs.VFS(ctx=ctx)
    if vfs.is_dir(OUT_URI):
        LOG.info("Removing previous array", array_uri=OUT_URI)
        vfs.remove_dir(OUT_URI)
    # need to remove the array if we already have one established
    LOG.info("Creating array", array_uri=OUT_URI)
    schema = xyz_schema(ctx)
    # NOTE(review): unlike the surrounding tiledb calls, this create passes
    # no ctx — presumably it falls back to default credentials; confirm
    # this is intended (create_mbes_array exists but is unused here).
    tiledb.Array.create(OUT_URI, schema)
    # tiledb.Array.create(OUT_URI, schema, overwrite=True)
    # create_mbes_array(OUT_URI, schema, ctx)
    minx, miny, maxx, maxy = ROI.bounds
    n_points = 0
    versioning_md = []  # per-survey provenance, stored as array metadata below
    for i, doc_name in enumerate(stac_docs):
        # timestamps start at 1; one TileDB timestamp per survey
        tstamp = i + 1
        stac_uri = f"s3://{doc_name}"
        LOG.info("Processing STAC document:", stac_uri=stac_uri, tiledb_timestamp=tstamp)
        md = read_stac_doc(stac_uri, pl019_ctx)
        array_uri = md["assets"]["bathymetry_data"]["href"]
        survey_points = 0
        with tiledb.open(array_uri, ctx=pl019_ctx) as ds:
            LOG.info("Querying points:", array_uri=array_uri)
            # incomplete-query iteration keeps memory bounded on large surveys
            query = ds.query(attrs=["Z"], return_incomplete=True)
            for df in query.df[minx:maxx, miny:maxy]:
                n_points += df.shape[0]
                survey_points += df.shape[0]
                append_ping_dataframe(df, OUT_URI, ctx=ctx, timestamp=tstamp)
        versioning_md.append(
            {
                "dataset_uri": stac_uri,
                "tiledb_timestamp": tstamp,
                "points_contributed": survey_points,
            }
        )
        LOG.info("Survey Ingested:", stac_uri=stac_uri, points_ingested=f"{survey_points:_}")
    # record the survey -> timestamp mapping on the array itself
    with tiledb.open(OUT_URI, "w", ctx=ctx) as outds:
        outds.meta["tiledb_timestamps"] = json.dumps(versioning_md)
    LOG.info("Finished point cloud ingestion", uri=OUT_URI, total_point=f"{n_points:_}")


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Plot each survey's footprint clipped to the region of interest."""
import geopandas
import json
import matplotlib.pyplot as plt
import s3fs
from shapely import geometry

# Closed lon/lat box off north-east Tasmania.
roi = geometry.shape(
    {
        "coordinates": (
            (
                (148.526, -40.153),
                (149.483, -40.153),
                (149.483, -40.716),
                (148.526, -40.716),
                (148.526, -40.153),
            ),
        ),
        "type": "Polygon",
    }
)

fs = s3fs.S3FileSystem(profile="pl019-data")

with open("datasets.txt") as src:
    datasets = [line.strip() for line in src.readlines()]

# Clip every survey footprint to the ROI.
clipped = []
for uri in datasets:
    with fs.open(uri) as stream:
        doc = json.load(stream)
    clipped.append(roi.intersection(geometry.shape(doc["geometry"])))

# Third path segment is the survey directory name.
survey_names = [uri.split("/")[2] for uri in datasets]

gdf = geopandas.GeoDataFrame(
    {"uri": datasets, "geometry": clipped, "name": survey_names},
    crs="epsg:4326",
)
gdf.plot(
    column="name",
    categorical=True,
    legend=True,
    legend_kwds={"loc": "center left", "bbox_to_anchor": (1, 0.5), "fmt": "{:.0f}"},
    cmap="tab20",
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import click | |
import structlog | |
LOG = structlog.get_logger()

# JSON file describing each output tile (tile_idx, bbox, transform, width, height).
ROI_TILES = "roi-tiles-info.json"
# Source sparse point-cloud array produced by the ingestion script.
ARRAY_URI = "s3://ardc-gmrt-test-data/density-mapping/gmrt-roi-point-cloud.tiledb/"
# Output CRS for the rasters; EPSG:6933 is an equal-area projection.
OUT_CRS = "EPSG:6933"
# TileDB config file path passed through to the PDAL reader and GDAL writer.
TILEDB_CONFIG = "tdb-config.cfg"
def reader_pipe():
    """PDAL reader stage for the ingested TileDB point cloud."""
    return {
        "type": "readers.tiledb",
        "strict": False,
        "array_name": ARRAY_URI,
        "config_file": TILEDB_CONFIG,
        "override_srs": "EPSG:4326",
    }
def proj_pipe():
    """PDAL reprojection stage from the source CRS to OUT_CRS."""
    return {"type": "filters.reprojection", "out_srs": OUT_CRS}
def writer_pipe_density(tile_info):
    """PDAL GDAL-writer stage producing a per-cell point-count (density) raster."""
    row, col = tile_info["tile_idx"][0], tile_info["tile_idx"][1]
    # assumes transform[1] holds the pixel size — TODO confirm against tile JSON
    res = tile_info["transform"][1]
    out_uri = (
        f"s3://ardc-gmrt-test-data/density-mapping/cell-{row}_{col}-density-{res}.tiledb"
    )
    return {
        "type": "writers.gdal",
        "binmode": True,
        "filename": out_uri,
        "gdaldriver": "TileDB",
        "resolution": res,
        "origin_x": tile_info["bbox"][0][0],
        "origin_y": tile_info["bbox"][0][1],
        "width": tile_info["width"],
        "height": tile_info["height"],
        "override_srs": OUT_CRS,
        "nodata": 0,
        "output_type": ["count"],
        "gdalopts": [
            "COMPRESSION=ZSTD",
            "COMPRESSION_LEVEL=16",
            "BLOCKXSIZE=256",
            "BLOCKYSIZE=256",
            f"TILEDB_CONFIG={TILEDB_CONFIG}",
        ],
        "data_type": "uint32",
    }
def writer_pipe_stats(tile_info):
    """PDAL GDAL-writer stage producing min/max/mean/stdev rasters for a tile."""
    row, col = tile_info["tile_idx"][0], tile_info["tile_idx"][1]
    # assumes transform[1] holds the pixel size — TODO confirm against tile JSON
    res = tile_info["transform"][1]
    out_uri = (
        f"s3://ardc-gmrt-test-data/density-mapping/cell-{row}_{col}-stats-{res}.tiledb"
    )
    return {
        "type": "writers.gdal",
        "binmode": True,
        "filename": out_uri,
        "gdaldriver": "TileDB",
        "resolution": res,
        "origin_x": tile_info["bbox"][0][0],
        "origin_y": tile_info["bbox"][0][1],
        "width": tile_info["width"],
        "height": tile_info["height"],
        "override_srs": OUT_CRS,
        "nodata": -9999,
        "output_type": ["min", "max", "mean", "stdev"],
        "gdalopts": [
            "COMPRESSION=ZSTD",
            "COMPRESSION_LEVEL=16",
            "BLOCKXSIZE=256",
            "BLOCKYSIZE=256",
            f"TILEDB_CONFIG={TILEDB_CONFIG}",
        ],
        "data_type": "float32",
    }
def main():
    """Write per-tile PDAL pipeline JSON files (density and stats) to disk.

    Reads the tile descriptions from ROI_TILES and, for each tile, emits
    two pipeline files: ``cell-<r>_<c>-pdal-pipeline-<res>-density.json``
    and ``...-stats.json``.
    """
    with open(ROI_TILES, "r") as src:
        tiles_info = json.load(src)
    for tile in tiles_info:
        LOG.info("Processing tile:", **tile)
        cell = f"{tile['tile_idx'][0]}_{tile['tile_idx'][1]}"
        res = tile["transform"][1]
        # the two products share reader + reprojection; only the writer differs
        for product, writer in (
            ("density", writer_pipe_density),
            ("stats", writer_pipe_stats),
        ):
            pipeline = [
                reader_pipe(),
                proj_pipe(),
                writer(tile),
            ]
            out_fname = f"cell-{cell}-pdal-pipeline-{res}-{product}.json"
            with open(out_fname, "w") as out_src:
                json.dump(pipeline, out_src, indent=4)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment