|
"""Pulls data from the https://github.com/CSSEGISandData/COVID-19 repository.""" |
|
import argparse |
|
import json |
|
import hashlib |
|
import base64 |
|
import os |
|
from io import StringIO |
|
from datetime import datetime |
|
import requests |
|
import pandas as pd |
|
|
|
|
|
class GitHub: |
|
"""Simple wrapper around the GitHub API.""" |
|
_headers = { |
|
"Authotization": f"token {os.environ['GHTOKEN']}" |
|
} |
|
|
|
_endpoint = "https://api.github.com" |
|
|
|
def get_file(self, user: str, repo: str, path: str): |
|
"""Parses a file from the github API. |
|
|
|
Parameters |
|
---------- |
|
user : str |
|
self-explanatory. |
|
repo : str |
|
self-explanatory |
|
path : str |
|
Path to the file which needs to be fetched.""" |
|
return requests.get( |
|
f"{self._endpoint}/repos/{user}/{repo}/contents/{path}", |
|
headers=self.get_headers()).json() |
|
|
|
def get_headers(self): |
|
"""Simply gets the headers for the Github authentication. |
|
|
|
Returns |
|
------- |
|
dict""" |
|
return self._headers |
|
|
|
|
|
class Transform: |
|
"""Transforms the CSV data into a json format. |
|
|
|
Parameters |
|
---------- |
|
data : str |
|
CSV-like string.""" |
|
_data = None |
|
_df = None |
|
|
|
def __init__(self, data: str): |
|
self._data = data |
|
self._parse() |
|
|
|
def _parse(self): |
|
"""Parses the CSV string and turns it into |
|
a Pandas DataFrame. |
|
|
|
Returns |
|
------- |
|
pd.DataFrame""" |
|
self._df = pd.read_csv(StringIO(base64.b64decode(self._data).decode())) |
|
|
|
def run(self): |
|
"""Does all the magic. |
|
|
|
Returns |
|
------- |
|
list""" |
|
result = [] |
|
for _, row_value in self._df.iterrows(): |
|
result.append(dict(zip( |
|
[datetime.strptime(i, "%m/%d/%y").strftime("%Y-%m-%d") |
|
if k > 3 else i for k, i in |
|
enumerate(row_value.keys().tolist())], |
|
row_value.tolist()))) |
|
|
|
fin_ = [] |
|
for _, val in enumerate(result): |
|
row_data = { |
|
"province": val["Province/State"], |
|
"country": val["Country/Region"], |
|
"lat": val["Lat"], |
|
"lon": val["Long"], |
|
"data": [] |
|
} |
|
for date, count in list(val.items())[4:]: |
|
row_data["data"].append({"date": date, "count": count}) |
|
fin_.append(row_data) |
|
return fin_ |
|
|
|
def __repr__(self): |
|
"""Printing and whatnot...""" |
|
return str(self) |
|
|
|
|
|
class Monitoring(GitHub): |
|
"""The final wrapper, calls github, fetches the data, |
|
checks if any modifications have been made and updates the files. |
|
|
|
|
|
Parameters |
|
---------- |
|
user : str |
|
self-explanatory. |
|
repo : str |
|
self-explanatory. |
|
files : dict |
|
k-v name-path in repo. |
|
compare : str |
|
path to the file where the checksums are stored. |
|
path : str |
|
where the files should be stored.""" |
|
_files = [] |
|
_user = None |
|
_repo = None |
|
_compare = None |
|
_cmp = None |
|
_report = None |
|
_path = None |
|
|
|
def __init__(self, |
|
user: str, |
|
repo: str, |
|
files: dict, |
|
compare: str, |
|
path: str): |
|
self._user = user |
|
self._repo = repo |
|
self._files = files |
|
self._compare = compare |
|
self._parse_checksum() |
|
self._report = datetime.utcnow().isoformat() |
|
self._path = path |
|
|
|
def _parse_checksum(self): |
|
"""Parses the JSON file with the existing checksums. |
|
|
|
Returns |
|
------- |
|
None""" |
|
if os.path.exists(self._compare) and not self._cmp: |
|
with open(self._compare) as file_handler: |
|
self._cmp = json.loads(file_handler.read()) |
|
else: |
|
self._cmp = {} |
|
|
|
def _store_checksum(self): |
|
"""Stores the checksums of the newly fetched files. |
|
|
|
Returns |
|
------- |
|
None""" |
|
with open(self._compare, "w") as file_handler: |
|
file_handler.write(json.dumps(self._cmp, indent=4)) |
|
|
|
def compare(self, repofile: str, checksum: str): |
|
"""Compares the sha checksum from Github against |
|
the one from the previous run. |
|
|
|
Parameters |
|
---------- |
|
repofile : str |
|
path to the file from the repo. |
|
checksum : str |
|
The sha checksum provided by the Github API. |
|
|
|
Returns |
|
------- |
|
bool""" |
|
return self._cmp.get(repofile) == checksum |
|
|
|
@staticmethod |
|
def create_signature(data): |
|
"""Creates a hmac signature of the data. |
|
|
|
Parameters |
|
---------- |
|
data : str |
|
Literally anything. |
|
|
|
Returns |
|
------- |
|
bytearray""" |
|
return hashlib.sha512(f"{data}:{os.environ['BLEEPBLOOP']}".encode()).hexdigest() |
|
|
|
@staticmethod |
|
def save(path: str, data: str): |
|
"""Saves the output of the transformation. |
|
|
|
Parameters |
|
---------- |
|
path : str |
|
Full path |
|
data : dict |
|
json-serializable dict.""" |
|
with open(path, "w") as file_handler: |
|
file_handler.write(json.dumps(data)) |
|
|
|
def fetch(self): |
|
"""Where all the magic Happens.""" |
|
for name, github_file in self._files.items(): |
|
data = self.get_file(self._user, self._repo, github_file) |
|
if not self.compare(github_file, data["sha"]): |
|
transformed = Transform(data["content"]).run() |
|
json_str = json.dumps(transformed) |
|
checksum = hashlib.sha256(json_str.encode()).hexdigest() |
|
result = { |
|
"checksum": checksum, |
|
"timestamp": self._report, |
|
"source": github_file, |
|
"user": self._user, |
|
"repo": self._repo, |
|
"signature": Monitoring.create_signature(json_str), |
|
"data": transformed |
|
} |
|
self.set_checksum(github_file, data["sha"]) |
|
Monitoring.save(f"{self._path}/{name}.json", result) |
|
self._store_checksum() |
|
|
|
def set_checksum(self, file_path: str, checksum: str): |
|
"""Updates the existing checksum. |
|
|
|
Parametes |
|
--------- |
|
file_path : str |
|
Github file path. |
|
checksum : str |
|
sha checksum from the github API.""" |
|
self._cmp[file_path] = checksum |
|
|
|
def __repr__(self): |
|
"""Printing and whatnot...""" |
|
return str(self) |
|
|
|
|
|
if __name__ == "__main__": |
|
PARSER = argparse.ArgumentParser() |
|
PARSER.add_argument( |
|
"--csfile", |
|
required=True, |
|
type=str, |
|
help="Path to a file where the checksums of old runs should be stored." |
|
) |
|
PARSER.add_argument( |
|
"--output", |
|
required=True, |
|
type=str, |
|
help="Where the output should be stored" |
|
) |
|
FLAGS, _ = PARSER.parse_known_args() |
|
CSVS = { |
|
"recovered": "csse_covid_19_data/csse_covid_19_time_series/time_series_19-covid-Recovered.csv", |
|
"deaths": "csse_covid_19_data/csse_covid_19_time_series/time_series_19-covid-Deaths.csv", |
|
"confirmed": "csse_covid_19_data/csse_covid_19_time_series/time_series_19-covid-Confirmed.csv" |
|
} |
|
MONITOR = Monitoring("CSSEGISandData", "COVID-19", CSVS, FLAGS.csfile, FLAGS.output) |
|
MONITOR.fetch() |