Created
August 20, 2014 21:41
-
-
Save matkoniecz/a3aa6bbb736e9d2b11d2 to your computer and use it in GitHub Desktop.
bicycle_parking_statistics (GNU GPLv3 license)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
import urllib2 | |
import os.path | |
import argparse | |
import decimal | |
from lxml import etree | |
import sys | |
def get_tag_value(element, querried_tag):
    """Return the UTF-8 encoded value of a tag of an OSM element.

    Only children of ``element`` whose tag name is "tag" are inspected. The
    ``v`` attribute of the first such child whose ``k`` attribute equals
    ``querried_tag`` is returned, encoded as UTF-8. Returns None when no
    child matches.
    """
    tag_children = (child for child in element if child.tag == "tag")
    for child in tag_children:
        if child.attrib['k'] != querried_tag:
            continue
        return child.attrib['v'].encode('utf-8')
    return None
def parsed_args():
    """Parse and validate the command line arguments.

    Exactly one data source must be given: either an OSM file (``-file``) or
    a location (``-lat`` together with ``-lon``). ``-delta`` and
    ``-flush_cache`` are optional.

    Returns the argparse namespace with the attributes ``file``, ``lat``,
    ``lon``, ``delta`` and ``flush_cache``. On invalid combinations
    ``parser.error`` is called, which prints a message and exits.
    """
    parser = argparse.ArgumentParser(description='Statistics of bicycle parking capacity in OSM data.')
    parser.add_argument('-file', '-f', dest='file', type=str, help='location of osm file')
    parser.add_argument('-lat', '-latitude', dest='lat', type=float, help='location of area, OSM data will be fetched')
    parser.add_argument('-lon', '-longitude', dest='lon', type=float, help='location of area, OSM data will be fetched')
    parser.add_argument('-delta', '-d', dest='delta', type=float, help='size of area, OSM data will be fetched')
    parser.add_argument('-flush_cache', dest='flush_cache', help='adding this parameter will trigger flushing cache',
                        action='store_true')
    args = parser.parse_args()
    # Compare against None rather than relying on truthiness: 0.0 is a valid
    # latitude (equator) and longitude (prime meridian).
    has_location = args.lat is not None and args.lon is not None
    if not (args.file or has_location):
        parser.error('Provide file with OSM data or location')
    if args.file and has_location:
        parser.error('Provide file with OSM data or location, not both.')
    return args
class UrlResponse:
    """Plain holder pairing a response body with its status code."""

    def __init__(self, content, code):
        # code: status code of the response; content: the response body
        self.code = code
        self.content = content
def fetch(url, retry_delay=1):
    """Download ``url`` and return the result as an UrlResponse.

    url -- address to download
    retry_delay -- seconds to wait before retrying after an URLError

    An HTTPError is not retried: it yields an UrlResponse with empty content
    and the error's status code. Any other URLError is printed and the
    request is repeated until it succeeds.
    """
    import time  # function-scope import: the file does not import time at the top

    while True:
        try:
            f = urllib2.urlopen(url)
            try:
                return UrlResponse(f.read(), f.getcode())
            finally:
                # release the connection even when reading fails
                f.close()
        except urllib2.HTTPError as e:
            # must be caught before URLError, HTTPError is its subclass
            return UrlResponse("", e.getcode())
        except urllib2.URLError as e:
            print("no response from server for url " + url)
            print(e)
            # pause so an unreachable server is not hammered in a tight loop
            time.sleep(retry_delay)
def fetch_osm_data(lat, lon, delta, filename):
    """Download OSM data around a location from Overpass API into a file.

    lat, lon -- centre of the requested area
    delta -- half of the side of the requested bounding box, in degrees
    filename -- file that receives the downloaded data

    Raises IOError when the server answers with a status other than 200.
    """
    bbox = ",".join(str(edge) for edge in (lat - delta, lon - delta, lat + delta, lon + delta))
    url = "http://overpass-api.de/api/interpreter?data=(node("
    url += bbox
    url += ");<;);out%20meta;"
    # Fetch before touching the file: a failed request must not leave an
    # empty file behind, as an existing file is reused as a cache later.
    response = fetch(url)
    if response.code != 200:
        raise IOError("Overpass API returned HTTP status " + str(response.code) + " for " + url)
    with open(filename, 'w') as osm_file:
        osm_file.write(response.content)
def get_coords_of_complex_object(element, node_database, way_database):
    """Return the centre of the bounding box of a way or a relation.

    element -- "way" or "relation" element; its "nd" children and its
        "member" children of type node or way are taken into account
    node_database -- dict mapping node id to an object with lat and lon
    way_database -- dict mapping way id to an object with lat and lon

    Returns a Coord, or None when any referenced node or way is missing from
    the databases (or stored as None), or when the element references no
    nodes or ways at all.
    Raises ValueError when the element is neither a way nor a relation.
    """
    if element.tag != "way" and element.tag != "relation":
        raise ValueError("Not a proper element passed to get_coords_of_complex_object")
    lats = []
    lons = []
    for child in element:
        if child.tag == "nd" or (child.tag == "member" and child.attrib['type'] == "node"):
            database = node_database
        elif child.tag == "member" and child.attrib['type'] == "way":
            database = way_database
        else:
            continue
        coord = database.get(int(child.attrib['ref']))
        if coord is None:
            return None  # node or way outside of downloaded map
        lats.append(coord.lat)
        lons.append(coord.lon)
    if not lats:
        # nothing to compute a bounding box from; do not invent a location
        return None
    return Coord((min(lats) + max(lats)) / 2, (min(lons) + max(lons)) / 2)
class Coord:
    """Pair of coordinates exposed as the attributes lat and lon."""

    def __init__(self, lat, lon):
        self.lat, self.lon = lat, lon
def download_data_if_needed():
    """Make sure the OSM data requested on the command line is on disk.

    When a location was given instead of a file, the data is stored in a
    file named "<lat>-<lon>#<delta>.osm" (delta defaults to 0.02). The
    download is skipped when that file already exists, unless flushing of
    the cache was requested.

    Returns the name of the file holding the OSM data.
    """
    args = parsed_args()
    # "is not None" instead of truthiness: 0.0 is a valid coordinate
    if not args.file and args.lat is not None and args.lon is not None:
        delta = 0.02 if args.delta is None else args.delta
        args.file = str(args.lat) + "-" + str(args.lon) + "#" + str(delta) + ".osm"
        # flush_cache_for_reported_situations is not defined by the argument
        # parser in this file, so read it defensively instead of crashing.
        flush_requested = args.flush_cache or getattr(args, 'flush_cache_for_reported_situations', False)
        if flush_requested or not os.path.isfile(args.file):
            fetch_osm_data(args.lat, args.lon, delta, args.file)
    return args.file
def report_element(element, text):
    """Write a one-line report about an OSM element to stderr.

    The line consists of the openstreetmap.org address built from the tag
    name and the id attribute of ``element``, followed by " - " and ``text``.
    """
    link = "http://www.openstreetmap.org/" + element.tag + "/" + element.attrib['id']
    for chunk in (link.encode('utf-8'), " - ", text, "\n"):
        sys.stderr.write(chunk)
def _join_names(names):
    """Return names formatted as "a, b and c"; empty string for no names."""
    names = [str(name) for name in names]  # operator may be None
    if len(names) < 2:
        return "".join(names)
    return ", ".join(names[:-1]) + " and " + names[-1]


# --- load data ---------------------------------------------------------------
download_data_if_needed()
args = parsed_args()
if args.file is None:
    # Location mode: the freshly parsed arguments carry no file name, so
    # rebuild the cache file name the same way download_data_if_needed does.
    delta = 0.02 if args.delta is None else args.delta
    args.file = str(args.lat) + "-" + str(args.lon) + "#" + str(delta) + ".osm"
data = etree.parse(args.file)

node_database = {}  # node id -> Coord
way_database = {}  # way id -> Coord, or None when the way is incomplete
count_by_type = {}  # bicycle_parking value -> summed capacity
count_by_operator_than_type = {}  # operator -> {bicycle_parking value -> summed capacity}
contributors = {}  # user name -> number of elements last edited by that user

# --- collect statistics --------------------------------------------------------
for element in data.iter():
    if element.tag not in ("node", "way", "relation"):
        continue
    if element.tag == "node":
        lat = decimal.Decimal(element.attrib['lat'].encode('utf-8'))
        lon = decimal.Decimal(element.attrib['lon'].encode('utf-8'))
        osm_id = int(element.attrib['id'])
        node_database[osm_id] = Coord(lat, lon)
    if element.tag == "way":
        coords = get_coords_of_complex_object(element, node_database, way_database)
        osm_id = int(element.attrib['id'])
        way_database[osm_id] = coords
    username = element.attrib['user'].encode('utf-8')
    contributors[username] = contributors.get(username, 0) + 1
    amenity = get_tag_value(element, "amenity")
    if amenity != "bicycle_parking":
        continue
    access = get_tag_value(element, "access")
    if not (access is None or access == "yes" or access == "permissive"):
        continue  # only publicly usable parkings are counted
    capacity = get_tag_value(element, "capacity")
    if capacity is None:
        capacity = 1  # count an untagged parking as a single place
        report_element(element, "capacity is missing for bicycle parking")
    capacity = int(capacity)
    bicycle_parking_type = get_tag_value(element, "bicycle_parking")
    if bicycle_parking_type is None:
        report_element(element, "type is missing for bicycle parking")
    elif bicycle_parking_type != "stands" and bicycle_parking_type != "wall_loops":
        # the summary below knows only these two types, so stop on anything else
        print(str(bicycle_parking_type) + " Unhandled bicycle parking type")
        raise AssertionError
    operator = get_tag_value(element, "operator")
    if operator is None:
        report_element(element, "operator is missing for bicycle parking (" + str(bicycle_parking_type) + ", " + str(capacity) + ")")
    per_operator = count_by_operator_than_type.setdefault(operator, {})
    per_operator[bicycle_parking_type] = per_operator.get(bicycle_parking_type, 0) + capacity
    count_by_type[bicycle_parking_type] = count_by_type.get(bicycle_parking_type, 0) + capacity

# --- print summary ---------------------------------------------------------------
total = sum(count_by_type.values())
print(total)
print(count_by_type)
print("")
# the sort key below reads both types, so make sure every operator has them
for per_operator in count_by_operator_than_type.values():
    for parking_type in ("stands", "wall_loops"):
        per_operator.setdefault(parking_type, 0)
# stands dominate the ordering, wall loops only break ties
ordered = sorted(count_by_operator_than_type.items(), key=lambda x: -x[1]["stands"] * 5000 - x[1]["wall_loops"])
threshold = 12
for item in ordered[:threshold]:
    print(item)
# Remaining operators are listed by name only. Slicing from the threshold
# avoids repeating operators already printed above and works for short lists.
sys.stdout.write(_join_names(item[0] for item in ordered[threshold:]))
print("")
print("")
contributors = sorted(contributors.items(), key=lambda x: -x[1])[:25]
if contributors:
    sys.stdout.write(", ".join(user[0] for user in contributors))
    sys.stdout.write(" and others")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment