Created
April 11, 2025 06:38
-
-
Save grischard/9e8833859f89cd7c8da38c1e3719bc67 to your computer and use it in GitHub Desktop.
Compare OpenStreetMap admin boundaries to official open data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env -S uv run --script | |
# /// script | |
# requires-python = ">=3.12" | |
# dependencies = [ | |
# "click", | |
# "requests", | |
# "geopandas", | |
# "shapely", | |
# "pyproj", | |
# "rich", | |
# "types-geopandas", | |
# "types-shapely", | |
# "types-requests", | |
# "types-click", | |
# ] | |
# /// | |
# This script computes the distance from OSM nodes to the administrative boundaries | |
# of Luxembourg, using the Overpass API to fetch the boundaries and a shapefile | |
# of the administrative boundaries. It then annotates the OSM nodes with the computed | |
# distance, and exports the results to a GeoPackage file for QGIS analysis. | |
import xml.etree.ElementTree as ET | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import click | |
import requests | |
import geopandas as gpd | |
from shapely.geometry import Point | |
from pyproj import Transformer | |
from rich.progress import Progress | |
from typing import Tuple, Optional, Any | |
def load_boundaries(shapefile: str): | |
""" | |
Load the shapefile and compute a unified boundary geometry using unary_union. | |
""" | |
gdf = gpd.read_file(shapefile) | |
boundaries_union = gdf.boundary.union_all() | |
return gdf, boundaries_union | |
def fetch_osm_boundaries() -> str: | |
""" | |
Fetch Luxembourg's admin_level 8 administrative boundaries from Overpass, | |
filtering out any with a website ending in .be or .be/ (border cases) and | |
only recursing on ways to avoid admin_center nodes. | |
""" | |
overpass_query = """ | |
area["name"="Lëtzebuerg"]["admin_level"="2"] -> .lu; | |
rel(area.lu)["admin_level"="8"]["website"!~".*\\.be(/?)$"] -> .admin8; | |
way(r.admin8) -> .way8; | |
node(w.way8); | |
out skel qt; | |
""" | |
response = requests.post( | |
"http://overpass-api.de/api/interpreter", | |
data={"data": overpass_query}, | |
timeout=60, | |
) | |
if response.status_code != 200: | |
click.echo( | |
f"Overpass request failed, code {response.status_code}, response: {response.text}" | |
) | |
raise RuntimeError( | |
f"Overpass API request failed with code {response.status_code}" | |
) | |
return response.text | |
def compute_distance_for_node( | |
node: ET.Element, boundaries: Any, transformer: Optional[Transformer] | |
) -> Optional[float]: | |
""" | |
For a given OSM node, compute the distance (in metres) to the boundaries. | |
The result is rounded to the nearest 0.1 metre. | |
""" | |
try: | |
lat = float(node.attrib["lat"]) | |
lon = float(node.attrib["lon"]) | |
except (KeyError, ValueError): | |
click.echo( | |
f"Node {node.attrib.get('id')} does not have valid lat/lon attributes; skipping." | |
) | |
return None | |
if transformer is not None: | |
x, y = transformer.transform(lon, lat) | |
pt = Point(x, y) | |
else: | |
pt = Point(lon, lat) | |
distance = pt.distance(boundaries) | |
return round(distance, 1) | |
def process_osm( | |
osm_xml: str, boundaries, transformer: Transformer | None | |
) -> ET.Element: | |
""" | |
Process all nodes in the OSM XML concurrently. | |
Each node is annotated with a 'lim_admin_distance' tag. | |
""" | |
root = ET.fromstring(osm_xml) | |
# relations = root.findall("relation") | |
# if len(relations) != 100: | |
# raise ValueError(f"Expected 100 relations in the OSM data. Got: {len(relations)}") | |
# else: | |
# click.echo("Found 100 relations in the OSM data. Choo choo!") | |
nodes = root.findall("node") | |
with Progress() as progress: | |
task = progress.add_task("[cyan]Processing nodes...", total=len(nodes)) | |
with ThreadPoolExecutor(max_workers=10) as executor: | |
future_to_node = { | |
executor.submit( | |
compute_distance_for_node, node, boundaries, transformer | |
): node | |
for node in nodes | |
} | |
for future in as_completed(future_to_node): | |
node = future_to_node[future] | |
try: | |
distance = future.result() | |
if distance is not None: | |
# Append a new tag with the computed distance. | |
tag_elem = ET.Element( | |
"tag", k="lim_admin_distance", v=str(distance) | |
) | |
node.append(tag_elem) | |
except ValueError as exc: | |
click.echo(f"Error processing node {node.attrib.get('id')}: {exc}") | |
progress.update(task, advance=1) | |
return root | |
def export_nodes_to_gpkg( | |
root: ET.Element, transformer: Optional[Transformer], crs: str, output_gpkg: str | |
) -> None: | |
""" | |
Export OSM nodes (with the lim_admin_distance attribute) to a GeoPackage file, | |
creating a layer named 'osm_nodes' for QGIS analysis. | |
""" | |
records = [] | |
for node in root.findall("node"): | |
try: | |
lat = float(node.attrib["lat"]) | |
lon = float(node.attrib["lon"]) | |
except KeyError: | |
continue | |
distance = None | |
for tag in node.findall("tag"): | |
if tag.attrib.get("k") == "lim_admin_distance": | |
distance = tag.attrib.get("v") | |
break | |
if distance is None: | |
click.echo( | |
f"Node at {lat}, {lon} does not have a 'lim_admin_distance' tag; skipping." | |
) | |
continue | |
if transformer is not None: | |
x, y = transformer.transform(lon, lat) | |
pt = Point(x, y) | |
else: | |
pt = Point(lon, lat) | |
records.append( | |
{ | |
"id": node.attrib.get("id"), | |
"lim_admin_distance": distance, | |
"geometry": pt, | |
} | |
) | |
if records: | |
gdf_nodes = gpd.GeoDataFrame(records, geometry="geometry") | |
gdf_nodes.set_crs(crs, inplace=True) | |
gdf_nodes.to_file(output_gpkg, driver="GPKG", layer="osm_nodes") | |
click.echo(f"GeoPackage file written to {output_gpkg}") | |
else: | |
click.echo( | |
"No nodes with computed distance were found; GeoPackage not created." | |
) | |
@click.command() | |
@click.option( | |
"--shapefile", | |
type=click.Path(exists=True), | |
required=True, | |
help="Path to the administrative boundaries shapefile (e.g. LIMADM_COMMUNES.shp).", | |
) | |
@click.option( | |
"--output", default="lim_admin_debug.osm", help="Output OSM file path for JOSM." | |
) | |
@click.option( | |
"--gpkg-output", | |
default="lim_admin_debug.gpkg", | |
help="Output GeoPackage file path for QGIS analysis.", | |
) | |
def main(shapefile: str, output: str, gpkg_output: str): | |
click.echo("Loading shapefile and extracting boundaries...") | |
gdf, boundaries_union = load_boundaries(shapefile) | |
# Create transformer if shapefile CRS is different from EPSG:4326. | |
transformer: Transformer | None = None | |
if gdf.crs is not None and gdf.crs.to_epsg() != 4326: | |
transformer = Transformer.from_crs("EPSG:4326", gdf.crs, always_xy=True) | |
click.echo(f"Transformer created: {gdf.crs} to EPSG:4326") | |
click.echo("Fetching OSM boundaries via Overpass API, takes a while...") | |
osm_xml = fetch_osm_boundaries() | |
click.echo("Processing OSM nodes, reticulating splines, and computing distances...") | |
new_osm_root = process_osm(osm_xml, boundaries_union, transformer) | |
tree = ET.ElementTree(new_osm_root) | |
tree.write(output, encoding="utf-8", xml_declaration=True) | |
click.echo(f"OSM file written to {output}") | |
click.echo("Exporting nodes to GeoPackage for QGIS analysis...") | |
export_nodes_to_gpkg( | |
new_osm_root, | |
transformer, | |
str(gdf.crs) if gdf.crs is not None else "EPSG:4326", | |
gpkg_output, | |
) | |
if __name__ == "__main__": | |
main( | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment