Created
January 4, 2018 04:44
-
-
Save pdarche/957d6f937408dc738ea8d825ecca7792 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| """ | |
| This program computes my transportation-related carbon footprint using | |
| Moves app data and the Brighter Planet carbon API. The Moves mobile | |
| app tracks personal activity and displays it as a set of activity segments | |
| like walking, running, and using transportation. I collected a sample of | |
| transportation segments, labeled the transportation type | |
| (car, airplane, subway etc), and then trained a ML algo | |
| to automate the classification. This code pulls my moves data, | |
| classifies the transportation segments, and then adds carbon | |
| information from the Brighter Planet API. The data is then loaded into | |
| a database that powers a dashboard: https://carbon-footprint.now.sh | |
| The main logic is in the "run" method of each class! | |
| """ | |
| import datetime | |
| import itertools | |
| import math | |
| import pickle | |
| from dateutil.parser import parse | |
| import moves as mvs | |
| import numpy as np | |
| import pymongo | |
| import requests | |
| from config import BRIGHTER_PLANET_API_KEY, USER_ID | |
# Root URL of the Brighter Planet impact API; emitter resources from TYPES
# are appended to it.
BASE = 'http://impact.brighterplanet.com/'
# Maximum number of missing days fetched from Moves in a single run.
LIMIT = 5
# Transport labels in classifier output order: the model's predicted class
# index i maps to LABELS[i].
LABELS = ['subway', 'bus', 'car', 'airplane']
# Brighter Planet emitter resource (relative to BASE) for each label.
TYPES = dict(
    car='automobile_trips.json',
    subway='rail_trips.json?class=commuter',
    airplane='flights.json',
    bus='bus_trips.json',
)
| # Helper functions | |
def datetime_to_seconds(dt):
    """Return the whole seconds elapsed since midnight for ``dt``.

    Only the ``hour``, ``minute`` and ``second`` fields are read; the date
    part and any microseconds are ignored.
    """
    minutes_since_midnight = dt.hour * 60 + dt.minute
    return minutes_since_midnight * 60 + dt.second
def distance_on_unit_sphere(lat1, long1, lat2, long2, radius=3963.1676):
    """Return the great-circle distance between two points on a sphere.

    Uses the spherical law of cosines.

    Args:
        lat1, long1: latitude and longitude of the first point, in degrees.
        lat2, long2: latitude and longitude of the second point, in degrees.
        radius: radius of the sphere; the result is in the same unit. The
            default, 3963.1676, is the Earth's radius in miles, so by
            default the result is in miles.

    Returns:
        The arc length between the two points, in units of ``radius``.
    """
    degrees_to_radians = math.pi / 180.0
    # Spherical coordinates in radians: phi is the polar angle
    # (90 - latitude), theta is the longitude.
    phi1 = (90.0 - lat1) * degrees_to_radians
    phi2 = (90.0 - lat2) * degrees_to_radians
    theta1 = long1 * degrees_to_radians
    theta2 = long2 * degrees_to_radians
    cos_arc = (math.sin(phi1) * math.sin(phi2) * math.cos(theta1 - theta2) +
               math.cos(phi1) * math.cos(phi2))
    # Floating-point rounding can push the cosine slightly outside [-1, 1]
    # (e.g. for identical points), which would make math.acos raise
    # "math domain error". Clamp it back into the valid domain.
    cos_arc = max(-1.0, min(1.0, cos_arc))
    return math.acos(cos_arc) * radius
class TransportsExtractor(object):
    """
    Extracts raw Moves App transport records for a user for
    a specified set of days.

    A day is "missing" when it lies between the user's Moves join date and
    yesterday, has no records stored in ``db[collection]`` and is not listed
    in ``db.no_transport_dates``. At most ``LIMIT`` missing days are fetched
    per run.
    """

    # Moves daily resources that ``_fetch_resource`` accepts.
    _RESOURCES = ('summary', 'activities', 'places', 'storyline')

    def __init__(self, user, db, collection, moves):
        """
        Args:
            user: the user's document; ``profile.firstDate`` is read as the
                first active day.
            db: pymongo database handle.
            collection: name of the collection holding stored transports.
            moves: Moves API client exposing ``api(path, method)``.
        """
        self.user = user
        self.db = db
        self.collection = collection
        self.moves = moves

    def _find_existing_dates(self):
        """Returns the dates that already have records in the collection."""
        return [parse(value).date()
                for value in self.db[self.collection].distinct('date')]

    def _find_no_transport_dates(self):
        """Returns the dates previously found to have no transport records."""
        return [parse(value).date()
                for value in self.db.no_transport_dates.distinct('date')]

    @staticmethod
    def _create_active_daterange(start_date):
        """
        Yields every date from yesterday (the last full day) back to and
        including the date the person joined Moves, most recent first.

        Args:
            start_date: the join date as a string understood by
                ``dateutil.parser.parse``.
        """
        first_day = parse(start_date).date()
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        # +1 so that the join date itself is included in the range.
        for offset in range((yesterday - first_day).days + 1):
            yield yesterday - datetime.timedelta(days=offset)

    @staticmethod
    def _create_missing_dates(service_dates, existing_dates,
                              no_transport_dates):
        """
        Returns the dates in ``service_dates`` that haven't been fetched,
        i.e. appear in neither ``existing_dates`` nor ``no_transport_dates``.
        The order of ``service_dates`` is preserved.
        """
        # A set makes each membership test O(1) instead of scanning lists.
        already_handled = set(existing_dates) | set(no_transport_dates)
        return [date for date in service_dates if date not in already_handled]

    @staticmethod
    def _pluck_segments(collection, attr):
        """
        Plucks a list attribute from each dict in a collection and
        concatenates the lists, skipping dicts where it is missing or empty.

        Returns:
            An iterator over the combined items.
        """
        segments = [d[attr] for d in collection if d.get(attr)]
        return itertools.chain.from_iterable(segments)

    @staticmethod
    def _filter_transports(activities):
        """
        Returns, as a list, the activities whose ``activity`` is
        'transport' or 'airplane'. A list (not a lazy ``filter`` object)
        is returned so that an empty result is falsy.
        """
        return [a for a in activities
                if a.get('activity') in ('transport', 'airplane')]

    def _fetch_resource(self, resource, date, update_since=None):
        """
        Fetches one day of a user's Moves daily resource.

        Args:
            resource: one of 'summary', 'activities', 'places', 'storyline'.
            date: the day to fetch; formatted into the path with ``str()``.
            update_since: optional timestamp, sent as ``T<update_since>Z``.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: if ``resource`` is not a supported Moves resource.
        """
        if resource not in self._RESOURCES:
            raise ValueError('Invalid Moves resource.')
        query = []
        if resource == 'storyline':
            # Ask Moves to include track points in the storyline segments.
            query.append('trackPoints=true')
        if update_since:
            # NOTE(review): confirm the exact parameter name against the
            # Moves API documentation (possibly ``updatedSince``).
            query.append('updateSince=T%sZ' % update_since)
        rsrc_path = 'user/{}/daily/{}'.format(resource, date)
        if query:
            rsrc_path = '%s?%s' % (rsrc_path, '&'.join(query))
        return self.moves.api(rsrc_path, 'GET').json()

    def run(self):
        """
        Runs the extraction.

        Fetches the storyline of up to ``LIMIT`` missing days. Days without
        any transport activity are recorded in ``db.no_transport_dates``
        so they are skipped on later runs.

        Returns:
            A flat list of raw Moves transport activity dicts.

        Raises:
            ValueError: if the user profile has no ``firstDate``.
        """
        join_date = self.user.get('profile', {}).get('firstDate')
        if not join_date:
            raise ValueError('User profile has no firstDate.')
        active_dates = self._create_active_daterange(join_date)
        existing_dates = self._find_existing_dates()
        no_transport_dates = self._find_no_transport_dates()
        missing_dates = self._create_missing_dates(
            active_dates, existing_dates, no_transport_dates)
        raw_transports = []
        for date in missing_dates[:LIMIT]:
            storyline = self._fetch_resource('storyline', date)
            segments = self._pluck_segments(storyline, 'segments')
            activities = self._pluck_segments(segments, 'activities')
            transports = self._filter_transports(activities)
            if transports:
                raw_transports.extend(transports)
            else:
                self.db.no_transport_dates.insert_one(
                    {'date': date.strftime('%Y-%m-%d')})
        return raw_transports
class TransportsTransformer(object):
    """
    Transforms a collection of raw transport records by
    predicting transportation type and fetching carbon data from the
    Brighter Planet API.
    """

    def __init__(self, db, collection, transports, station_points, model):
        """
        Args:
            db: pymongo database handle (stored; not used by the transform).
            collection: collection name (stored; not used by the transform).
            transports: iterable of raw Moves transport activity dicts.
            station_points: subway-entrance coordinates as ``[lon, lat]``
                pairs.
            model: fitted classifier whose ``predict`` output is used as an
                index into ``LABELS``.
        """
        self.db = db
        self.collection = collection
        self.transports = transports
        self.station_points = station_points
        self.model = model

    @staticmethod
    def _create_geometry(trackPoint):
        """Takes a track point dict and returns a GeoJSON Point geometry."""
        # GeoJSON positions are [longitude, latitude].
        return {
            'type': 'Point',
            'coordinates': [trackPoint['lon'], trackPoint['lat']]
        }

    @staticmethod
    def _create_property(trackPoint):
        """Takes a track point dict and returns its GeoJSON properties."""
        return {
            'latitude': trackPoint['lat'],
            'longitude': trackPoint['lon'],
            'time': trackPoint['time'],
            'id': 'transport'
        }

    def _create_feature(self, trackPoint):
        """Takes a track point dict and returns a GeoJSON Feature."""
        return {
            'type': 'Feature',
            'properties': self._create_property(trackPoint),
            'geometry': self._create_geometry(trackPoint)
        }

    def _create_feature_collection(self, transport):
        """
        Returns a GeoJSON FeatureCollection with one Point feature per
        track point of ``transport``. A transport with missing or empty
        ``trackPoints`` yields an empty collection.
        """
        track_points = transport.get('trackPoints') or []
        return {
            'type': 'FeatureCollection',
            'features': [self._create_feature(tp) for tp in track_points]
        }

    @staticmethod
    def _transform_transport(transport):
        """
        Adds parsed time fields to ``transport`` in place and returns it:
        ``startDatetime``/``endDatetime`` (parsed ``startTime``/``endTime``)
        and ``date`` (the start day formatted as 'YYYY-MM-DD').
        """
        start = parse(transport['startTime'])
        transport['date'] = start.strftime("%Y-%m-%d")
        transport['startDatetime'] = start
        transport['endDatetime'] = parse(transport['endTime'])
        return transport

    def _compute_min_distance(self, lat_lng):
        """
        Returns the distance, as computed by ``distance_on_unit_sphere``,
        from a point to the nearest subway entrance.

        Args:
            lat_lng: the point as ``[lon, lat]``. Despite the name, the
                longitude comes first, matching ``station_points``.

        Raises:
            ValueError: if there are no station points.
        """
        lon, lat = lat_lng[0], lat_lng[1]
        return min(distance_on_unit_sphere(lat, lon, point[1], point[0])
                   for point in self.station_points)

    def _compute_total_distance(self, first_point, last_point):
        """
        Returns the sum of the distances from the first and from the last
        point of a transport (each ``[lon, lat]``) to their nearest subway
        entrance.
        """
        start = self._compute_min_distance(first_point)
        end = self._compute_min_distance(last_point)
        return start + end

    def _create_features(self, transport):
        """
        Builds the 1 x 7 feature matrix for the model: distance, duration,
        start and end time as seconds since midnight, start hour, number of
        track points, and the total nearest-subway-entrance distance of the
        first and last track points. Feature order is positional for the
        model, so do not reorder.

        Raises:
            ValueError: if the transport has no track points.
        """
        track_points = transport.get('trackPoints') or []
        if not track_points:
            raise ValueError(
                'Transport has no track points; cannot build features.')
        start_time = parse(transport['startTime'])
        end_time = parse(transport['endTime'])
        fp = track_points[0]
        lp = track_points[-1]
        features = [
            transport['distance'],
            transport['duration'],
            datetime_to_seconds(start_time),
            datetime_to_seconds(end_time),
            start_time.hour,
            len(track_points),
            self._compute_total_distance(
                [fp['lon'], fp['lat']],
                [lp['lon'], lp['lat']])
        ]
        return np.array(features).reshape(1, -1)

    def _predict_transport_type(self, transport):
        """Predicts the transportation label (an entry of LABELS)."""
        X = self._create_features(transport)
        prediction = self.model.predict(X)
        # predict() is expected to return one class per input row (X has a
        # single row). A 1-element NumPy array is not a valid list index,
        # so extract that element as a plain int.
        return LABELS[int(np.asarray(prediction).ravel()[0])]

    @staticmethod
    def _compute_carbon(transport):
        """
        Computes the kgs of carbon for a given trip via the Brighter
        Planet API.

        Raises:
            requests.HTTPError: if the API responds with an error status.

        TODO: move distance transformation to transform methods
        """
        url = BASE + TYPES[transport['type']]
        params = {
            # Convert meters to kilometers. Dividing by a float avoids
            # integer truncation of int distances under Python 2.
            'distance': transport['distance'] / 1000.0,
            'key': BRIGHTER_PLANET_API_KEY
        }
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        return response.json()['decisions']['carbon']['object']['value']

    def run(self):
        """
        Runs the transformation.

        Each raw transport dict is updated in place with time fields, a
        predicted ``type``, its ``carbon`` and a ``geojson``
        FeatureCollection of its track points.

        Returns:
            The list of transformed transport dicts.
        """
        transformed = []
        for transport in self.transports:
            transport = self._transform_transport(transport)
            transport['type'] = self._predict_transport_type(transport)
            transport['carbon'] = self._compute_carbon(transport)
            transport['geojson'] = self._create_feature_collection(transport)
            transformed.append(transport)
        return transformed
def execute():
    """
    The main logic to be executed: extract, transform and load transports.

    Reads the user's Moves credentials and the subway-entrance GeoJSON from
    the local MongoDB ``carbon`` database, loads the pickled classifier,
    extracts and transforms new transports, and inserts them into the
    ``moves2`` collection.

    Raises:
        LookupError: if the user or the subway-entrance document is missing.
    """
    client = pymongo.MongoClient('localhost', 27017)
    try:
        db = client.carbon
        collection = 'moves2'
        # Get the user's profile and Moves credentials
        user = db.users.find_one({'userId': USER_ID})
        if user is None:
            raise LookupError('No user found with userId %r' % (USER_ID,))
        moves = mvs.MovesClient(access_token=user['user']['access_token'])
        # Find the subway station entrances (GeoJSON coordinates)
        subway_entrances = db.subway_entrances.find_one()
        if subway_entrances is None:
            raise LookupError('No subway_entrances document found')
        station_points = [feature['geometry']['coordinates']
                          for feature in subway_entrances['features']]
        # Load the model. pickle can execute arbitrary code, so only load a
        # model file you produced yourself. The path is relative to the
        # current working directory.
        with open('../models/gradient_boosting.p', 'rb') as model_file:
            model = pickle.load(model_file)
        # Extract the raw transport records
        extractor = TransportsExtractor(user, db, collection, moves)
        raw_transports = extractor.run()
        # Transform the raw transport records
        transformer = TransportsTransformer(db, collection, raw_transports,
                                            station_points, model)
        transformed_transports = transformer.run()
        # Load the transformed transport records. PyMongo rejects an empty
        # batch insert, so skip the write when there is nothing new.
        if transformed_transports:
            db[collection].insert_many(transformed_transports)
    finally:
        client.close()


if __name__ == '__main__':
    execute()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment