Skip to content

Instantly share code, notes, and snippets.

@esgn
Created November 15, 2024 08:44
Show Gist options
  • Save esgn/78a80bee116a633b65612241c8040d4a to your computer and use it in GitHub Desktop.
Save esgn/78a80bee116a633b65612241c8040d4a to your computer and use it in GitHub Desktop.
Generate PDAL pipeline to download from LIDAR HD COPC coverage
import sys
import argparse
import geopandas as gpd
import pandas as pd
import json
def parse_args():
parser = argparse.ArgumentParser(
"Create JSON PDAL pipelines to download point cloud extract")
parser.add_argument("--input_footprints", "-i",
help="input footprints file",
default="bati_bdtopo.gpkg")
parser.add_argument("--buffer_size", "-b",
help="buffer size in coordinates units",
default="20")
parser.add_argument("--output_laz", "-o",
help="output laz filename",
default="extract.laz")
parser.add_argument("--output_pipeline", "-p",
help="output pipeline filename",
default="pipeline.json")
return parser.parse_args()
def create_pipeline(urls, bbox, output_filename):
content = []
bounds = ([bbox[0], bbox[2]], [bbox[1], bbox[3]])
for url in urls:
tile = {}
tile["type"] = "readers.copc"
tile["filename"] = url
tile["bounds"] = str(bounds)
content.append(tile)
filter = {}
filter["type"] = "filters.assign"
filter["value"] = "Classification=6 WHERE Classification==67"
content.append(filter)
writer = {}
writer["type"] = "writers.las"
writer["forward"] = "all"
writer["extra_dims"] = "all"
writer["a_srs"] = "EPSG:2154"
writer["minor_version"] = "4"
writer["dataformat_id"] = "6"
writer["filename"] = output_filename
content.append(writer)
pipeline = json.dumps(content, indent=2)
return pipeline
def main():
args = parse_args()
# Load the footprints file
footprints = gpd.read_file(
args.input_footprints, engine='pyogrio', use_arrow=True)
# Get footprints global bounding box
bounds = footprints.total_bounds.tolist()
# Add buffer to bounding box
b = float(args.buffer_size)
buffered_bbox = [bounds[0]-b, bounds[1]-b, bounds[2]+b, bounds[3]+b]
# Load the copc index
bbox = ','.join(str(s) for s in buffered_bbox)
wfs_url=(
f"https://data.geopf.fr/private/wfs/?service=WFS&version=2.0.0&apikey=interface_catalogue"
f"&request=GetFeature"
f"&typeNames=ta_lidar-hd:dalle"
f"&bbox={bbox}"
f"&outputFormat=csv"
f"&PROPERTYNAME=url"
)
data = pd.read_csv(wfs_url)
urls = data["url"].tolist()
# write pdal pipeline
pipeline = create_pipeline(urls, buffered_bbox, args.output_laz)
with open(args.output_pipeline, "w") as dest:
dest.write(pipeline)
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment