Created
December 16, 2010 06:09
-
-
Save xydrolase/743098 to your computer and use it in GitHub Desktop.
Sample code for retrieving all bus stops using Google's web service. In theory, the same approach could extract every venue in the proximity of a given location.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import json | |
import re | |
import urllib | |
import random | |
import itertools | |
import time | |
import sys | |
class GMapsBusQuery:
    """Collect bus stops from Google Maps' tile-feature web service.

    Typical use: restrict the query area with ``bound_by_tile`` and then
    call ``run``, which walks the area in blocks of tiles, extracts the bus
    stops found in every response and finally fetches the details of each
    stop.

    Instance fields:
      bounded   -- True once a valid bounding box has been set.
      boundbox  -- [min_x, min_y, max_x, max_y] tile numbers (inclusive).
      zoomlevel -- zoom level the tile numbers refer to.
      busstops  -- dict mapping a feature id to a dict of stop details.

    NOTE: the network calls use the Python 2 ``urllib`` API
    (``urllib.urlencode`` / ``urllib.urlopen``); the parsing and tile
    arithmetic helpers do not depend on the Python version.
    """

    # ord('t'); the base-4 digits 0..3 are written as 't', 'u', 'v', 'w'.
    _HASH_BASE = 116
    # Characters used for the random JSONP callback suffix.
    _CALLBACK_ALPHABET = 'abcdefghijklmnopqrstuvwxyz0123456789'
    # Feature bounding box used to recognise a bus stop in a tile response.
    _BUS_STOP_BBOX = [-7, -7, 6, 6]
    # JavaScript style "\xNN" escape, which plain JSON does not allow.
    _HEX_ESCAPE = re.compile(r'\\x([0-9a-fA-F]{2})')

    def __init__(self):
        self.bounded = False
        self.boundbox = []
        self.zoomlevel = 0
        self.busstops = {}

    def bound_by_tile(self, bb, zoom):
        """Set the area to scan.

        bb   -- sequence of four tile numbers [min_x, min_y, max_x, max_y].
        zoom -- zoom level of those tile numbers.

        A ``bb`` that does not hold exactly four items is ignored and the
        instance stays as it was. Always returns None.
        """
        if len(bb) != 4:
            return None
        self.boundbox = bb
        self.zoomlevel = zoom
        self.bounded = True

    def generate_callback(self):
        """Return a random 9-character callback hash of [a-z0-9] (used to
        make the request look like a "real" one)."""
        return ''.join(random.choice(self._CALLBACK_ALPHABET)
                       for _ in range(9))

    def geohash_to_tile(self, hash):
        """Extract the tile number (x, y) from the 4-radix "geospatial hash"
        (kind of) representation.

        Each base-4 digit is encoded with the characters 't', 'u', 'v', 'w',
        corresponding to 0, 1, 2, 3 respectively.

        Consider each digit of the 4-radix number as two bits in binary. The
        higher bit maps to x, while the lower bit maps to y. The first
        character carries the most significant bits.
        """
        x = y = 0
        for char in hash:
            digit = ord(char) - self._HASH_BASE
            x = (x << 1) | ((digit >> 1) & 1)
            y = (y << 1) | (digit & 1)
        return x, y

    def tile_to_geohash(self, x, y, zoom=None):
        """Encode tile (x, y) as a hash of exactly ``zoom`` base-4 digits.

        This is the inverse of ``geohash_to_tile``. ``zoom`` defaults to
        ``self.zoomlevel``.

        Raises ValueError when x or y does not fit into ``zoom`` bits; the
        digits of x and y could not be paired up correctly in that case.
        """
        if zoom is None:
            zoom = self.zoomlevel
        limit = 1 << zoom
        if not (0 <= x < limit and 0 <= y < limit):
            raise ValueError(
                'tile (%r, %r) is out of range for zoom level %r'
                % (x, y, zoom))
        digits = []
        # Walk from the most significant bit down to bit 0.
        for shift in range(zoom - 1, -1, -1):
            digit = (((x >> shift) & 1) << 1) | ((y >> shift) & 1)
            digits.append(chr(self._HASH_BASE + digit))
        return ''.join(digits)

    def _fetch(self, url):
        """Return the body of ``url``, or None when the request fails or the
        status is not 200. Failures are reported on stderr, never raised."""
        try:
            conn = urllib.urlopen(url)
            try:
                # Explicit check: an ``assert`` would vanish under ``-O``.
                if conn.getcode() != 200:
                    return None
                return conn.read()
            finally:
                conn.close()
        except Exception as exc:
            # Best effort by design: one bad request must not stop a scan.
            sys.stderr.write('Request failed (%s): %s\n' % (url, exc))
            return None

    def _hex_escapes_to_json(self, text):
        """Rewrite every "\\xNN" escape in ``text`` as the JSON escape
        "\\u00NN", so that escaped quotes or backslashes stay escaped."""
        return self._HEX_ESCAPE.sub(
            lambda match: '\\u00' + match.group(1), text)

    def query_landmark(self, tx, ty, step):
        """Query the features of a block of tiles.

        The block starts at tile (tx, ty), spans at most ``step`` tiles in
        each direction and is clipped to the bounding box.

        Returns the raw response body, or None when the request failed.
        """
        # Find all possible combinations of tile (x, y), convert them into
        # service compatible tile hashes.
        xs = range(tx, min(tx + step, self.boundbox[2] + 1))
        ys = range(ty, min(ty + step, self.boundbox[3] + 1))
        tile_hash = [self.tile_to_geohash(x, y)
                     for x, y in itertools.product(xs, ys)]

        url_params = urllib.urlencode(
            {
                'lyrs': 'm@140',  # default layer
                'las': ','.join(tile_hash),  # tiles to query
                'gl': 'us',
                'hl': 'en',
                'xc': 1,
                'z': self.zoomlevel,
                'opts': 'z',
                'callback': '_xdc_._%s' % self.generate_callback()
            })

        # Randomly balance requests to different servers.
        url = 'http://mt%(server)d.google.com/vt/ft?%(param)s' % \
            ({
                'server': random.randint(0, 1),
                'param': url_params
            })
        return self._fetch(url)

    def extract_bus_stops(self, response):
        """Find all bus stops within a response from the API service:
        http://mt0.google.com/vt/ft?lyrs=...&las=...

        A bus stop is identified by its unique (or not?) bounding box
        [-7, -7, 6, 6]. Every new stop is stored in ``self.busstops`` as
        ``{'caption': title}``; stops already known are left untouched and
        a stop without a readable title gets the caption None.

        Returns False when the payload cannot be parsed into a list of
        nodes, None otherwise.
        """
        # Seems to be response w/o useful data
        if response.find('features') == -1:
            return None

        # Some cleaning before JSON library could start its job: strip the
        # callback wrapper, quote the bare keys, turn the stringified "c"
        # object into a real one and drop the escaping of its quotes.
        payload = response[response.find('(') + 1:response.rfind(')')]
        json_raw = re.sub(r'([{|,])([a-z_]+):', r'\1"\2":', payload)
        json_raw = re.sub(r'\"c\":\"\{1:\{(.+?)\}\}"', r'"c":{\1}', json_raw)
        json_raw = json_raw.replace(r'\"', '"')

        try:
            nodes = json.loads(json_raw)
        except ValueError:
            return False
        if not isinstance(nodes, list):
            return False

        for node in nodes:
            if not isinstance(node, dict):
                continue
            for feat in node.get('features') or []:
                if not isinstance(feat, dict):
                    continue
                stop_id = feat.get('id')
                # Identify bus stop by its bounding box (might have
                # problems?)
                if stop_id is None or feat.get('bb') != self._BUS_STOP_BBOX:
                    continue
                if stop_id in self.busstops:
                    continue
                details = feat.get('c')
                caption = None
                if isinstance(details, dict):
                    caption = details.get('title')
                # Store the entry in one step so that a feature without a
                # title cannot leave a half-built stop behind.
                self.busstops[stop_id] = {'caption': caption}
        return None

    def update_stops_detail(self, pause=0.5):
        """Fetch and store the details of every stop in ``self.busstops``.

        pause -- seconds to wait after each request.

        A stop whose request or response cannot be processed is reported on
        stderr and skipped.
        """
        # Iterate over a snapshot of the keys; only the values are modified.
        for stop_id in list(self.busstops):
            print('Update  %s' % self.busstops[stop_id].get('caption'))

            # Construct URL request
            url_params = {
                'ftid': stop_id,
                'lyr': 'm@140',  # default layer
                'iwp': 'maps_app',
                'callback': '_xdc_._%s' % self.generate_callback()
            }
            uri = "http://maps.google.com/maps/iw?%s" % \
                urllib.urlencode(url_params)

            body = self._fetch(uri)
            if body is not None:
                try:
                    # Convert all \xNN representations into JSON escapes.
                    self.update_stop(stop_id,
                                     self._hex_escapes_to_json(body))
                except (KeyError, IndexError, TypeError, ValueError) as exc:
                    sys.stderr.write('Cannot update stop %s: %r\n'
                                     % (stop_id, exc))
            # Pause after failures as well, to keep the request rate low.
            time.sleep(pause)

    def update_stop(self, id, response):
        """Merge the details found in ``response`` into stop ``id``.

        ``response`` is JSON text, optionally wrapped in a callback call
        such as ``name && name({...})``. Sets 'latlng' and 'name'; when the
        response holds an 'infoWindow' also sets 'agency' (first agency)
        and 'lines' (lines of the first line group).

        Raises KeyError, IndexError, TypeError or ValueError when the
        response does not have the expected structure.
        """
        bus_node = self.busstops[id]

        payload = response.strip()
        if not payload.startswith('{'):
            # Looks wrapped in a callback; keep what is inside the
            # outermost parentheses.
            start, end = payload.find('('), payload.rfind(')')
            if start != -1 and end > start:
                payload = payload[start + 1:end]
        meta = json.loads(payload)

        bus_node['latlng'] = (meta['latlng']['lat'], meta['latlng']['lng'])
        bus_node['name'] = meta['name']

        if 'infoWindow' in meta:
            info = meta['infoWindow']
            schedule = info['transitSchedules']['stationSchedules']
            agency = schedule['agencies'][0]['agency_name']
            lines = [{'name': line['name'],
                      'color': line['backgroundColor']}
                     for line in schedule['line_groups'][0]['lines']]
            bus_node['agency'] = agency
            bus_node['lines'] = lines

    def run(self, step=3, pause=0.5):
        """Scan the bounded area and return the collected stops.

        step  -- number of tiles per direction covered by one request.
        pause -- seconds to wait after each request.

        Returns ``self.busstops``, or None when no bounding box was set.
        """
        if not self.bounded:
            return None

        for tilex in range(self.boundbox[0], self.boundbox[2] + 1, step):
            for tiley in range(self.boundbox[1], self.boundbox[3] + 1, step):
                response = self.query_landmark(tilex, tiley, step)
                if response:
                    self.extract_bus_stops(response)
                time.sleep(pause)  # stop for a short while

        self.update_stops_detail(pause)
        return self.busstops
def main():
    """Add the details to the stops saved in 'cyride_stops.json' and write
    the result to 'cyride_full_stops.json'."""
    busq = GMapsBusQuery()
    # To scan an area instead of loading saved stops, bound the query first:
    # busq.bound_by_tile([31451, 48646, 31454, 48667], 17)

    # Close the input before the (slow) network phase starts.
    with open('cyride_stops.json', 'r') as fd:
        busq.busstops = json.load(fd)

    busq.update_stops_detail()

    # ``with`` makes sure the output is flushed and closed.
    with open("cyride_full_stops.json", 'w') as fd:
        json.dump(busq.busstops, fd)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment