Animated map of New Zealand's August 2021 Covid outbreak locations of interest over time

from pathlib import Path
from datetime import datetime, timedelta

import dateutil.parser as dateparser
import numpy as np
import pandas as pd
import plotly.express as px
from git import Repo

# requirements.txt:
#   gitpython
#   pandas
#   plotly
poi_repo_url = "https://github.com/minhealthnz/nz-covid-data.git"
repo_dir = "/tmp/nz-covid-data"
loi_csv_path = Path(repo_dir) / "locations-of-interest" / "august-2021" / "locations-of-interest.csv"


def read_locations_of_interest(csv_path: Path) -> pd.DataFrame:
    loi = pd.read_csv(csv_path, encoding="ISO-8859-1", engine="python")
    if "id" not in loi.columns:
        raise ValueError("Expected id column not found in location of interest data")
    if "Updated" not in loi.columns:
        # if the Updated field is not present, fall back to Start
        loi["Added"] = loi.Start
        loi["Updated"] = loi.Start
    for column_name in ["Start", "End", "Added", "Updated"]:
        loi[column_name] = loi[column_name].apply(parse_locations_of_interest_time_str)
    return loi


def parse_locations_of_interest_time_str(time_str: str) -> datetime:
    """
    Convert a time string as recorded in the locations of interest data to a
    datetime. Complicated because at least four formats exist:
        12/09/2021 10:29
        2021-09-13 16:33:29
        15/09/2021, 10:30 am
        NA
    The dateutil parser can handle all of these.
    """
    try:
        return dateparser.parse(time_str, dayfirst=True, yearfirst=True)
    except (TypeError, ValueError):
        return pd.NaT
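
# Illustrative spot checks (not part of the original gist) showing how the
# parser handles the formats listed above, given dayfirst=True:
#   parse_locations_of_interest_time_str("12/09/2021 10:29")      # datetime(2021, 9, 12, 10, 29)
#   parse_locations_of_interest_time_str("2021-09-13 16:33:29")   # datetime(2021, 9, 13, 16, 33, 29)
#   parse_locations_of_interest_time_str("15/09/2021, 10:30 am")  # datetime(2021, 9, 15, 10, 30)
#   parse_locations_of_interest_time_str(float("nan"))            # pd.NaT ("NA" cells arrive as NaN)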


def read_all_location_of_interest_data(repo_dir) -> pd.DataFrame:
    repo = Repo(repo_dir)
    assert not repo.bare
    repo.git.checkout("main")
    repo.git.pull("origin", "main")
    locations_of_interest = read_locations_of_interest(loi_csv_path)
    # read the LOI file at every commit, and append it all into one tall dataframe
    # for commit in list(repo.iter_commits())[0:10]:
    for commit in repo.iter_commits():
        print(f"{commit} ({commit.authored_date}): {commit.message}")
        repo.git.checkout(commit)
        if loi_csv_path.is_file():
            try:
                loi_at_commit = read_locations_of_interest(loi_csv_path)
                loi_at_commit["source_sha1"] = str(commit)  # for tracing info back to its source
                locations_of_interest = pd.concat([locations_of_interest, loi_at_commit])
            except UnicodeDecodeError as e:
                print(f"Warning: discarding data due to unicode decode error parsing LOI file {loi_csv_path.name} at commit {commit}")
                print(e)
            except ValueError as e:
                print(f"Warning: discarding data at commit {commit} due to error (error follows)")
                print(e)
        else:
            print(f"Warning: LOI file {loi_csv_path.name} does not exist in commit {commit}")
    # drop rows without coordinate data
    locations_of_interest = locations_of_interest.dropna(subset=["LAT", "LNG"])
    # reset index
    locations_of_interest = locations_of_interest.reset_index(drop=True)
    # correct an apparent typo: exposure a0l4a0000004XEG is recorded as ending on
    # 30/9, although at the time of writing that date is in the future. The start
    # date was 30/8, so I'm assuming the end date is meant to match.
    locations_of_interest.loc[locations_of_interest.id == "a0l4a0000004XEG", "End"] = parse_locations_of_interest_time_str("30/08/2021, 3:00 pm")
    # We now have one big dataframe, but many locations will be duplicated.
    # De-duplication policy:
    #   - detect duplicates by id
    #   - keep the row with the most recent Updated date
    # All locations have an Added date, but not all have an Updated date, so
    # wherever Updated is missing, fill it with Added.
    locations_of_interest.loc[locations_of_interest.Updated.isnull(), "Updated"] = locations_of_interest.Added[locations_of_interest.Updated.isnull()]
    # for duplicate ids with different values, keep the most recently updated row
    num_unique_locations = len(locations_of_interest.id.unique())
    idx = locations_of_interest.groupby(by="id")["Updated"].idxmax()
    locations_of_interest = locations_of_interest.loc[idx]
    # make sure we end up with a dataframe with one row per exposure id
    assert locations_of_interest.shape[0] == num_unique_locations
    assert len(locations_of_interest.id.unique()) == num_unique_locations
    locations_of_interest = locations_of_interest.reset_index(drop=True)
    return locations_of_interest
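
# Toy demonstration (illustrative only, not part of the original gist) of the
# groupby/idxmax de-duplication pattern used above:
#   df = pd.DataFrame({"id": ["a", "a", "b"],
#                      "Updated": pd.to_datetime(["2021-09-01", "2021-09-05", "2021-09-02"])})
#   df.loc[df.groupby("id")["Updated"].idxmax()]
#   # keeps the 2021-09-05 row for "a" and the single row for "b"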


def get_time_since_active(query_time: datetime, start_time: datetime, end_time: datetime) -> timedelta:
    """
    Given a query time and a location of interest's start and end times, return
    the time since active: the timedelta between the query time and the
    location's end time.
    If the location is active at the query time, the time since active is zero.
    If the query time is before the location's start time, the time since
    active is negative.
    """
    if query_time > end_time:
        # location no longer active
        return query_time - end_time  # result will be positive
    elif query_time < start_time:
        # location not yet active
        return query_time - start_time  # result will be negative
    else:
        # location currently active
        return timedelta(0)
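
# Illustrative spot checks (not part of the original gist):
#   start, end = datetime(2021, 8, 18), datetime(2021, 8, 19)
#   get_time_since_active(datetime(2021, 8, 20), start, end)      # timedelta(days=1): no longer active
#   get_time_since_active(datetime(2021, 8, 17), start, end)      # timedelta(days=-1): not yet active
#   get_time_since_active(datetime(2021, 8, 18, 12), start, end)  # timedelta(0): currently active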


def create_timeseries(locations_of_interest: pd.DataFrame) -> pd.DataFrame:
    """
    Create a dataframe where, for each date over a range of dates covering the
    outbreak, all the LOI info is present along with a TimeSinceActive column
    for that date (see get_time_since_active).
    """
    # start the animation a bit before the earliest location of interest
    timeseries_start_time = locations_of_interest.Start.min() - timedelta(days=1)
    timeseries_end_time = locations_of_interest.End.max()
    date_samples = pd.date_range(start=timeseries_start_time, end=timeseries_end_time, freq="6H")
    frames = []
    for query_time in date_samples:
        time_since_active_for_row = lambda row: get_time_since_active(query_time, row.Start, row.End)
        time_since_active: pd.Series = locations_of_interest.apply(time_since_active_for_row, axis=1)
        time_since_active.name = "TimeSinceActive"
        location_info_for_time = pd.concat([locations_of_interest, time_since_active], axis=1)
        location_info_for_time["Time"] = query_time
        frames.append(location_info_for_time)
    loi_timeseries = pd.concat(frames)
    return loi_timeseries
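
# Size note (illustrative figures, not from the original gist): the timeseries
# is "tall" - one row per location per sample time - so with 6-hourly samples
# over roughly a month (~120 samples) and ~1000 locations it holds on the order
# of 120,000 rows, one animation frame per unique Time value.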


def alpha_from_time_since_active(time_since_active: timedelta) -> float:
    """
    Return an alpha value for visualising a given location of interest, based
    on how long it has been since it was active.
    """
    # desired time_since_active -> alpha mapping:
    #   negative        -> 0
    #   0               -> 1
    #   1/2 * fade_time -> 0.5
    #   fade_time       -> 0
    #
    # fade_time determines how long after a location was last active it takes
    # to become invisible
    fade_time = timedelta(days=5)
    max_alpha = 0.75
    alpha = (fade_time - time_since_active) / fade_time
    if alpha > 1.0 or alpha < 0.0:
        alpha = 0
    return alpha * max_alpha
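
# Illustrative spot checks (not part of the original gist), with
# fade_time = 5 days and max_alpha = 0.75:
#   alpha_from_time_since_active(timedelta(0))         # 0.75: just active, fully visible
#   alpha_from_time_since_active(timedelta(days=2.5))  # 0.375: half faded
#   alpha_from_time_since_active(timedelta(days=6))    # 0: faded out
#   alpha_from_time_since_active(timedelta(days=-1))   # 0: not yet active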


def size_from_time_since_active(time_since_active: timedelta) -> float:
    """
    Return a marker size for a given location of interest, based on how long
    it has been since it was active.
    """
    # for size, the decay is nonlinear (we want more emphasis initially, then
    # a more gradual decay)
    fade_time = timedelta(days=5)
    fade_portion = time_since_active / fade_time
    if fade_portion < 0 or fade_portion > 1.0:
        size = 0.0
    else:
        size = 1 - np.sin(fade_portion * np.pi / 2)  # take a segment of a sine curve
    return size
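
# Illustrative spot checks (not part of the original gist), with fade_time = 5 days:
#   size_from_time_since_active(timedelta(0))         # 1.0: full size while active
#   size_from_time_since_active(timedelta(days=2.5))  # ~0.29: 1 - sin(pi/4)
#   size_from_time_since_active(timedelta(days=5))    # 0.0: fully faded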


def animate_loi_timeseries(loi_timeseries: pd.DataFrame):
    # fade locations based on time since active
    loi_timeseries["alpha"] = loi_timeseries.TimeSinceActive.apply(alpha_from_time_since_active)
    # scatter_mapbox errors if time is provided as datetime - so convert to str
    loi_timeseries["Time"] = loi_timeseries.Time.apply(lambda t: str(t))
    loi_timeseries["size"] = loi_timeseries.TimeSinceActive.apply(size_from_time_since_active)
    fig = px.scatter_mapbox(loi_timeseries, lat="LAT", lon="LNG",
                            animation_frame="Time", animation_group="id",
                            size="size",
                            size_max=20,
                            title="August outbreak locations of interest",
                            hover_name="Event",
                            hover_data=["Start"],
                            mapbox_style="carto-positron")
    # fig.update_traces(hovertemplate=None, hoverinfo='skip')
    # set playback speed to 100 ms per frame
    fig.layout.updatemenus[0].buttons[0].args[1]["frame"]["duration"] = 100
    fig.write_html("/tmp/fig.html")
    fig.show()


if __name__ == "__main__":
    print("begin")
    if not Path(repo_dir).is_dir():
        Repo.clone_from(poi_repo_url, repo_dir)
    locations_of_interest = read_all_location_of_interest_data(repo_dir)
    # cache save/load to speed up dev:
    # locations_of_interest.to_pickle("/tmp/loi.pickle")
    # locations_of_interest = pd.read_pickle("/tmp/loi.pickle")
    # write locations to .csv (can be uploaded to kepler.gl for further exploration)
    locations_of_interest.to_csv("/tmp/loi.csv")
    loi_timeseries = create_timeseries(locations_of_interest)
    animate_loi_timeseries(loi_timeseries)
    print("done")