Skip to content

Instantly share code, notes, and snippets.

@pdarche
Created January 4, 2018 04:44
Show Gist options
  • Save pdarche/957d6f937408dc738ea8d825ecca7792 to your computer and use it in GitHub Desktop.
Save pdarche/957d6f937408dc738ea8d825ecca7792 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This program computes my transportation-related carbon footprint using
Moves app data and the Brighter Planet carbon API. The Moves mobile
app tracks personal activity and displays it as a set of activity segments
like walking, running, and using transportation. I collected a sample of
transportation segments, labeled the transportation type
(car, airplane, subway, etc.), and then trained an ML algorithm
to automate the classification. This code pulls my Moves data,
classifies the transportation segments, and then adds carbon
information from the Brighter Planet API. The data is then loaded into
a database that powers a dashboard: https://carbon-footprint.now.sh
The main logic is in the "run" method of each class!
"""
import datetime
import itertools
import math
import pickle
from dateutil.parser import parse
import moves as mvs
import numpy as np
import pymongo
import requests
from config import BRIGHTER_PLANET_API_KEY, USER_ID
# Root URL of the Brighter Planet carbon API; a TYPES path is appended to it.
BASE = 'http://impact.brighterplanet.com/'
# Maximum number of not-yet-fetched dates processed in a single run.
LIMIT = 5
# Transport labels; the classifier's integer prediction indexes this list.
LABELS = ['subway', 'bus', 'car', 'airplane']
# Maps each transport label to the API resource path used for its
# carbon calculation.
TYPES = dict(
    car='automobile_trips.json',
    subway='rail_trips.json?class=commuter',
    airplane='flights.json',
    bus='bus_trips.json',
)
# Helper functions
def datetime_to_seconds(dt):
    """ Returns the seconds elapsed since midnight for the time part of dt """
    minutes_since_midnight = dt.hour * 60 + dt.minute
    return minutes_since_midnight * 60 + dt.second
def distance_on_unit_sphere(lat1, long1, lat2, long2):
    """
    Computes the great-circle distance, in miles, between two points
    given as latitude / longitude pairs in degrees.
    """
    # Convert latitude and longitude to spherical coordinates in
    # radians: phi is the polar angle (90 - latitude), theta the longitude.
    phi1 = math.radians(90.0 - lat1)
    phi2 = math.radians(90.0 - lat2)
    theta1 = math.radians(long1)
    theta2 = math.radians(long2)
    # Cosine of the central angle between the two points
    # (spherical law of cosines).
    cos_arc = (math.sin(phi1) * math.sin(phi2) * math.cos(theta1 - theta2) +
               math.cos(phi1) * math.cos(phi2))
    # Floating point rounding can push the cosine marginally outside
    # [-1, 1] (typically for identical points), which would make
    # math.acos raise ValueError; clamp it back into the valid domain.
    cos_arc = max(-1.0, min(1.0, cos_arc))
    arc = math.acos(cos_arc)
    # Multiply the arc by the radius of the earth (in miles)
    earth_rad_miles = 3963.1676
    return arc * earth_rad_miles
class TransportsExtractor(object):
    """
    Extracts raw Moves App transport records for a user for
    the active dates that have not been fetched yet
    """

    def __init__(self, user, db, collection, moves):
        # User document; run() reads user['profile']['firstDate'].
        self.user = user
        # Database handle, indexed by collection name.
        self.db = db
        # Name of the collection holding already-stored transports.
        self.collection = collection
        # Moves API client; must expose api(path, method).
        self.moves = moves

    def _find_existing_dates(self):
        """ Finds the dates that already have stored transport records """
        docs = self.db[self.collection].distinct('date')
        return [parse(doc).date() for doc in docs]

    def _find_no_transport_dates(self):
        """ Finds the dates that don't have transport records """
        docs = self.db.no_transport_dates.distinct('date')
        return [parse(date).date() for date in docs]

    @staticmethod
    def _create_active_daterange(start_date):
        """
        Yields date objects from yesterday (the last full day) back to,
        and including, the date the person joined Moves.
        """
        first_day = parse(start_date).date()
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        numdays = (yesterday - first_day).days
        # + 1 so the join date itself is part of the range; comparing
        # plain dates keeps the time of day out of the day count.
        for offset in range(numdays + 1):
            yield yesterday - datetime.timedelta(days=offset)

    @staticmethod
    def _create_missing_dates(service_dates, existing_dates,
                              no_transport_dates):
        """ Returns a list of dates that haven't been fetched """
        # One set gives constant-time membership tests instead of
        # scanning two lists for every candidate date.
        handled = set(existing_dates)
        handled.update(no_transport_dates)
        return [date for date in service_dates if date not in handled]

    @staticmethod
    def _pluck_segments(collection, attr):
        """
        Plucks a list attribute from each dict of a collection and
        chains the non-empty lists into a single iterable
        """
        # d.get(attr) is falsy for both a missing key and an empty
        # value, so no separate key check is needed.
        segments = [d.get(attr) for d in collection if d.get(attr)]
        return itertools.chain.from_iterable(segments)

    @staticmethod
    def _filter_transports(activities):
        """
        Extracts the transportation activities from user activities.
        Returns a list so callers can test it for emptiness.
        """
        return [a for a in activities
                if a.get('activity') in ('transport', 'airplane')]

    def _fetch_resource(self, resource, date, update_since=None):
        """
        Fetches one daily Moves resource for a given date.
        Raises ValueError for an unknown resource name.
        """
        if resource not in ['summary', 'activities', 'places', 'storyline']:
            raise ValueError('Invalid Moves resource.')
        rsrc_path = "user/{}/daily/{}?".format(resource, date)
        if resource == 'storyline':
            rsrc_path = "%s&trackPoints=true" % rsrc_path
        if update_since:
            # NOTE(review): '>' where a query string would normally have
            # '=' looks malformed -- confirm against the Moves API docs.
            rsrc_path = "%s&updateSince>T%sZ" % (rsrc_path, update_since)
        return self.moves.api(rsrc_path, 'GET').json()

    def run(self):
        """
        Runs the extraction for at most LIMIT missing dates and returns
        the raw transport activities as a flat list. Dates without any
        transport are recorded in the no_transport_dates collection.
        """
        join_date = self.user.get('profile', {}).get('firstDate')
        active_dates = self._create_active_daterange(join_date)
        existing_dates = self._find_existing_dates()
        no_transport_dates = self._find_no_transport_dates()
        missing_dates = self._create_missing_dates(
            active_dates, existing_dates, no_transport_dates)
        transports = []
        for date in missing_dates[:LIMIT]:
            storyline = self._fetch_resource('storyline', date)
            segments = self._pluck_segments(storyline, 'segments')
            activities = self._pluck_segments(segments, 'activities')
            transport = self._filter_transports(activities)
            if transport:
                transports.append(transport)
            else:
                # Remember the empty day so it is not fetched again.
                self.db.no_transport_dates.insert(
                    {'date': date.strftime('%Y-%m-%d')})
        return list(itertools.chain.from_iterable(transports))
class TransportsTransformer(object):
    """
    Transforms a collection of raw transport records by
    predicting transportation type and fetching carbon data from the
    Brighter Planet API
    """

    # Seconds to wait for the carbon API before giving up, so one
    # stalled request cannot hang the whole run.
    REQUEST_TIMEOUT = 30

    def __init__(self, db, collection, transports, station_points, model):
        self.db = db
        self.collection = collection
        # Raw transport dicts to transform.
        self.transports = transports
        # Station coordinates; indexed as [longitude, latitude].
        self.station_points = station_points
        # Classifier exposing predict(X); its output indexes LABELS.
        self.model = model

    @staticmethod
    def _create_geometry(trackPoint):
        """ Takes a lat lon dict and returns a geojson geometry """
        return {
            'type': 'Point',
            'coordinates': [trackPoint['lon'], trackPoint['lat']]
        }

    @staticmethod
    def _create_property(trackPoint):
        """ Takes a lat lon dict and returns a geojson property """
        return {
            'latitude': trackPoint['lat'],
            'longitude': trackPoint['lon'],
            'time': trackPoint['time'],
            'id': 'transport'
        }

    def _create_feature(self, trackPoint):
        """ Takes a lat lon dict and returns a geojson feature """
        return {
            'type': 'Feature',
            'properties': self._create_property(trackPoint),
            'geometry': self._create_geometry(trackPoint)
        }

    def _create_feature_collection(self, transport):
        """
        Takes a transport dict and returns a geojson feature collection
        built from its track points (empty when it has none)
        """
        track_points = transport.get('trackPoints') or []
        return {
            'type': 'FeatureCollection',
            'features': [self._create_feature(tp) for tp in track_points]
        }

    @staticmethod
    def _transform_transport(transport):
        """
        Adds date, startDatetime and endDatetime keys parsed from the
        transport's start and end time. Mutates and returns the dict.
        """
        start = parse(transport['startTime'])
        transport['date'] = start.strftime("%Y-%m-%d")
        transport['startDatetime'] = start
        transport['endDatetime'] = parse(transport['endTime'])
        return transport

    def _compute_min_distance(self, lat_lng):
        """
        Computes the smallest distance between a point and any
        station point. Despite its name, lat_lng is indexed as
        [longitude, latitude], like the station points.
        """
        return min(
            distance_on_unit_sphere(lat_lng[1], lat_lng[0], point[1], point[0])
            for point in self.station_points)

    def _compute_total_distance(self, first_point, last_point):
        """
        Sums the distance from the first point to its nearest station
        and the distance from the last point to its nearest station.
        """
        start = self._compute_min_distance(first_point)
        end = self._compute_min_distance(last_point)
        return start + end

    def _create_features(self, transport):
        """ Creates the single-row feature array for use in the model """
        start_time = parse(transport['startTime'])
        end_time = parse(transport['endTime'])
        fp = transport['trackPoints'][0]
        lp = transport['trackPoints'][-1]
        features = [
            transport['distance'],
            transport['duration'],
            datetime_to_seconds(start_time),
            datetime_to_seconds(end_time),
            start_time.hour,
            len(transport['trackPoints']),
            self._compute_total_distance(
                [fp['lon'], fp['lat']],
                [lp['lon'], lp['lat']])
        ]
        # One sample with len(features) columns.
        return np.array(features).reshape(1, -1)

    def _predict_transport_type(self, transport):
        """ Predicts the transportation type from a transport dict """
        X = self._create_features(transport)
        pred = self.model.predict(X)
        # X holds one row, so take the first prediction and convert it
        # to a plain int: a list cannot be indexed with an array.
        return LABELS[int(np.ravel(pred)[0])]

    @staticmethod
    def _compute_carbon(transport):
        """
        Computes the kgs of carbon for a given trip by querying the
        Brighter Planet API for the transport's type and distance.
        Raises requests.HTTPError when the API answers with an error.
        TODO: move distance transformation to transform methods
        """
        url = BASE + TYPES[transport['type']]
        params = {
            # Convert meters to kilometers. The float divisor avoids
            # floor division of an integer distance on Python 2.
            'distance': transport['distance'] / 1000.0,
            'key': BRIGHTER_PLANET_API_KEY
        }
        response = requests.get(
            url, params=params,
            timeout=TransportsTransformer.REQUEST_TIMEOUT)
        # Fail with the HTTP status rather than a KeyError further down.
        response.raise_for_status()
        res = response.json()
        return res['decisions']['carbon']['object']['value']

    def run(self):
        """
        Runs the transformation and returns the list of transports,
        each extended with type, carbon and geojson information
        """
        transports = []
        for transport in self.transports:
            transport = self._transform_transport(transport)
            transport['type'] = self._predict_transport_type(transport)
            transport['carbon'] = self._compute_carbon(transport)
            transport['geojson'] = self._create_feature_collection(transport)
            transports.append(transport)
        return transports
def execute():
    """
    The main logic to be executed: extracts the missing raw transports,
    transforms them and loads the result into the database
    """
    client = pymongo.MongoClient('localhost', 27017)
    db = client.carbon
    collection = 'moves2'
    # Get the user's profile
    user = db.users.find_one({'userId': USER_ID})
    moves = mvs.MovesClient(access_token=user['user']['access_token'])
    # Find the subway station entrances
    subways_entrances = db.subway_entrances.find_one()
    features = subways_entrances['features']
    station_points = [p['geometry']['coordinates'] for p in features]
    # Load the model. The with block closes the file handle again.
    # NOTE: unpickling can execute arbitrary code; only load a model
    # file that comes from a trusted source.
    with open('../models/gradient_boosting.p', 'rb') as model_file:
        model = pickle.load(model_file)
    # Extract the raw transport records
    extractor = TransportsExtractor(user, db, collection, moves)
    raw_transports = extractor.run()
    # Transform the raw transport records
    transformer = TransportsTransformer(db, collection, raw_transports,
                                        station_points, model)
    transformed_transports = transformer.run()
    # Load the transformed transport records. Skip the write when there
    # is nothing to store: pymongo rejects an empty bulk insert.
    if transformed_transports:
        db[collection].insert(transformed_transports)


if __name__ == '__main__':
    execute()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment