Last active
February 23, 2021 11:39
-
-
Save gnn/5b7e4476ac25013ff02d3d685c1b5811 to your computer and use it in GitHub Desktop.
Filter CSV lines using a shapefile
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
from pathlib import Path | |
import csv | |
import json | |
from shapely.geometry import Point, shape | |
from shapely.prepared import prep | |
from egon.data import subprocess | |
from egon.data.db import credentials | |
state = "Schleswig-Holstein" | |
def target(source): | |
"""Generate the target path corresponding to a source path.""" | |
return Path(source.stem + "." + state + source.suffix) | |
# Select the union of the geometries of Schleswig-Holstein from the | |
# database, convert their projection to the one used in the CSV file, | |
# output the result to stdout as a GeoJSON string and read it into a | |
# prepared shape for filtering. | |
db = credentials() | |
geojson = subprocess.run( | |
["ogr2ogr"] | |
+ ["-s_srs", "epsg:4326"] | |
+ ["-t_srs", "epsg:3035"] | |
+ ["-f", "GeoJSON"] | |
+ ["/vsistdout/"] | |
+ [ | |
f"PG:host={db['HOST']} user='{db['POSTGRES_USER']}'" | |
f" password='{db['POSTGRES_PASSWORD']}' port={db['PORT']}" | |
f" dbname='{db['POSTGRES_DB']}'" | |
] | |
+ [ | |
"-sql", | |
"SELECT ST_Union(geometry)" | |
" FROM boundaries.vg250_lan" | |
f" WHERE gen = '{state}'", | |
], | |
text=True, | |
) | |
features = json.loads(geojson.stdout)["features"] | |
assert ( | |
len(features) == 1 | |
), f"Found {len(features)} geometry features, expected exactly one." | |
schleswig_holstein = prep(shape(features[0]["geometry"])) | |
# This block actually filters lines in the source CSV file and copies | |
# the appropriate ones to the destination. While doing so, it keeps | |
# track of the value of the "Gitter_ID_100m" field of those lines, which | |
# where not discarded. These values are then used, to filter additional | |
# files, also having a "Gitter_ID_100m" field, but lacking "x_mp_100m" | |
# and "y_mp_100m" coordinate fields. | |
csv_file = Path("Zensus_Bevoelkerung_100m-Gitter.csv").resolve(strict=True) | |
with open(csv_file, mode="r", newline="") as input_lines: | |
rows = csv.DictReader(input_lines, delimiter=";") | |
gitter_ids = set() | |
with open(target(csv_file), mode="w", newline="") as destination: | |
output = csv.DictWriter( | |
destination, delimiter=";", fieldnames=rows.fieldnames | |
) | |
output.writeheader() | |
output.writerows( | |
gitter_ids.add(row["Gitter_ID_100m"]) or row | |
for row in rows | |
if schleswig_holstein.intersects( | |
Point(float(row["x_mp_100m"]), float(row["y_mp_100m"])) | |
) | |
) | |
for path in [ | |
Path(p).resolve(strict=True) | |
for p in ["Geb100m.csv", "Haushalte100m.csv", "Wohnungen100m.csv"] | |
]: | |
with open(path, mode="r", newline="", encoding="iso-8859-1") as inputs: | |
rows = csv.DictReader(inputs, delimiter=",") | |
with open( | |
target(path), | |
mode="w", | |
newline="", | |
encoding="iso-8859-1", | |
) as destination: | |
output = csv.DictWriter( | |
destination, delimiter=",", fieldnames=rows.fieldnames | |
) | |
output.writeheader() | |
output.writerows( | |
row for row in rows if row["Gitter_ID_100m"] in gitter_ids | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment