Created
August 27, 2018 21:25
-
-
Save rmaceissoft/7212a82149efe7518ce0a378cb2cc6e3 to your computer and use it in GitHub Desktop.
OTPClient is a Python class that makes requests to the OTP API and processes the responses on the server side according to our needs.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import re | |
import time | |
from django.conf import settings | |
import requests | |
# Matches a parenthesised stop id such as "(AGENCY:123)" so it can be stripped
# from the stop names embedded in a pattern description.
# A raw string keeps the backslashes intact for the regex engine instead of
# relying on Python passing unknown string escapes through unchanged.
regex_stop_id = re.compile(r'(\(\w+:\d+\))')
# Parsers for pattern descriptions of the form
# "<name> to <destination> [from <origin> [like trip <id> | via <place>]]".
# Each entry holds the compiled regex plus the group index of the destination
# ('to') and of the origin ('from', or None when the form has no origin).
# Order matters: the most specific forms come first, because the shorter forms
# would also match the longer descriptions and swallow the trailing clauses
# into their last group.
regex_to_from = [
    {
        'regex': re.compile(r'([\S\s]+)\s+to\s+([\S\s]+)\s+from\s+([\S\s]+)\s+like trip\s+([\S\s]+)'),
        'to': 2,
        'from': 3,
    },
    {
        'regex': re.compile(r'([\S\s]+)\s+to\s+([\S\s]+)\s+from\s+([\S\s]+)\s+via\s+([\S\s]+)'),
        'to': 2,
        'from': 3,
    },
    {
        'regex': re.compile(r'([\S\s]+)\s+to\s+([\S\s]+)\s+from\s+([\S\s]+)'),
        'to': 2,
        'from': 3,
    },
    {
        'regex': re.compile(r'([\S\s]+)\s+to\s+([\S\s]+)'),
        'to': 2,
        'from': None,
    },
]
def get_matched_alerts(alerts, route_id, stop_id):
    """Return the alerts that affect the given route or stop, without duplicates.

    An alert matches when any of its affected services either:

    * has a ``stop_id`` equal to ``stop_id``, or
    * has no ``stop_id`` at all and its ``route_id`` equals ``route_id``.

    :param alerts: iterable of alert dicts; each must provide
        ``alert['affected_services']['services']`` (a missing key there
        raises ``KeyError``, as before).
    :param route_id: route id to compare against route-level services.
    :param stop_id: stop id to compare against stop-level services.
    :returns: list of matching alert dicts, in input order, at most one per
        ``alert_id``.
    """
    seen_alert_ids = set()
    matched_alerts = []
    for alert in alerts:
        for service in alert['affected_services']['services']:
            try:
                alert_id = alert['alert_id']
                if alert_id in seen_alert_ids:
                    # Already reported; no need to inspect further services.
                    break
                if 'stop_id' in service:
                    is_match = service['stop_id'] == stop_id
                else:
                    is_match = service['route_id'] == route_id
            except KeyError:
                # Best effort: entries missing the expected keys never match.
                continue
            if is_match:
                seen_alert_ids.add(alert_id)
                matched_alerts.append(alert)
                break
    return matched_alerts
def calculate_initial_compass_bearing(pointA, pointB):
    """
    Calculates the initial bearing (forward azimuth) between two points.

    :Parameters:
      - `pointA`: The tuple representing the latitude/longitude for the
        first point. Latitude and longitude must be in decimal degrees
      - `pointB`: The tuple representing the latitude/longitude for the
        second point. Latitude and longitude must be in decimal degrees

    :Returns:
      The bearing in degrees, normalized to the range [0, 360)

    :Returns Type:
      float

    :Raises:
      TypeError if either argument is not a tuple (tuple subclasses such as
      namedtuples are accepted)
    """
    if not isinstance(pointA, tuple) or not isinstance(pointB, tuple):
        raise TypeError("Only tuples are supported as arguments")

    lat1 = math.radians(pointA[0])
    lat2 = math.radians(pointB[0])
    delta_lon = math.radians(pointB[1] - pointA[1])

    x = math.sin(delta_lon) * math.cos(lat2)
    y = math.cos(lat1) * math.sin(lat2) - (math.sin(lat1) * math.cos(lat2) * math.cos(delta_lon))

    # math.atan2 yields an angle in (-180, +180] degrees once converted;
    # a compass bearing is expressed in [0, 360), hence the normalisation.
    initial_bearing = math.degrees(math.atan2(x, y))
    return (initial_bearing + 360) % 360
def get_cardinal_direction(compass_bearing):
    """Map a bearing in degrees to one of the 8 compass points.

    Each compass point owns a 45-degree sector centred on its own heading,
    so e.g. 'N' covers [337.5, 360) and [0, 22.5).

    :param compass_bearing: bearing in degrees; any real value is accepted
        and is normalised into [0, 360) first, so negative bearings and
        values above 360 resolve to the correct sector.
    :returns: one of 'N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW'.
    """
    directions = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
    # Float literals keep the sector width and its half exact even where
    # `/` is integer division (Python 2).
    sector = 360.0 / len(directions)
    normalized = compass_bearing % 360.0
    # Shift by half a sector so that each sector is centred on its heading.
    index = int((normalized + sector / 2.0) / sector)
    return directions[index % len(directions)]
class OTPException(Exception):
    """Base class for errors raised by the OTP client.

    It derives from ``Exception`` so that instances can actually be raised
    and caught; a plain class cannot be raised on Python 3.
    """


class OTPNotFoundException(OTPException):
    """Raised when the OTP API answers a request with HTTP 404."""
class OTPClient(object):
    """Client for the OTP HTTP API plus helpers that reshape its responses.

    All methods are classmethods, so the class is used directly without
    instantiation. Every HTTP call goes through :meth:`_request`, which
    returns the decoded JSON on HTTP 200, raises ``OTPNotFoundException`` on
    HTTP 404 and returns ``None`` for any other status code.
    """

    BASE_URL = '%s/otp/routers/default/' % settings.OTP_HOST

    # Look-ahead window, in seconds, used both when asking OTP for stop times
    # and when filtering the arrivals that are reported.
    ARRIVAL_WINDOW_SECONDS = 2700
    # Number of departures requested per pattern and kept per arrival group.
    MAX_DEPARTURES = 3

    @classmethod
    def _request(cls, url_path, api_resource='index', params=None):
        """Issue a GET request against the OTP API.

        :param url_path: path appended after the API resource, e.g. '/stops/'.
        :param api_resource: first path segment after ``BASE_URL``.
        :param params: optional dict of query-string parameters.
        :returns: decoded JSON body on HTTP 200, ``None`` on any status other
            than 200 and 404.
        :raises OTPNotFoundException: on HTTP 404.
        """
        url = '%s%s%s' % (cls.BASE_URL, api_resource, url_path)
        resp = requests.get(url, params=params)
        if resp.status_code == 200:
            return resp.json()
        if resp.status_code == 404:
            raise OTPNotFoundException()
        return None

    @classmethod
    def stops(cls, lat, lon, radius):
        """Query '/stops/' with the given ``lat``, ``lon`` and ``radius``."""
        params = {
            'lat': lat,
            'lon': lon,
            'radius': radius,
        }
        return cls._request('/stops/', params=params)

    @classmethod
    def stop_info(cls, stop_id):
        """Query '/stops/<stop_id>/'."""
        return cls._request('/stops/%s/' % stop_id)

    @classmethod
    def stop_routes(cls, stop_id):
        """Query '/stops/<stop_id>/routes/'."""
        return cls._request('/stops/%s/routes/' % stop_id)

    @classmethod
    def stop_times(cls, stop_id):
        """Query '/stops/<stop_id>/stoptimes/' starting from the current time.

        Requests ``MAX_DEPARTURES`` departures within
        ``ARRIVAL_WINDOW_SECONDS`` and sets ``omitNonPickups``.
        """
        params = {
            'omitNonPickups': True,
            'numberOfDepartures': cls.MAX_DEPARTURES,
            'startTime': int(time.time()),
            'timeRange': cls.ARRIVAL_WINDOW_SECONDS,
        }
        return cls._request(
            '/stops/%s/stoptimes/' % stop_id, params=params)

    @classmethod
    def route_stops(cls, route_id):
        """Query '/routes/<route_id>/stops/'."""
        return cls._request('/routes/%s/stops/' % route_id)

    @classmethod
    def route_trips(cls, route_id):
        """Query '/routes/<route_id>/trips' (note: no trailing slash)."""
        return cls._request('/routes/%s/trips' % route_id)

    @classmethod
    def trip_info(cls, trip_id):
        """Query '/trips/<trip_id>/'."""
        return cls._request('/trips/%s/' % trip_id)

    @classmethod
    def trip_stop_times(cls, trip_id):
        """Query '/trips/<trip_id>/stoptimes/'."""
        return cls._request('/trips/%s/stoptimes/' % trip_id)

    @classmethod
    def trip_geometry(cls, trip_id):
        """Query '/trips/<trip_id>/geometry/'."""
        return cls._request('/trips/%s/geometry/' % trip_id)

    @classmethod
    def parse_pattern_description(cls, parsers, value):
        """Extract the destination and origin names from a pattern description.

        :param parsers: ordered list of dicts with a compiled ``'regex'`` and
            the group indexes ``'to'`` / ``'from'`` (a falsy index means the
            parser has no such group). The first regex that matches wins.
        :param value: the pattern description text.
        :returns: ``(to, from)`` tuple; each item is the matched name with any
            "(agency:id)" stop id removed and whitespace stripped, or ``None``
            when it could not be extracted.
        """
        _to, _from = None, None
        for item in parsers:
            match = item['regex'].match(value)
            if not match:
                continue
            if item['to']:
                _to = match.group(item['to'])
            if item['from']:
                _from = match.group(item['from'])
            break
        # removing stop id from the name
        if _to:
            _to = regex_stop_id.sub('', _to).strip()
        if _from:
            _from = regex_stop_id.sub('', _from).strip()
        return _to, _from

    @classmethod
    def _minutes_until_arrival(cls, entry, current_unix_timestamp):
        """Whole minutes (floored) from ``current_unix_timestamp`` to the realtime arrival of ``entry``."""
        return (entry['serviceDay'] + entry['realtimeArrival'] - current_unix_timestamp) // 60

    @classmethod
    def serialize_time(cls, _time, current_unix_timestamp=None):
        """Reduce one stop-time entry to the subset of fields that is exposed.

        Only the fields below are copied; the other fields of the entry are
        deliberately left out of the result.

        :param _time: stop-time dict as found in the ``'times'`` list of a
            stop-times response item.
        :param current_unix_timestamp: when truthy, an ``'rAM'`` key is added
            with the whole minutes remaining until the realtime arrival.
        :returns: dict with 'arrivalDelay', 'headsign', 'rA' (realtime
            arrival), 'serviceDay', 'timepoint', 'tripId' and optionally 'rAM'.
        """
        data = {
            'arrivalDelay': _time['arrivalDelay'],
            'headsign': _time.get('headsign'),
            'rA': _time['realtimeArrival'],
            'serviceDay': _time['serviceDay'],
            'timepoint': _time['timepoint'],
            'tripId': _time['tripId'],
        }
        if current_unix_timestamp:
            data['rAM'] = cls._minutes_until_arrival(_time, current_unix_timestamp)
        return data

    @classmethod
    def serialize_stop_time(cls, item, current_unix_timestamp=None, stops=None):
        """Reduce one trip stop-time entry to the subset of fields that is exposed.

        :param item: stop-time dict from a trip stop-times response.
        :param current_unix_timestamp: when truthy, an ``'rAM'`` key is added
            with the whole minutes remaining until the realtime arrival.
        :param stops: optional dict of stop id -> stop; when non-empty, the
            stop for ``item['stopId']`` is attached under ``'stop'`` (``None``
            if the id is unknown).
        :returns: dict with 'rA', 'stopid', 'serviceDay', 'headsign' and
            optionally 'rAM' and 'stop'.
        """
        data = {
            'rA': item['realtimeArrival'],
            'stopid': item['stopId'],
            'serviceDay': item['serviceDay'],
            'headsign': item.get('headsign'),
        }
        if current_unix_timestamp:
            data['rAM'] = cls._minutes_until_arrival(item, current_unix_timestamp)
        if stops:
            data['stop'] = stops.get(item['stopId'])
        return data

    @classmethod
    def trip_details(cls, trip_id):
        """Collect route, geometry, stop times and overall direction of a trip.

        The cardinal direction is derived from the bearing between the first
        and the last stop of the trip. It is best effort: when the trip has no
        stops, or the stop coordinates are missing or unusable, it is ``None``.

        :returns: dict with 'route', 'geometry', 'stops' and
            'cardinal_direction'.
        """
        trip_info = cls.trip_info(trip_id)
        route_stops = cls.route_stops(trip_info['route']['id'])
        dict_stops = {stop['id']: stop for stop in route_stops}
        geometry = cls.trip_geometry(trip_id)
        stop_times = cls.trip_stop_times(trip_id)
        current_unix_timestamp = int(time.time())
        stops = [
            cls.serialize_stop_time(item, current_unix_timestamp=current_unix_timestamp, stops=dict_stops)
            for item in stop_times
        ]
        try:
            first_stop = stops[0]['stop']
            last_stop = stops[-1]['stop']
            p1 = (first_stop['lat'], first_stop['lon'])
            p2 = (last_stop['lat'], last_stop['lon'])
            compass_bearing = calculate_initial_compass_bearing(p1, p2)
            cardinal_direction = get_cardinal_direction(compass_bearing)
        except (IndexError, KeyError, TypeError, ValueError):
            # No stops, stop not found in the route, or bad coordinates.
            cardinal_direction = None
        return {
            'route': trip_info['route'],
            'geometry': geometry,
            'stops': stops,
            'cardinal_direction': cardinal_direction,
        }

    @classmethod
    def get_and_group_arrivals(cls, dict_stops, dict_routes, alerts=None, exclude_stop_at_response=False,
                               similar_to_trip=None):
        """Fetch upcoming arrivals for the given stops, grouped by pattern and stop.

        :param dict_stops: dict of stop id -> stop; stop times are requested
            for every key.
        :param dict_routes: dict of route id -> route, used to attach the
            route to each group (a missing route raises ``KeyError``).
        :param alerts: optional list of alerts; matching ones are attached to
            each group.
        :param exclude_stop_at_response: when true, groups carry no 'stop' key.
        :param similar_to_trip: optional trip info dict; when given, only
            patterns with the same route id and direction are kept.
        :returns: list of group dicts with 'route', 'pattern', 'times', 'to',
            'from', 'alerts' and optionally 'stop'. 'times' is ordered by
            arrival time and capped to ``MAX_DEPARTURES`` entries.
        """
        current_unix_timestamp = int(time.time())
        grouped_arrivals = {}
        for stop_id in dict_stops:
            # A non-200/404 response yields None; treat it as "no arrivals".
            for item in cls.stop_times(stop_id) or []:
                pattern = item['pattern']
                # NOTE(review): the pattern id is assumed to look like
                # "<prefix>:<route>:<direction>:..." -- confirm against OTP.
                pattern_bits = pattern['id'].split(':')
                route_id = ':'.join(pattern_bits[0:2])
                route_id_without_prefix = pattern_bits[1]
                if similar_to_trip and (
                        similar_to_trip['directionId'] != pattern_bits[2] or
                        similar_to_trip['route']['id'] != route_id):
                    continue
                for _time in item['times']:
                    # filtering times: keep arrivals inside the look-ahead window ...
                    seconds_away = _time['serviceDay'] + _time['realtimeArrival'] - current_unix_timestamp
                    if not 0 <= seconds_away <= cls.ARRIVAL_WINDOW_SECONDS:
                        continue
                    # ... and skip entries where this stop is the last one of the trip.
                    if _time['stopIndex'] + 1 >= _time['stopCount']:
                        continue
                    key = ":".join((pattern['id'], _time['stopId']))
                    if key in grouped_arrivals:
                        grouped_arrivals[key]['times'].append(
                            cls.serialize_time(_time, current_unix_timestamp=current_unix_timestamp))
                        continue
                    _to, _from = cls.parse_pattern_description(regex_to_from, pattern['desc'])
                    if not _to:
                        _to = _time.get('headsign')
                    # match and attach alerts
                    matched_alerts = get_matched_alerts(alerts, route_id_without_prefix, stop_id) if alerts else []
                    grouped_arrivals[key] = {
                        'route': dict_routes[route_id],
                        'pattern': pattern,
                        'times': [cls.serialize_time(_time, current_unix_timestamp=current_unix_timestamp)],
                        'to': _to,
                        'from': _from,
                        'alerts': matched_alerts,
                    }
                    if not exclude_stop_at_response:
                        grouped_arrivals[key]['stop'] = dict_stops[stop_id]
        # order times by absolute arrival timestamp and cap them; sorting the
        # dicts themselves is not a meaningful (or, on Python 3, valid) order.
        for group in grouped_arrivals.values():
            ordered = sorted(group['times'], key=lambda entry: entry['serviceDay'] + entry['rA'])
            group['times'] = ordered[:cls.MAX_DEPARTURES]
        return list(grouped_arrivals.values())

    @classmethod
    def _collect_routes(cls, dict_stops):
        """Return route id -> route for all stops, dropping stops without routes from ``dict_stops``."""
        dict_routes = {}
        # Iterate over a snapshot of the keys: entries are removed while looping.
        for stop_id in list(dict_stops):
            routes = cls.stop_routes(stop_id)
            if not routes:
                # remove stop_id from dict_stops since it has no routes
                del dict_stops[stop_id]
                continue
            for route in routes:
                dict_routes[route['id']] = route
        return dict_routes

    @classmethod
    def arrival_times(cls, lat, lon, radius=805, limit_stops=10, alerts=None):
        """Return grouped arrivals for the stops closest to a location.

        :param lat: latitude passed to the stops query.
        :param lon: longitude passed to the stops query.
        :param radius: search radius passed to the stops query.
        :param limit_stops: maximum number of stops (closest first) considered.
        :param alerts: optional list of alerts to attach to the groups.
        :returns: list of arrival groups, see :meth:`get_and_group_arrivals`.
        """
        stops = cls.stops(lat, lon, radius) or []
        # sort stops by distance and keep the `limit_stops` closest ones
        stops = sorted(stops, key=lambda item: item['dist'])[:limit_stops]
        dict_stops = {stop['id']: stop for stop in stops}
        dict_routes = cls._collect_routes(dict_stops)
        return cls.get_and_group_arrivals(dict_stops, dict_routes, alerts=alerts)

    @classmethod
    def arrival_times_by_stop(cls, stop_id, alerts=None, similar_to_trip=None):
        """Return grouped arrivals for one stop id or for several stop ids.

        :param stop_id: a single stop id (string) or an iterable of stop ids.
        :param alerts: optional list of alerts to attach to the groups.
        :param similar_to_trip: optional trip id; when given, only arrivals
            on the same route and direction as that trip are returned.
        :returns: list of arrival groups, see :meth:`get_and_group_arrivals`;
            an empty list when none of the stops has routes. Groups carry no
            'stop' key when exactly one stop id was requested.
        """
        # (str, type(u'')) is str/unicode on Python 2 and just str on Python 3.
        if isinstance(stop_id, (str, type(u''))):
            stop_ids = [stop_id]
        else:
            stop_ids = stop_id
        trip_info = cls.trip_info(similar_to_trip) if similar_to_trip else None
        dict_stops = {_stop_id: cls.stop_info(_stop_id) for _stop_id in stop_ids}
        dict_routes = cls._collect_routes(dict_stops)
        if not dict_stops:
            return []
        exclude_stop_at_response = len(stop_ids) == 1
        return cls.get_and_group_arrivals(
            dict_stops, dict_routes, alerts=alerts, exclude_stop_at_response=exclude_stop_at_response,
            similar_to_trip=trip_info)

    @classmethod
    def plan_trip(cls, fromPlace, toPlace, mode, optimize):
        """Query the 'plan' resource for a single, depart-at itinerary.

        ``fromPlace``, ``toPlace``, ``mode`` and ``optimize`` are forwarded
        unchanged as query parameters.
        """
        return cls._request('', api_resource='plan', params={
            'arriveBy': False,
            'fromPlace': fromPlace,
            'toPlace': toPlace,
            # presumably 1.4 miles expressed in meters -- TODO confirm
            'maxWalkDistance': 1.4 * 1609.34,
            'mode': mode,
            'numItineraries': 1,
            'optimize': optimize,
            'wheelchair': False,
        })

    @classmethod
    def bike_rental(cls, lower_left=None, upper_right=None):
        """Query the 'bike_rental' resource.

        The ``lowerLeft`` / ``upperRight`` parameters are only sent when both
        ``lower_left`` and ``upper_right`` are given.
        """
        params = {}
        if lower_left and upper_right:
            params.update({
                'lowerLeft': lower_left,
                'upperRight': upper_right,
            })
        return cls._request('', api_resource='bike_rental', params=params)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment