Created
December 19, 2018 08:52
-
-
Save HTenkanen/063d2ff6aacba1dbf920220c37040977 to your computer and use it in GitHub Desktop.
# Digiroad to Graph
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import networkx as nx | |
from osgeo import ogr | |
import os | |
def load_graph_from_Digiroad_shape(filepath, direction='AJOSUUNTA', both_ways=2, against=3, along=4, strict=True): | |
"""Generates nx.MultiDiGraph() object from Digiroad 2.0 Shapefile. | |
Parameters | |
---------- | |
filepath : str | |
Filepath to Digiroad 2.0 Shapefile. | |
direction : str | |
Name for column that contains information about the allowed driving directions | |
both_ways : int | |
Value specifying that the road is drivable to both directions. | |
against : int | |
Value specifying that the road is drivable against the digitizing direction. | |
along : int | |
Value specifying that the road is drivable along the digitizing direction. | |
strict : bool | |
Raise error if error is being caught when building the graph. | |
""" | |
graph = nx.MultiDiGraph() | |
shp = ogr.Open(filepath) | |
if shp is None: | |
raise RuntimeError("Unable to open {}".format(filepath)) | |
print("Loading graph from file ..") | |
for lyr in shp: | |
fields = [x.GetName() for x in lyr.schema] | |
for f in lyr: | |
g = f.geometry() | |
if g is None: | |
if strict: | |
raise nx.NetworkXError("Bad data: feature missing geometry") | |
else: | |
continue | |
flddata = [f.GetField(f.GetFieldIndex(x)) for x in fields] | |
attributes = dict(zip(fields, flddata)) | |
attributes["ShpName"] = lyr.GetName() | |
# Digiroad 2.0 specific GeometryType is somekind of number always | |
if isinstance(g.GetGeometryType(), int): | |
coords = g.GetPoint_2D(0) | |
x, y = coords[0], coords[1] | |
# Add x and y to attributes | |
attributes['x'] = x | |
attributes['y'] = y | |
attributes['nodeid'] = coords | |
# Add node to the graph | |
graph.add_node(coords, **attributes) | |
# Get edges | |
for edge in edges_from_line(g, attributes): | |
e1, e2, attr = edge | |
# If road is bi-directional add it also in reversed order | |
if attr[direction] == both_ways: | |
graph.add_edge(e1, e2, **attr) | |
graph.add_edge(e2, e1, **attr) | |
# If road is drivable against the digitization direction (reverse) | |
elif attr[direction] == against: | |
graph.add_edge(e2, e1, **attr) | |
# Otherwise add the edge "normally" | |
elif attr[direction] == along: | |
graph.add_edge(e1, e2, **attr) | |
else: | |
if strict: | |
raise nx.NetworkXError("GeometryType {} not supported". | |
format(g.GetGeometryType())) | |
# Add name | |
graph.graph['name'] = os.path.basename(filepath)[:-4] | |
# Add CRS | |
try: | |
crs = lyr.GetSpatialRef().ExportToProj4() | |
except: | |
crs = None | |
graph.graph['crs'] = crs | |
return graph | |
def edges_from_line(geom, attrs): | |
""" | |
Generate edges for each line in geom | |
Written as a helper for read_shp | |
Parameters | |
---------- | |
geom: ogr line geometry | |
To be converted into an edge or edges | |
attrs: dict | |
Attributes to be associated with all geoms | |
Returns | |
------- | |
edges: generator of edges | |
each edge is a tuple of form | |
(node1_coord, node2_coord, attribute_dict) | |
suitable for expanding into a networkx Graph add_edge call | |
""" | |
if geom.GetGeometryName().startswith('MULTI'): | |
print("Multilinestring") | |
edge_attrs = attrs.copy() | |
last = geom.GetPointCount() - 1 | |
edge_attrs["Wkt"] = geom.ExportToWkt() | |
yield (geom.GetPoint_2D(0), geom.GetPoint_2D(last), edge_attrs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment