Skip to content

Instantly share code, notes, and snippets.

@matkoniecz
Created August 20, 2014 21:41
Show Gist options
  • Save matkoniecz/a3aa6bbb736e9d2b11d2 to your computer and use it in GitHub Desktop.
Save matkoniecz/a3aa6bbb736e9d2b11d2 to your computer and use it in GitHub Desktop.
bicycle_parking_statistics (GNU GPLv3 license)
# coding=utf-8
import urllib2
import os.path
import argparse
import decimal
from lxml import etree
import sys
def get_tag_value(element, querried_tag):
for tag in element:
if tag.tag != "tag":
continue
if tag.attrib['k'] == querried_tag:
return tag.attrib['v'].encode('utf-8')
return None
def parsed_args():
parser = argparse.ArgumentParser(description='Validation of wikipedia tag in osm data.')
parser.add_argument('-file', '-f', dest='file', type=str, help='location of osm file')
parser.add_argument('-lat', '-latitude', dest='lat', type=float, help='location of area, OSM data will be fetched')
parser.add_argument('-lon', '-longitude', dest='lon', type=float, help='location of area, OSM data will be fetched')
parser.add_argument('-delta', '-d', dest='delta', type=float, help='size of area, OSM data will be fetched')
parser.add_argument('-flush_cache', dest='flush_cache', help='adding this parameter will trigger flushing cache',
action='store_true')
args = parser.parse_args()
if not (args.file or (args.lat and args.lon)):
parser.error('Provide file with OSM data or location')
if args.file and (args.lat and args.lon):
parser.error('Provide file with OSM data or location, not both.')
return args
class UrlResponse:
def __init__(self, content, code):
self.content = content
self.code = code
def fetch(url):
while True:
try:
f = urllib2.urlopen(url)
return UrlResponse(f.read(), f.getcode())
except urllib2.HTTPError as e:
return UrlResponse("", e.getcode())
except urllib2.URLError as e:
print "no response from server for url " + url
print e
continue
def fetch_osm_data(lat, lon, delta, filename):
osm_file = open(filename, 'w')
url = "http://overpass-api.de/api/interpreter?data=(node("
url += str(lat - delta) + "," + str(lon - delta) + "," + str(lat + delta) + "," + str(lon + delta)
url += ");<;);out%20meta;"
response = fetch(url)
osm_file.write(response.content)
osm_file.close()
def get_coords_of_complex_object(element, node_database, way_database):
min_lat = 180
max_lat = -180
min_lon = 180
max_lon = -180
if element.tag != "way" and element.tag != "relation":
raise ValueError("Not a proper element passed to get_coords_of_complex_object")
for tag in element:
if (tag.tag == "nd") or (tag.tag == "member" and tag.attrib['type'] == "node"):
node_id = int(tag.attrib['ref'])
try:
if node_database[node_id] is None:
raise KeyError
except KeyError:
return None #node outside of downloaded map
lat = node_database[node_id].lat
lon = node_database[node_id].lon
elif tag.tag == "member" and tag.attrib['type'] == "way":
way_id = int(tag.attrib['ref'])
try:
if way_database[way_id] is None:
raise KeyError
except KeyError:
return None #way outside of downloaded map
lat = way_database[way_id].lat
lon = way_database[way_id].lon
else:
continue
min_lat = min([min_lat, lat])
max_lat = max([max_lat, lat])
min_lon = min([min_lon, lon])
max_lon = max([max_lon, lon])
return Coord((min_lat + max_lat) / 2, (min_lon + max_lon) / 2)
class Coord:
def __init__(self, lat, lon):
self.lat = lat
self.lon = lon
def download_data_if_needed():
args = parsed_args()
if args.lat and args.lon:
delta = 0.02
if args.delta is not None:
delta = args.delta
args.file = str(args.lat) + "-" + str(args.lon) + "#" + str(delta) + ".osm"
if not os.path.isfile(args.file) or args.flush_cache or args.flush_cache_for_reported_situations:
fetch_osm_data(args.lat, args.lon, delta, args.file)
def report_element(element, text):
sys.stderr.write(("http://www.openstreetmap.org/" + element.tag + "/" + element.attrib['id']).encode('utf-8'))
sys.stderr.write(" - ")
sys.stderr.write(text)
sys.stderr.write("\n")
download_data_if_needed()
args = parsed_args()
data = etree.parse(args.file)
node_database = {}
way_database = {}
count_by_type = {}
count_by_operator_than_type = {}
contributors = {}
for element in data.getiterator():
if element.tag != "node" and element.tag != "way" and element.tag != "relation":
continue
if element.tag == "node":
lat = decimal.Decimal(element.attrib['lat'].encode('utf-8'))
lon = decimal.Decimal(element.attrib['lon'].encode('utf-8'))
osm_id = int(element.attrib['id'])
node_database[osm_id] = Coord(lat, lon)
if element.tag == "way":
coords = get_coords_of_complex_object(element, node_database, way_database)
osm_id = int(element.attrib['id'])
way_database[osm_id] = coords
username = element.attrib['user'].encode('utf-8')
try:
contributors[username] += 1
except KeyError:
contributors[username] = 1
amenity = get_tag_value(element, "amenity")
if amenity == "bicycle_parking":
access = get_tag_value(element, "access")
if access is None or access == "yes" or access == "permissive":
capacity = get_tag_value(element, "capacity")
if capacity is None:
capacity = 1
report_element(element, "capacity is missing for bicycle parking")
capacity = int(capacity)
bicycle_parking_type = get_tag_value(element, "bicycle_parking")
if bicycle_parking_type is None:
report_element(element, "type is missing for bicycle parking")
elif bicycle_parking_type != "stands" and bicycle_parking_type != "wall_loops":
print str(bicycle_parking_type) + " Unhandled bicycle parking type"
raise AssertionError
operator = get_tag_value(element, "operator")
if operator is None:
report_element(element, "operator is missing for bicycle parking (" + str(bicycle_parking_type) + ", " + str(capacity) + ")")
try:
count_by_operator_than_type[operator]
except KeyError:
count_by_operator_than_type[operator] = {}
try:
count_by_operator_than_type[operator][bicycle_parking_type] += capacity
except KeyError:
count_by_operator_than_type[operator][bicycle_parking_type] = capacity
try:
count_by_type[bicycle_parking_type] += capacity
except KeyError:
count_by_type[bicycle_parking_type] = capacity
total = 0
for i in count_by_type:
total += count_by_type[i]
print total
print count_by_type
print ""
for i in ["stands", "wall_loops"]:
for operator in count_by_operator_than_type:
try:
count_by_operator_than_type[operator][i]
except KeyError:
count_by_operator_than_type[operator][i] = 0
ordered = sorted(count_by_operator_than_type.items(), key=lambda x: -x[1]["stands"] * 5000 - x[1]["wall_loops"])
threshold = 12
for item in ordered[:threshold]:
print item
for item in ordered[threshold:-2]:
sys.stdout.write(item[0])
sys.stdout.write(', ')
sys.stdout.write(str(ordered[-2][0]))
sys.stdout.write(' and ')
sys.stdout.write(str(ordered[-1][0]))
print ""
print ""
contributors = sorted(contributors.items(), key=lambda x: -x[1])[:25]
for user in contributors[:-1]:
sys.stdout.write(user[0])
sys.stdout.write(", ")
sys.stdout.write(contributors[-1][0])
sys.stdout.write(" and others")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment