Created
February 1, 2023 20:19
-
-
Save arbakker/5356d2fc7b0d21a352339612357c4c3e to your computer and use it in GitHub Desktop.
Reproject GML geometries by naively grepping GML file on <gml:poslist> , inserting geometries in GPKG, running transform query in GPKG and writing back <gml:poslist> to GML file #python #gml #fiona #wkt #ogr #xml
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import re | |
import os | |
import sys | |
import tempfile | |
from shapely.geometry import mapping, Polygon | |
import fiona | |
from fiona.crs import from_epsg | |
from osgeo import ogr | |
import lxml.etree as etree | |
def remove_file(filepath): | |
try: | |
os.remove(filepath) | |
except OSError as e: | |
# If it fails, inform the user. | |
print("Error: %s - %s." % (e.filename, e.strerror)) | |
def pretty_print_xml(xml_file_path): | |
parser = etree.XMLParser(remove_blank_text=True) | |
tree = etree.parse(xml_file_path, parser) | |
tree.write(xml_file_path, pretty_print=True, xml_declaration=True) | |
def main(gml_file_path, sql_query, limit=-1): | |
pretty_print_xml(gml_file_path) | |
file = open(gml_file_path, "r") | |
geom_tuples=[] | |
count=0 | |
match_count=0 | |
for line in file: | |
poslist_match = re.match("^\s+<gml:posList>(.*?)</gml:posList>$", line) | |
if poslist_match: | |
geom_tuples.append((count, poslist_match[1].split(" "))) | |
if limit != -1 and match_count > limit: | |
break | |
match_count+=1 | |
count+=1 | |
proj = from_epsg(3035) | |
# Define a polygon feature geometry with one attribute | |
schema = { | |
'geometry': 'Polygon', | |
'properties': {'linenr': 'int'}, | |
} | |
# Write a new Shapefile | |
count=0 | |
tmp_dir = tempfile.mkdtemp() | |
output_gpkg = os.path.join(tmp_dir, 'gml_geometries.gpkg') | |
with fiona.open(output_gpkg, 'w', driver='GPKG', schema=schema, layer="gml_geometries",crs=proj) as c: | |
for geom_tuple in geom_tuples: | |
linenr = geom_tuple[0] | |
poslist = geom_tuple[1] | |
poly = Polygon(list(zip(poslist[1::2], poslist[0::2]))) | |
c.write({ | |
'geometry': mapping(poly), | |
'properties': {'linenr': linenr}, | |
}) | |
if limit != -1 and count > limit: | |
break | |
count+=1 | |
gpkg_source = ogr.Open(output_gpkg, update=True) | |
gpkg_source.ExecuteSQL("select InitSpatialMetaData()") | |
gpkg_source.ExecuteSQL(sql_query) | |
result = gpkg_source.ExecuteSQL("select linenr, ST_AsText(geom) as wkt_geom from gml_geometries;") | |
gml_poslists = {} | |
for line in result: | |
line_nr = int(line.GetField(0)) | |
wkt_geom = line.GetField(1) | |
poslist_match = re.match("^POLYGON\(\((.*?)\)\)$", wkt_geom) | |
if not poslist_match: | |
raise ValueError(f"unable to retrieve coordinates from WKT string: {wkt_geom}") | |
coord_string = poslist_match[1] | |
gml_poslist=" ".join([" ".join(list(reversed(x.strip().split(" ")))) for x in coord_string.split(",")]) | |
gml_poslists[line_nr] = gml_poslist | |
with open(gml_file_path, "r") as f: | |
data = f.readlines() | |
for line_nr in gml_poslists: | |
data[line_nr] = f"<gml:posList>{gml_poslists[line_nr]}</gml:posList>\n" | |
with open(gml_file_path, "w") as f: | |
f.write("".join(data)) | |
del gpkg_source | |
remove_file(output_gpkg) | |
pretty_print_xml(gml_file_path) | |
print("reprojected GML features") | |
if __name__ == "__main__": | |
if len(sys.argv) != 3: | |
raise ValueError("Requires one argument; GML file path to extract geometries") | |
# "update gml_geometries set geom = ST_Transform(SwapCoordinates(ST_Transform(geom, 28992)), 3035) where st_minx(geom) > 4278162 and st_maxy(geom) < 2912726" # | |
main(sys.argv[1], sys.argv[2]) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment