-
-
Save mvexel/2668003 to your computer and use it in GitHub Desktop.
OSM Tiger expansion code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""This is the base library that can be used to run various OSM bots
which are implemented as plugins."""
import codecs
import os
import sys
from xml.sax import make_parser
from xml.sax.handler import ContentHandler
from xml.sax.saxutils import quoteattr
# Verbosity switches; the command-line handling at the bottom of the file
# overrides them from --quiet / --debug.
DEBUG = False
QUIET = False

# Identification strings written into the header of generated <delete> blocks.
BOTNAME = 'TIGER name expansion 2012'
VERSION = '0.1'
class OSMHandler(ContentHandler):
    """Base SAX handler that filters OSM objects and writes them to files.

    Every completed node, way or relation is offered to ``selectElement``.
    Selected objects are passed through ``transformElement`` and then
    written to ``<file_prefix>/<file_prefix>_NNNN.osm``.  A new output file
    is started once ``max_objects_per_file`` objects have been written.

    You will want to override ``selectElement`` and ``transformElement``.
    """

    # Element names that represent a complete OSM object.
    _TOP_LEVEL = ('node', 'way', 'relation')

    def __init__(self, file_prefix):
        """Initialise parser state, output bookkeeping and statistics.

        ``file_prefix`` is used both as the output directory name and as
        the stem of each output file inside it.
        """
        ContentHandler.__init__(self)
        self.path = file_prefix
        self.file_prefix = file_prefix
        self.object_counter = 0
        self.clear()
        self.max_objects_per_file = 10000
        self.file_counter = 0
        self.out = None
        # Statistics and diagnostics filled in by subclasses.
        self.roads = 0
        self.total_fixed = 0
        self.fixed = False
        self.tried_to_fix = 0
        self.unrecognized_tags = set()
        self.unrecognized_direction_tags = set()
        self.ambigious_expansions = []

    def _open(self):
        """Create the output directory if needed and start a new file."""
        if not os.path.isdir(self.path):
            os.mkdir(self.path)
        fname = self.path + '/' + "%s_%04d.osm" % (self.file_prefix,
                                                   self.file_counter)
        self.out = codecs.open(fname, 'w', encoding='utf-8')
        self.out.write('<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n')
        self.out.write('<osm version="0.6" generator="pyxbot">\n')

    def _close(self):
        """Terminate the current output file and advance the file counter."""
        self.out.write('</osm>\n')
        self.out.close()
        self.out = None
        self.object_counter = 0
        self.file_counter = self.file_counter + 1

    @staticmethod
    def _format_attrs(pairs):
        """Render (name, value) pairs as space-separated XML attributes.

        Values are escaped and quoted with ``quoteattr`` because the SAX
        parser hands us decoded text: a raw ``&``, ``<`` or ``"`` written
        back verbatim would produce malformed XML.
        """
        return ' '.join([u'%s=%s' % (key, quoteattr(u'%s' % (value,)))
                         for key, value in pairs])

    def _write_tags(self):
        """Write one <tag> child element per entry in ``self.tags``."""
        for key, value in self.tags.items():
            self.out.write(' <tag %s />\n'
                           % self._format_attrs([('k', key), ('v', value)]))

    # The output methods don't do any kind of data validation
    def _emit_node(self):
        "Output a node"
        attrs = self._format_attrs(self.attrs.items())
        if not self.tags:
            self.out.write('<node %s />\n' % attrs)
            return
        self.out.write('<node %s >\n' % attrs)
        self._write_tags()
        self.out.write('</node>\n')

    def _emit_way(self):
        "Output a way"
        attrs = self._format_attrs(self.attrs.items())
        if not (self.tags or self.nodes):
            # Empty way: a single self-closing element and nothing else.
            self.out.write('<way %s />\n' % attrs)
            return
        self.out.write('<way %s >\n' % attrs)
        for nodeid in self.nodes:
            self.out.write(' <nd %s />\n'
                           % self._format_attrs([('ref', nodeid)]))
        self._write_tags()
        self.out.write('</way>\n')

    def _emit_relation(self):
        "Output a relation"
        attrs = self._format_attrs(self.attrs.items())
        if not (self.members or self.tags):
            self.out.write('<relation %s />\n' % attrs)
            return
        self.out.write('<relation %s >\n' % attrs)
        for member in self.members:
            self.out.write(' <member %s />\n'
                           % self._format_attrs(member.items()))
        self._write_tags()
        self.out.write('</relation>\n')

    def emit(self):
        "Output the current element"
        if self.name == 'node':
            self._emit_node()
        elif self.name == 'way':
            self._emit_way()
        elif self.name == 'relation':
            self._emit_relation()

    def clear(self):
        "Initialize the state machine"
        self.name = None
        self.tags = {}
        self.nodes = []
        self.members = []
        self.attrs = {}

    def startElement(self, name, attrs):
        "This function is called at the start of the element (as per SAX)"
        if name in self._TOP_LEVEL:
            self.name = name
            self.attrs = attrs.copy()
        elif name == 'tag':
            self.tags[attrs.get('k')] = attrs.get('v')
        elif name == 'member':
            self.members.append(attrs.copy())
        elif name == 'nd':
            self.nodes.append(attrs.get('ref'))

    def selectElement(self):
        """Select whether or not we care about the OSM object (True or
        False). Override this function in your handler"""
        return False

    def transformElement(self):
        """Transform the element. Override this function in your
        handler"""
        pass

    def deleteElement(self):
        """Write the current element to the output wrapped in a <delete>
        block. Please use with caution!"""
        self.out.write('<delete version="%s" generator="%s">\n' %
                       (VERSION, BOTNAME))
        self.emit()
        self.out.write('</delete>\n')

    def endElement(self, name):
        """As per the SAX handler, this method is where any work is
        done. You may want to override it, but probably not"""
        # If there's no open output, we need to open it
        if self.out is None:
            self._open()
        if name == 'way':
            self.nodes = tuple(self.nodes)
        elif name == 'relation':
            self.members = tuple(self.members)
        if name in self._TOP_LEVEL:
            if self.selectElement():
                self.transformElement()
                self.emit()
                self.object_counter = self.object_counter + 1
                # Roll over as soon as the limit is reached so that a file
                # never holds more than max_objects_per_file objects.
                if self.object_counter >= self.max_objects_per_file:
                    self._close()
            self.clear()

    def endDocument(self):
        """Close the output file that is still open when parsing finishes,
        so the last file also receives its closing </osm> tag."""
        if self.out is not None:
            self._close()
# Abbreviated road types, as found in the tiger:name_type tag, mapped to the
# full word that replaces them in the name tag.
road_types = {
    'Aly': 'Alley',
    'Ave': 'Avenue',
    'Blvd': 'Boulevard',
    'Br': 'Branch',
    'Brg': 'Bridge',
    'Byp': 'Bypass',
    'Cir': 'Circle',
    'Cres': 'Crescent',
    'Ct': 'Court',
    'Ctr': 'Center',
    'Cv': 'Cove',
    'Dr': 'Drive',
    'Expy': 'Expressway',
    'Fwy': 'Freeway',
    'Hwy': 'Highway',
    'Ln': 'Lane',
    'Mal': 'Mall',
    'Pky': 'Parkway',
    'Pl': 'Place',
    'Plz': 'Plaza',
    'Rd': 'Road',
    'Rte': 'Route',
    'Sq': 'Square',
    'St': 'Street',
    'Ter': 'Terrace',
    'Thwy': 'Throughway',
    # "Trce" abbreviates Trace; Terrace is "Ter" above.
    'Trce': 'Trace',
    'Trl': 'Trail',
    'Wkwy': 'Walkway',
    'Xing': 'Crossing',
}

# Road types that are skipped without being reported as unrecognized.
# Only the keys are consulted; the values are unused.
ignore_road_types = {
    'Way': None,
    'Run': None,
    'Path': None,
    'Spur': None,
    'Pike': None,
    'Ramp': None,
    'Loop': None,
    'Square': None,
    'Walk': None,
    'Pass': None,
    'Avenue': None,
    'Row': None,
}

# Abbreviated compass directions, as found in the tiger:name_direction_prefix
# and tiger:name_direction_suffix tags, mapped to the full word.
directions = {
    'N': 'North',
    'S': 'South',
    'E': 'East',
    'W': 'West',
    'NE': 'Northeast',
    'NW': 'Northwest',
    'SE': 'Southeast',
    'SW': 'Southwest',
}
class TigerRoadExpansionHandler(OSMHandler):
    """Expand abbreviated road types and directions in way names.

    A way is selected when it has ``highway``, ``name`` and
    ``tiger:name_base`` tags and at least one of its ``tiger:name_type``,
    ``tiger:name_direction_prefix`` or ``tiger:name_direction_suffix``
    abbreviations is known and appears in the name.  Selected ways get
    those abbreviations replaced by the full word in the ``name`` tag.
    """

    def expand_road_type(self):
        """Replace the abbreviated road type that follows the base name.

        Only the part of the name after ``tiger:name_base`` is searched, so
        an abbreviation inside the base name itself is left alone.  If the
        abbreviation is not found there, the name is recorded as ambiguous.
        """
        short_name = self.road_type
        long_name = road_types[short_name]
        name = self.tags['name']
        basename = self.tags['tiger:name_base']
        if basename not in name:
            # Someone has modified the name.
            return
        split_at = name.index(basename) + len(basename)
        rest = name[split_at:].split()
        if short_name not in rest:
            self.ambigious_expansions.append(name)
            return
        rest[rest.index(short_name)] = long_name
        name = name[:split_at] + ' ' + ' '.join(rest)
        self.tags['name'] = name
        self.fixed = True
        if DEBUG:
            print("fixed to " + name)

    def _expand_direction_at(self, position, short_direction):
        """Expand the word at ``position`` of the name if it equals
        ``short_direction``; otherwise leave the name untouched."""
        words = self.tags['name'].split()
        if not words or words[position] != short_direction:
            return
        words[position] = directions[short_direction]
        self.tags['name'] = ' '.join(words)
        self.fixed = True
        if DEBUG:
            print("fixed to " + self.tags['name'])

    def expand_direction_prefix(self):
        """Expand the direction prefix of the name."""
        # Let's assume the prefix is always the first thing in the name
        self._expand_direction_at(0, self.dir_tag_prefix)

    def expand_direction_suffix(self):
        """Expand the direction suffix of the name."""
        # Let's assume the suffix is always the last thing in the name
        self._expand_direction_at(-1, self.dir_tag_suffix)

    def _usable_direction(self, tag_key, name, words):
        """Return the abbreviation stored under ``tag_key`` if it can be
        expanded in ``name``, otherwise a false value.

        Unknown abbreviations are recorded in
        ``unrecognized_direction_tags``; names in which the abbreviation
        occurs more than twice are recorded as ambiguous.
        """
        abbrev = self.tags.get(tag_key)
        if not abbrev:
            return abbrev
        if abbrev not in directions:
            self.unrecognized_direction_tags.add(abbrev)
            return None
        occurrences = words.count(abbrev)
        if occurrences > 2:
            self.ambigious_expansions.append(name)
            return None
        if occurrences < 1:
            return None
        return abbrev

    def selectElement(self):
        """Return True if the current object is a named TIGER road with at
        least one abbreviation we can try to expand, False otherwise.

        As a side effect this stores the usable abbreviations in
        ``road_type``, ``dir_tag_prefix`` and ``dir_tag_suffix`` for
        ``transformElement``.
        """
        tags = self.tags
        # Eliminate most objects straight away
        if not (self.name == 'way' and 'highway' in tags and
                'tiger:name_base' in tags):
            return False
        if 'name' not in tags:
            return False
        name = tags['name']
        namel = name.split()
        self.roads += 1
        if DEBUG and self.roads % 1000 == 0:
            print(str(self.roads) + "...")
        # If we have a name_type that we haven't seen, store it.
        # If the name is ambigious, store it.
        road_type = tags.get('tiger:name_type')
        if road_type:
            if road_type in ignore_road_types:
                # Deliberately skipped; not reported as unrecognized.
                road_type = None
            elif road_type not in road_types:
                self.unrecognized_tags.add(road_type)
                road_type = None
            elif namel.count(road_type) > 2:
                self.ambigious_expansions.append(name)
                road_type = None
            elif namel.count(road_type) < 1:
                road_type = None
        self.road_type = road_type
        # Same with the direction tags
        dir_tag_prefix = self._usable_direction(
            'tiger:name_direction_prefix', name, namel)
        self.dir_tag_prefix = dir_tag_prefix
        dir_tag_suffix = self._usable_direction(
            'tiger:name_direction_suffix', name, namel)
        self.dir_tag_suffix = dir_tag_suffix
        if road_type or dir_tag_suffix or dir_tag_prefix:
            self.tried_to_fix += 1
            if DEBUG:
                print("trying to fix " + name)
            return True
        return False

    def transformElement(self):
        """Apply every applicable expansion to the current way and count
        it in ``total_fixed`` if at least one of them changed the name."""
        # Reset per element; otherwise one fix would make every later
        # selected way count as fixed too.
        self.fixed = False
        if self.road_type:
            self.expand_road_type()
        if self.dir_tag_prefix:
            self.expand_direction_prefix()
        if self.dir_tag_suffix:
            self.expand_direction_suffix()
        if self.fixed:
            self.total_fixed += 1
def main(argv):
    """Run the TIGER name expansion over an OSM XML file.

    ``argv`` is the full argument vector: ``argv[1]`` is the input file
    name (``-`` reads standard input) and any following ``--quiet`` or
    ``--debug`` flag sets the module-level QUIET / DEBUG switches.

    Returns the process exit status: 0 on success, 1 when no input file
    was given.
    """
    global QUIET, DEBUG

    if len(argv) < 2:
        print("usage: tiger.py filename [--quiet|--debug] (use - for stdin)")
        return 1

    flags = argv[2:]
    QUIET = '--quiet' in flags
    DEBUG = '--debug' in flags
    if DEBUG:
        print("Debugging output enabled.")
        sys.stdout.flush()

    parser = make_parser()
    handler = TigerRoadExpansionHandler("expansions")
    parser.setContentHandler(handler)

    fname = argv[1]
    if fname == '-':
        # Prefer the underlying byte stream where there is one, so the XML
        # parser does its own decoding.
        parser.parse(getattr(sys.stdin, 'buffer', sys.stdin))
    else:
        with open(fname, 'rb') as fd:
            parser.parse(fd)

    if not QUIET:
        print("%d total roads" % handler.roads)
        print("%d roads we tried to fix" % handler.tried_to_fix)
        print("%d fixed" % handler.total_fixed)
        print("%d unrecognized tags" % len(handler.unrecognized_tags))
        print("%d unrecognized direction tags"
              % len(handler.unrecognized_direction_tags))
        print("%d ambigious road names" % len(handler.ambigious_expansions))
        print("Ambigious Names")
        print("================")
        for n in handler.ambigious_expansions:
            print(n)
        print("Unrecognized Tags")
        print("=================")
        for n in handler.unrecognized_tags:
            print(n)
        print("Unrecognized Direction Tags")
        print("===========================")
        for n in handler.unrecognized_direction_tags:
            print(n)
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment