Created
August 12, 2021 21:08
-
-
Save hadisfr/01895e40b5262636c9a8b48eda44a030 to your computer and use it in GitHub Desktop.
Plot graphs and animations of COVID-19 pandemic data, combining UNICEF population data with the Our World in Data dataset (https://github.com/owid/covid-19-data).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import subprocess | |
import re | |
from os import path | |
import pandas as pd | |
import geopandas as gpd | |
import numpy as np | |
import seaborn as sns | |
from tqdm import tqdm | |
from matplotlib import rcParams, pyplot as plt, dates as mdates | |
# Matplotlib defaults shared by every figure in this script: the SVG font
# type is set to "none" and the font family to Times New Roman.
rcParams.update({
    "svg.fonttype": "none",
    'font.family': 'Times New Roman',
})
# --- Run configuration -------------------------------------------------------

# Normalise metrics by population: per million people, or as a percentage for
# the vaccination columns.
per_capita = True

# Plot log10 of the metric instead of the metric itself.
log = False

# Image that ffmpeg overlays on the centre of each map animation.
overlay_path = "overlay.png"

# Earliest date (ISO "YYYY-MM-DD") kept for each metric column.
first_date = {
    "new_cases": "2020-01-23",
    "new_cases_smoothed": "2020-01-23",
    "new_deaths": "2020-01-23",
    "new_deaths_smoothed": "2020-01-23",
    "new_tests": "2020-01-01",
    "new_tests_smoothed": "2020-01-01",
    "people_vaccinated": "2020-12-04",
    "people_vaccinated_smoothed": "2020-12-04",
    "people_fully_vaccinated": "2020-12-27",
    "people_fully_vaccinated_smoothed": "2020-12-27",
}

# Country selections (ISO 3166-1 alpha-3 codes) for the timeline graphs.
# These used to be four consecutive assignments to `countries`, of which only
# the last one took effect; they are kept here as named presets instead.
_COUNTRY_PRESETS = {
    "short": {"IRN", "RUS", "TUR", "USA"},
    "wide": ["IRN", "ISL", "ARE", "SWE", "GBR", "RUS", "QAT", "CHN", "PER", "HUN", "ITA", "NLD", "TUR", "USA"],
    "mixed": ["IRN", "ARE", "CHN", "PER", "HUN", "ITA", "NLD", "USA", "CUB"],
    "iran": {"IRN"},
}

# Active selection; change the key to switch presets.
countries = _COUNTRY_PRESETS["iran"]
def from_unicef(src_key="Total population (thousands)", dst_key="population"):
    """Load a single indicator from the local UNICEF export.

    Reads ``UNICEF_latest.csv`` from the working directory and keeps the rows
    whose ``Indicator`` equals *src_key*.

    Returns:
        A DataFrame with two columns: ``iso_a3`` (taken from ``LOCATION``)
        and *dst_key* (taken from ``Value``).
    """
    unicef = pd.read_csv("UNICEF_latest.csv")
    matches_indicator = unicef["Indicator"] == src_key
    renamed_columns = {"LOCATION": "iso_a3", "Value": dst_key}
    selected = unicef.loc[matches_indicator, ["LOCATION", "Value"]]
    return selected.rename(columns=renamed_columns)
def get_world():
    """Build the world map table with a ``population`` column.

    Loads the ``naturalearth_lowres`` dataset, patches known-bad ISO codes,
    attaches the UNICEF total population (falling back to the dataset's own
    ``pop_est`` where UNICEF has no value), drops Antarctica and rows whose
    ``pop_est`` is not positive, prints the result and returns it.
    """
    # NOTE(review): `gpd.datasets` is deprecated and absent from recent
    # geopandas releases; confirm the pinned geopandas version still ships it.
    world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))

    # fix for bug https://github.com/geopandas/geopandas/issues/1041
    iso_fixes = {
        'France': 'FRA',
        'Norway': 'NOR',
        'N. Cyprus': 'CYP',
        'Somaliland': 'SOM',
        'Kosovo': 'RKS',
    }
    for country_name, iso_code in iso_fixes.items():
        world.loc[world['name'] == country_name, 'iso_a3'] = iso_code

    unicef_population = from_unicef("Total population (thousands)", "population")
    world = pd.merge(world, unicef_population, on="iso_a3", how="left")

    # The UNICEF indicator is expressed in thousands; rows without a UNICEF
    # value fall back to the map dataset's own estimate.
    population = (world["population"] * 1000).fillna(world["pop_est"])
    world["population"] = population.astype("int32")

    keep = (world["pop_est"] > 0) & (world["name"] != "Antarctica")
    world = world[keep]
    print("World DS:\n%s\n" % world)
    return world
def get_covid():
    """Load the OWID COVID-19 table from ``owid-covid-data.csv``.

    Prints the column names as read from the file (one per line, tab-indented),
    renames ``iso_code`` to ``iso_a3`` so the table can be joined with the
    world table, prints the resulting frame and returns it.
    """
    raw = pd.read_csv("owid-covid-data.csv")
    column_listing = "\n".join("\t%s" % name for name in raw.columns)
    print("COVID-19 DS Columns:\n%s\n" % column_listing)

    covid = raw.rename(columns={"iso_code": "iso_a3"})
    print("COVID-19 DS:\n%s\n" % covid)
    return covid
def compare_countries(world, covid_df):
    """Print the ``iso_a3`` codes that appear in only one of the two tables.

    The first line lists codes present in *world* but absent from *covid_df*;
    the second lists codes present in *covid_df* but absent from *world*.
    """
    map_codes = set(world["iso_a3"])
    covid_codes = set(covid_df["iso_a3"])

    without_covid_data = map_codes.difference(covid_codes)
    without_country_data = covid_codes.difference(map_codes)

    print("missing COVID-19 data:\t%s" % (without_covid_data,))
    print("missing country data:\t%s" % (without_country_data,))
def interpolate_covid(covid_per_col, col):
    """Fill gaps in one metric, country by country.

    The long table is pivoted to one column per country (rows ordered by
    ``date``), gaps are filled by pandas' default interpolation, anything
    still missing (e.g. leading gaps) is set to 0, and the table is melted
    back to long form.

    Args:
        covid_per_col: DataFrame with ``iso_a3``, ``date`` and *col* columns,
            holding at most one row per (date, country) pair.
        col: Name of the metric column to fill.

    Returns:
        A new DataFrame with columns ``date``, ``iso_a3`` and *col*, one row
        per (country, date) combination. The input frame is not modified.
    """
    # Keep `date` as the index while interpolating so that only the numeric
    # per-country columns are touched; interpolating a frame that also holds
    # the string date column is deprecated in newer pandas releases.
    wide = covid_per_col.pivot(index="date", columns="iso_a3", values=col)
    wide = wide.interpolate().fillna(0)
    return wide.reset_index().melt(id_vars="date", var_name="iso_a3", value_name=col)
def filter_by_date(covid_per_col, col):
    """Drop rows dated before the configured start date of *col*.

    Args:
        covid_per_col: DataFrame with a ``date`` column of ISO date strings.
        col: Metric name; its start date is looked up in ``first_date``.

    Returns:
        A tuple ``(filtered, dates)`` where *filtered* holds the remaining
        rows and *dates* is the sorted list of distinct dates among them
        (empty when nothing remains).

    Raises:
        KeyError: If *col* has no entry in ``first_date``.
    """
    start = first_date[col]
    filtered = covid_per_col[covid_per_col["date"] >= start]
    dates = sorted(set(filtered["date"]))
    if dates:
        print("\nfrom %s to %s (%d days)\n" % (dates[0], dates[-1], len(dates)))
    else:
        # Indexing dates[0] on an empty selection used to raise IndexError.
        print("\nno data on or after %s\n" % start)
    return filtered, dates
def fill_new_col(df, col):
    """Add a display-ready copy of metric *col* to *df*.

    The new column's name doubles as the plot title. It is derived from *col*
    ("new" becomes "Daily", a "Smoothed" marker is moved to a trailing
    "(Smoothed)") and suffixed according to the module-level ``per_capita``
    and ``log`` switches. Negative values are replaced by 0. With
    ``per_capita`` the values are divided by ``df["population"]`` and scaled
    to per-million, or to percent for vaccination columns. With ``log`` the
    values are replaced by their log10.

    Returns:
        ``(df, new_column_name)``; *df* is modified in place.
    """
    is_vaccination = "vaccin" in col.lower()

    # Build the column title.
    title = "COVID-19 " + col.title().replace("_", " ").replace("New", "Daily")
    if per_capita:
        title += " (%)" if is_vaccination else " per Million"
    if log:
        title += " (log10)"
    title = re.sub(r"(.*) Smoothed(.*)", r"\1\2 (Smoothed)", title)

    # Negative values are replaced by 0.
    raw_values = df[col]
    df[title] = raw_values.mask(raw_values < 0, 0)

    if per_capita:
        scale = 100 if is_vaccination else 1_000_000
        df[title] = df[title] / df["population"] * scale

    if log:
        # Zeros become NaN before taking the logarithm, then every NaN is
        # filled with 0 (absolute counts) or the smallest finite log value
        # (per-capita).
        # NOTE(review): the source's indentation was lost; this fill is
        # assumed to belong to the `log` branch - confirm against the
        # original file.
        logged = np.log10(df[title].replace(0, np.nan))
        fill_value = np.nanmin(logged) if per_capita else 0
        df[title] = logged.fillna(fill_value)

    return df, title
def _encode_movie(frames_dir, col):
    """Stitch the PNG frames in *frames_dir* into a movie and add the overlay."""
    import shutil

    if shutil.which("ffmpeg") is None:
        # Stay best-effort: a missing ffmpeg must not abort the remaining plots.
        print("ffmpeg not found; skipping movie for %s" % col)
        return

    intermediate_movie_path = path.join(frames_dir, "%s.mp4" % col)
    final_movie_path = path.join(frames_dir, "%s_final.mp4" % col)

    # Argument lists instead of shell strings, so paths containing spaces or
    # shell metacharacters are passed through intact. The frame pattern is
    # expanded by ffmpeg itself (-pattern_type glob), not by a shell.
    frames = subprocess.run([
        "ffmpeg", "-pattern_type", "glob", "-i", path.join(frames_dir, "*.png"),
        "-pix_fmt", "yuv420p", intermediate_movie_path,
    ])
    if frames.returncode != 0:
        print("ffmpeg failed to build %s" % intermediate_movie_path)
        return

    # Centre the overlay image on top of the intermediate movie.
    subprocess.run([
        "ffmpeg", "-i", intermediate_movie_path, "-i", overlay_path,
        "-filter_complex", "overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2",
        "-qscale:v", "0", "-codec:a", "copy", final_movie_path,
    ])


def plot_map_timeline(world, covid_df):
    """Render one world map per day for several metrics and build movies.

    For each metric the directory ``results/<metric>`` is recreated from
    scratch, one PNG per date is written to it (``<metric>_<date>.png``), and
    ffmpeg then assembles the frames into ``<metric>.mp4`` and an overlaid
    ``<metric>_final.mp4``.

    Args:
        world: Map table with ``iso_a3``, ``geometry`` and ``population``.
        covid_df: COVID-19 table with ``iso_a3``, ``date`` and the metric
            columns.
    """
    import os
    import shutil

    fig = plt.figure(figsize=(15, 5))
    for col in ["new_cases", "new_deaths", "new_tests", "people_vaccinated", "people_fully_vaccinated"]:
        print(col)
        path_prefix = path.join("results", col)
        # Portable replacements for `rm -r` / `mkdir -p`.
        shutil.rmtree(path_prefix, ignore_errors=True)
        os.makedirs(path_prefix, exist_ok=True)

        covid_per_col = covid_df[["iso_a3", "date", col]]
        covid_per_col, dates = filter_by_date(covid_per_col, col)
        covid_per_col = interpolate_covid(covid_per_col, col)

        for date in tqdm(dates):
            covid_per_date = covid_per_col[covid_per_col["date"] == date]
            # pd.merge returns a new frame, so `world` itself is never
            # modified and needs no defensive copy.
            world_per_date = pd.merge(world, covid_per_date, on="iso_a3", how="left")
            world_per_date, new_col = fill_new_col(world_per_date, col)

            ax = fig.add_subplot(1, 1, 1)
            world_per_date.plot(column=new_col, ax=ax, legend=True)
            plt.axis("off")
            plt.title("%s (%s)" % (new_col, date))
            plt.savefig(path.join(path_prefix, "%s_%s.png" % (col, date)))
            # Reuse the same figure for the next frame.
            plt.clf()

        _encode_movie(path_prefix, col)

    # Release the shared figure once all frames have been written.
    plt.close(fig)
def plot_timeline_graph(world, covid_df):
    """Draw one line chart per metric for the configured ``countries``.

    Each chart is saved as ``results/<metric>.svg``. China is excluded from
    the vaccination charts. When the USA is among the selected countries, the
    USA value on 2021-01-20 is marked and labelled "Joe Biden".

    Args:
        world: Table with ``iso_a3`` and ``population`` columns.
        covid_df: COVID-19 table with ``iso_a3``, ``date`` and the metric
            columns.
    """
    import os

    sns.set_theme()
    sns.set(font="Times New Roman")

    path_prefix = path.join("results")
    # The output directory may not exist if the map timeline was not run.
    os.makedirs(path_prefix, exist_ok=True)

    # One row per country code, so the merge below cannot duplicate data rows.
    world_by_code = world.drop_duplicates("iso_a3")

    for col in ["new_cases_smoothed", "new_deaths_smoothed", "new_tests_smoothed", "new_cases", "new_deaths", "new_tests", "people_vaccinated", "people_fully_vaccinated"]:
        print(col)
        covid_per_col = covid_df[["iso_a3", "date", col]]
        covid_per_col = covid_per_col[covid_per_col["iso_a3"].isin(countries)]
        if "vaccin" in col.lower():
            covid_per_col = covid_per_col[covid_per_col["iso_a3"] != "CHN"]
        covid_per_col, dates = filter_by_date(covid_per_col, col)
        if not dates:
            # Nothing left to plot for this metric.
            continue
        covid_per_col = interpolate_covid(covid_per_col, col)
        covid_per_col = pd.merge(covid_per_col, world_by_code, on="iso_a3", how="left")
        covid_per_col, new_col = fill_new_col(covid_per_col, col)
        covid_per_col["date"] = pd.to_datetime(covid_per_col["date"], format='%Y-%m-%d')

        fig, ax = plt.subplots(figsize=(8, 4.5))
        plt.title(new_col, fontweight="bold")
        ax.xaxis.set_major_locator(mdates.AutoDateLocator())
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y %b'))
        fig.autofmt_xdate()
        covid_per_col["Country"] = covid_per_col["iso_a3"]
        sns.lineplot(data=covid_per_col, x="date", y=new_col, hue="Country")

        if "USA" in countries:
            biden_date = "2021-01-20"
            on_that_day = (covid_per_col["date"] == biden_date) & (covid_per_col["iso_a3"] == "USA")
            usa_values = covid_per_col.loc[on_that_day, new_col]
            # Take the scalar explicitly and skip the marker when the USA has
            # no row for that day, instead of calling float() on a Series.
            if not usa_values.empty:
                biden_y = float(usa_values.iloc[0])
                plt.scatter(np.datetime64(biden_date), biden_y)
                ax.annotate("Joe Biden", (np.datetime64(biden_date), biden_y))

        plt.tight_layout()
        plt.savefig(path.join(path_prefix, "%s.svg" % (col)))
        # Close each figure once saved so they do not pile up in memory.
        plt.close(fig)
def main():
    """Load both datasets, report mismatched country codes, and render all plots."""
    needed_columns = ["iso_a3", "geometry", "population"]
    world = get_world()[needed_columns]
    covid_df = get_covid()

    compare_countries(world, covid_df)

    # Map animations first, then the per-country timeline charts.
    for render in (plot_map_timeline, plot_timeline_graph):
        render(world, covid_df)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment