Last active
March 4, 2016 13:16
-
-
Save roman-yepishev/5de31e0ae7446b46b7ff to your computer and use it in GitHub Desktop.
Autonumbering magic for Boston
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import contextlib
import os
import sqlite3
import sys
import xml.sax
import xml.sax.saxutils
import xml.sax.xmlreader

from shapely.geometry import (Point, MultiPoint, LineString, Polygon, CAP_STYLE)
from shapely.geometry.polygon import LinearRing
# Approximate number of meters in one degree; used to convert a distance
# expressed in meters into the degree units of the map coordinates.
METERS_IN_DEG = 111325

# Default search distance around the street, in meters.
BUFFER_DEFAULT = 500
class BoundedPolygon(Polygon):
    """Polygon that can report its bounding rectangle as a Polygon."""

    def bounds_polygon(self):
        """Return a four-cornered Polygon built from ``self.bounds``.

        The corners are taken from the bounds tuple in the order
        (0, 1), (2, 1), (2, 3), (0, 3).
        """
        box = self.bounds
        first_x, first_y = box[0], box[1]
        second_x, second_y = box[2], box[3]
        corners = [
            (first_x, first_y),
            (second_x, first_y),
            (second_x, second_y),
            (first_x, second_y),
        ]
        return Polygon(corners)
class StreetLocator(object):
    """Finds a named street in a parsed OSM cache and builds its geometry."""

    def __init__(self, cache):
        """Keep a reference to the parsed OSM cache.

        cache -- dict with 'way' and 'node' sub-dicts keyed by element id;
                 each way has 'tags' and 'nodes', each node has 'location'.
        """
        self._cache = cache

    def get_street(self, street):
        """Return a LineString through every node of the ways named *street*.

        A street can consist of multiple ways; the nodes of every way that
        has a 'highway' tag and whose 'name' tag equals *street*
        (case-insensitively) are concatenated in cache iteration order.

        Raises ValueError when no such way exists, instead of handing an
        empty point list to the geometry library.
        """
        wanted = street.lower()  # hoisted: invariant across all ways
        node_ids = []
        for way_id, way in self._cache['way'].items():
            tags = way['tags']
            if 'highway' not in tags:
                continue
            if 'name' not in tags or tags['name'].lower() != wanted:
                continue
            print("Found street segment: {} ({} points)".format(
                way_id, len(way['nodes'])))
            node_ids.extend(way['nodes'])

        if not node_ids:
            raise ValueError(
                "No highway named {!r} found in the map".format(street))

        points = [
            self._cache['node'][node_id]['location'] for node_id in node_ids
        ]
        return LineString(MultiPoint(points))
class BuildingLocator(object):
    """Selects the buildings in a parsed OSM cache that lie near a street."""

    def __init__(self, cache):
        """Keep a reference to the parsed OSM cache."""
        self._cache = cache

    def get_buildings(self, street, buf):
        """Return ``(way_id, outline)`` pairs for buildings close to *street*.

        street -- geometry exposing ``distance()``
        buf    -- maximum distance in meters; converted to degrees with
                  METERS_IN_DEG before comparing

        Each outline is the bounding rectangle of the building's nodes, not
        its exact footprint.
        """
        max_distance = buf / METERS_IN_DEG
        node_cache = self._cache['node']

        # Every way carrying a 'building' tag, mapped to its node id list.
        building_nodes = {
            way_id: way['nodes']
            for way_id, way in self._cache['way'].items()
            if 'building' in way['tags']
        }

        nearby = []
        for way_id, node_ids in building_nodes.items():
            corners = [node_cache[node_id]['location'] for node_id in node_ids]
            # Bounding rectangle of the building's nodes
            box = BoundedPolygon(MultiPoint(corners)).bounds_polygon()
            if street.distance(box) <= max_distance:
                nearby.append((way_id, box))

        print("Matched {} buildings out of {} on the map".format(
            len(nearby), len(building_nodes)))
        return nearby
class PropertyDatabase(object):
    """Matches property-assessment records from SQLite to OSM buildings."""

    def __init__(self, path):
        """Open the SQLite database at *path*."""
        self._conn = sqlite3.connect(path)

    @staticmethod
    def _parse_location(text):
        """Turn a '(a, b)' coordinate string into a Point(a, b)."""
        return Point(
            [float(x) for x in text.lstrip('(').rstrip(')').split(',')]
        )

    def match(self, buildings, st_name, st_suf, osm_street):
        """Return tag updates for the buildings that contain a known address.

        buildings  -- iterable of ``(way_id, outline)`` pairs; outline must
                      expose ``contains()``
        st_name    -- street name as stored in the ST_NAME column
        st_suf     -- street suffix as stored in the ST_NAME_SUF column
        osm_street -- value written to the 'addr:street' tag

        Returns a list of dicts with keys '_type' ('way'), 'id' and 'tags'.
        A record whose house number is blank produces a 'fixme' tag instead
        of address tags. Records that fall inside no outline are reported
        on stdout as "MISS".

        NOTE(review): the Location column is assumed to hold the two
        coordinates in the same order as the node locations of the OSM
        cache -- confirm against the database schema.
        """
        # closing() releases the cursor even if a row fails to parse.
        with contextlib.closing(self._conn.cursor()) as cursor:
            cursor.execute('''
                SELECT DISTINCT Location, ST_NUM FROM Property_Assessment_2015
                WHERE TRIM(ST_NAME)=? AND TRIM(ST_NAME_SUF)=?
            ''', (st_name, st_suf))
            # A list of pairs rather than a dict keyed by Point: keying on a
            # geometry depends on how the geometry library hashes objects and
            # can merge distinct house numbers that share one location.
            records = [
                (self._parse_location(location), housenumber)
                for location, housenumber in cursor
            ]

        matched = 0
        result = []
        for location, housenumber in records:
            matched_iter = False
            for way, outline in buildings:
                if not outline.contains(location):
                    continue
                housenumber = housenumber.strip()
                if housenumber != '':
                    tags = {
                        'addr:housenumber': housenumber,
                        'addr:street': osm_street,
                    }
                else:
                    tags = {
                        'fixme': 'City has no housenumber for this building',
                    }
                result.append({
                    '_type': 'way',
                    'id': way,
                    'tags': tags,
                })
                matched_iter = True
                matched += 1
            if not matched_iter:
                print("MISS: {} ({})".format(housenumber, location))

        print("Matched {} buildings out of {} found in database".format(
            matched, len(records)))
        return result
class JOSMGenerator(xml.sax.saxutils.XMLGenerator):
    """XML writer that re-emits an OSM document with extra tags merged in.

    Used as a SAX content handler: every event is written back out, and
    elements listed in *updates* are marked as modified and receive the
    additional ``<tag>`` children just before their closing tag.

    NOTE(review): existing ``<tag>`` children are passed through unchanged,
    so an element that already carries one of the keys being added ends up
    with that key twice in the output.
    """

    def __init__(self, out, updates):
        """Index *updates* by element type and id.

        out     -- stream the XML is written to
        updates -- iterable of dicts with keys '_type', 'id' and 'tags'

        When several updates target the same element, the first one wins and
        a 'fixme' tag is added to the tags that are actually written, so the
        conflict is visible in the output.
        """
        # element type -> element id -> tags to add
        self._updates = {}
        # id of the element currently being updated, or None
        self._updating = None
        for item in updates:
            by_id = self._updates.setdefault(item['_type'], {})
            if item['id'] in by_id:
                # Flag the stored tags; the duplicate's own tags are dropped.
                by_id[item['id']]['fixme'] = \
                    'different address maps to the same building'
            else:
                # Copy so that adding 'fixme' never mutates the caller's dict.
                by_id[item['id']] = dict(item['tags'])
        super().__init__(out, encoding='UTF-8', short_empty_elements=True)

    def startElement(self, name, attrs):
        """Write the start tag, marking elements that have pending updates."""
        if name in self._updates:
            if 'id' in attrs and attrs['id'] in self._updates[name]:
                self._updating = attrs['id']
                attrs = dict(attrs)
                attrs['action'] = 'modify'
                attrs['user'] = 'ryebread'
                attrs = xml.sax.xmlreader.AttributesImpl(attrs)
        super().startElement(name, attrs)

    def endElement(self, name):
        """Append pending ``<tag>`` children, then write the end tag."""
        if name in self._updates and self._updating is not None:
            # Synthesize SAX events for the new tags so they are serialized
            # exactly like tags that came from the input document.
            for key, value in self._updates[name][self._updating].items():
                self.startElement('tag', {'k': key, 'v': value})
                self.endElement('tag')
            self._updating = None
        super().endElement(name)
class OSMContentHandler(xml.sax.ContentHandler):
    """SAX handler that collects OSM nodes, ways and relations into a dict.

    After parsing, ``cache`` maps 'node', 'way' and 'relation' to dicts keyed
    by element id. Nodes get a 'location' Point, ways get a 'nodes' list of
    referenced node ids, and every element gets a 'tags' dict.
    """

    def __init__(self):
        super().__init__()
        # id of the most recent element that carried an 'id' attribute
        self._id = None
        # tags collected since the last node/way/relation was closed
        self._tags = {}
        self.cache = {
            'node': {},
            'way': {},
            'relation': {}
        }

    def startElement(self, name, attrs):
        """Record the element described by *name* and *attrs*."""
        if 'id' in attrs:
            self._id = attrs['id']

        if name == 'node':
            self.cache['node'][self._id] = {
                'location': Point(float(attrs['lat']), float(attrs['lon']))
            }
        elif name == 'way':
            self.cache['way'][self._id] = {
                'nodes': []
            }
        elif name == 'relation':
            self.cache['relation'][self._id] = {}
        elif name == 'tag':
            key = attrs['k'].strip()
            value = attrs['v'].strip()
            # Tags with a blank key or value are ignored.
            if key != '' and value != '':
                self._tags[key] = value
        elif name == 'nd':
            # node reference inside the current way
            self.cache['way'][self._id]['nodes'].append(attrs['ref'])

    def endElement(self, name):
        """Attach the collected tags when a node, way or relation closes."""
        if name in self.cache:
            self.cache[name][self._id]['tags'] = self._tags
            self._tags = {}
            self._id = None
class OSMReader(object):
    """Parses an OSM XML file into the cache built by OSMContentHandler."""

    def __init__(self):
        self._parser = xml.sax.make_parser()

    def read(self, path):
        """Parse the file at *path* and return the handler's cache dict."""
        handler = OSMContentHandler()
        self._parser.setContentHandler(handler)
        # Binary mode lets the XML parser decode the document according to
        # its own encoding declaration instead of the locale's default.
        with open(path, 'rb') as fh:
            self._parser.parse(fh)
        return handler.cache
class JOSMWriter(object):
    """Copies an OSM XML file to a new file while merging in tag updates."""

    def __init__(self, input_path, output_path):
        """Remember the source and destination file paths."""
        self._input = input_path
        self._output = output_path

    def merge(self, updates):
        """Re-emit the input document to the output path with *updates*.

        updates -- iterable of dicts with keys '_type', 'id' and 'tags', as
                   accepted by JOSMGenerator
        """
        # Input is read as bytes so the parser honours the document's own
        # encoding; output is written as UTF-8 to agree with the encoding
        # JOSMGenerator declares in the XML header.
        with open(self._input, 'rb') as input_fh, \
                open(self._output, 'w', encoding='utf-8') as output_fh:
            generator = JOSMGenerator(output_fh, updates)
            parser = xml.sax.make_parser()
            parser.setContentHandler(generator)
            parser.parse(input_fh)
if __name__ == "__main__":
    # Command line: <map.osm> <OSM street name> <property street name>
    # <property street suffix>
    if len(sys.argv) < 5:
        print(
            "Usage: {} <map.osm> <osm street name> "
            "<property street name> <property street suffix>".format(
                sys.argv[0]),
            file=sys.stderr)
        sys.exit(1)

    buf = BUFFER_DEFAULT
    path = sys.argv[1]
    street_name = sys.argv[2]
    property_street_name = sys.argv[3]
    property_street_name_suf = sys.argv[4]

    # The result is written next to the input as "<name>-numbered.osm".
    output = os.path.join(os.path.dirname(path),
                          os.path.basename(path).rsplit('.', 1)[0] +
                          '-numbered.osm')

    cache = OSMReader().read(path)

    # Locate the street, then the buildings within the buffer around it.
    locator = StreetLocator(cache)
    street = locator.get_street(street_name)
    locator = BuildingLocator(cache)
    buildings = locator.get_buildings(street, buf)

    # Look up house numbers for those buildings in the property database.
    db = PropertyDatabase('Property_Assessment_2015.sqlite')
    updates = db.match(buildings,
                       property_street_name,
                       property_street_name_suf,
                       street_name)

    writer = JOSMWriter(path, output)
    writer.merge(updates)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment