Skip to content

Instantly share code, notes, and snippets.

@grischard
Created April 11, 2025 06:38
Show Gist options
  • Save grischard/9e8833859f89cd7c8da38c1e3719bc67 to your computer and use it in GitHub Desktop.
Save grischard/9e8833859f89cd7c8da38c1e3719bc67 to your computer and use it in GitHub Desktop.
Compare OpenStreetMap admin boundaries to official open data
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "click",
# "requests",
# "geopandas",
# "shapely",
# "pyproj",
# "rich",
# "types-geopandas",
# "types-shapely",
# "types-requests",
# "types-click",
# ]
# ///
# This script computes the distance from OSM nodes to the administrative boundaries
# of Luxembourg, using the Overpass API to fetch the boundaries and a shapefile
# of the administrative boundaries. It then annotates the OSM nodes with the computed
# distance, and exports the results to a GeoPackage file for QGIS analysis.
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor, as_completed
import click
import requests
import geopandas as gpd
from shapely.geometry import Point
from pyproj import Transformer
from rich.progress import Progress
from typing import Tuple, Optional, Any
def load_boundaries(shapefile: str):
"""
Load the shapefile and compute a unified boundary geometry using unary_union.
"""
gdf = gpd.read_file(shapefile)
boundaries_union = gdf.boundary.union_all()
return gdf, boundaries_union
def fetch_osm_boundaries() -> str:
"""
Fetch Luxembourg's admin_level 8 administrative boundaries from Overpass,
filtering out any with a website ending in .be or .be/ (border cases) and
only recursing on ways to avoid admin_center nodes.
"""
overpass_query = """
area["name"="Lëtzebuerg"]["admin_level"="2"] -> .lu;
rel(area.lu)["admin_level"="8"]["website"!~".*\\.be(/?)$"] -> .admin8;
way(r.admin8) -> .way8;
node(w.way8);
out skel qt;
"""
response = requests.post(
"http://overpass-api.de/api/interpreter",
data={"data": overpass_query},
timeout=60,
)
if response.status_code != 200:
click.echo(
f"Overpass request failed, code {response.status_code}, response: {response.text}"
)
raise RuntimeError(
f"Overpass API request failed with code {response.status_code}"
)
return response.text
def compute_distance_for_node(
node: ET.Element, boundaries: Any, transformer: Optional[Transformer]
) -> Optional[float]:
"""
For a given OSM node, compute the distance (in metres) to the boundaries.
The result is rounded to the nearest 0.1 metre.
"""
try:
lat = float(node.attrib["lat"])
lon = float(node.attrib["lon"])
except (KeyError, ValueError):
click.echo(
f"Node {node.attrib.get('id')} does not have valid lat/lon attributes; skipping."
)
return None
if transformer is not None:
x, y = transformer.transform(lon, lat)
pt = Point(x, y)
else:
pt = Point(lon, lat)
distance = pt.distance(boundaries)
return round(distance, 1)
def process_osm(
osm_xml: str, boundaries, transformer: Transformer | None
) -> ET.Element:
"""
Process all nodes in the OSM XML concurrently.
Each node is annotated with a 'lim_admin_distance' tag.
"""
root = ET.fromstring(osm_xml)
# relations = root.findall("relation")
# if len(relations) != 100:
# raise ValueError(f"Expected 100 relations in the OSM data. Got: {len(relations)}")
# else:
# click.echo("Found 100 relations in the OSM data. Choo choo!")
nodes = root.findall("node")
with Progress() as progress:
task = progress.add_task("[cyan]Processing nodes...", total=len(nodes))
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_node = {
executor.submit(
compute_distance_for_node, node, boundaries, transformer
): node
for node in nodes
}
for future in as_completed(future_to_node):
node = future_to_node[future]
try:
distance = future.result()
if distance is not None:
# Append a new tag with the computed distance.
tag_elem = ET.Element(
"tag", k="lim_admin_distance", v=str(distance)
)
node.append(tag_elem)
except ValueError as exc:
click.echo(f"Error processing node {node.attrib.get('id')}: {exc}")
progress.update(task, advance=1)
return root
def export_nodes_to_gpkg(
root: ET.Element, transformer: Optional[Transformer], crs: str, output_gpkg: str
) -> None:
"""
Export OSM nodes (with the lim_admin_distance attribute) to a GeoPackage file,
creating a layer named 'osm_nodes' for QGIS analysis.
"""
records = []
for node in root.findall("node"):
try:
lat = float(node.attrib["lat"])
lon = float(node.attrib["lon"])
except KeyError:
continue
distance = None
for tag in node.findall("tag"):
if tag.attrib.get("k") == "lim_admin_distance":
distance = tag.attrib.get("v")
break
if distance is None:
click.echo(
f"Node at {lat}, {lon} does not have a 'lim_admin_distance' tag; skipping."
)
continue
if transformer is not None:
x, y = transformer.transform(lon, lat)
pt = Point(x, y)
else:
pt = Point(lon, lat)
records.append(
{
"id": node.attrib.get("id"),
"lim_admin_distance": distance,
"geometry": pt,
}
)
if records:
gdf_nodes = gpd.GeoDataFrame(records, geometry="geometry")
gdf_nodes.set_crs(crs, inplace=True)
gdf_nodes.to_file(output_gpkg, driver="GPKG", layer="osm_nodes")
click.echo(f"GeoPackage file written to {output_gpkg}")
else:
click.echo(
"No nodes with computed distance were found; GeoPackage not created."
)
@click.command()
@click.option(
"--shapefile",
type=click.Path(exists=True),
required=True,
help="Path to the administrative boundaries shapefile (e.g. LIMADM_COMMUNES.shp).",
)
@click.option(
"--output", default="lim_admin_debug.osm", help="Output OSM file path for JOSM."
)
@click.option(
"--gpkg-output",
default="lim_admin_debug.gpkg",
help="Output GeoPackage file path for QGIS analysis.",
)
def main(shapefile: str, output: str, gpkg_output: str):
click.echo("Loading shapefile and extracting boundaries...")
gdf, boundaries_union = load_boundaries(shapefile)
# Create transformer if shapefile CRS is different from EPSG:4326.
transformer: Transformer | None = None
if gdf.crs is not None and gdf.crs.to_epsg() != 4326:
transformer = Transformer.from_crs("EPSG:4326", gdf.crs, always_xy=True)
click.echo(f"Transformer created: {gdf.crs} to EPSG:4326")
click.echo("Fetching OSM boundaries via Overpass API, takes a while...")
osm_xml = fetch_osm_boundaries()
click.echo("Processing OSM nodes, reticulating splines, and computing distances...")
new_osm_root = process_osm(osm_xml, boundaries_union, transformer)
tree = ET.ElementTree(new_osm_root)
tree.write(output, encoding="utf-8", xml_declaration=True)
click.echo(f"OSM file written to {output}")
click.echo("Exporting nodes to GeoPackage for QGIS analysis...")
export_nodes_to_gpkg(
new_osm_root,
transformer,
str(gdf.crs) if gdf.crs is not None else "EPSG:4326",
gpkg_output,
)
if __name__ == "__main__":
main(
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment