Created
June 1, 2021 09:11
-
-
Save aymkx/a4466d68652d18332355168ab1215924 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import datetime as dt | |
import json | |
from os.path import exists, join | |
import sys | |
from typing import Any, Dict, List, Optional, Tuple, Union | |
from urllib.parse import urlunsplit | |
from dateutil import tz | |
import requests | |
from tabulate import tabulate | |
# Time zone used when computing data-file names and when tagging parsed
# observation timestamps.
JST = tz.gettz("Asia/Tokyo")

# Local JSON file that caches the station-name -> station-number mapping.
OBSERVATORY_LIST = "observatories.json"

# Host name and URL path prefixes of the JMA AMeDAS endpoints used below.
JMA = "www.jma.go.jp"
PATH_AMEDAS_DATA_POINT = "/bosai/amedas/data/point"
PATH_AMEDAS_CONST = "/bosai/amedas/const"

# Recursive alias covering every value ``json.loads`` can produce.
JsonValue = Union[
    str,
    int,
    float,
    bool,
    None,
    List["JsonValue"],
    Dict[str, "JsonValue"],
]

# In-memory station-name -> station-number cache, seeded from OBSERVATORY_LIST
# when that file exists and holds a JSON object.
Observatories: dict[str, int] = {}
if exists(OBSERVATORY_LIST):
    try:
        with open(OBSERVATORY_LIST) as f:
            _cached = json.load(f)
    except ValueError:
        # A truncated or otherwise undecodable cache must not prevent the
        # module from loading; start empty and let the next lookup rebuild it.
        _cached = None
    # Anything other than a JSON object would break the later ``in`` and
    # ``.update`` calls on the cache, so it is ignored.
    if isinstance(_cached, dict):
        Observatories = _cached
def load_json(s: str) -> JsonValue:
    """Decode the JSON document ``s`` into plain Python values.

    Args:
        s: Text containing a single JSON document.

    Returns:
        The decoded value as produced by ``json.loads``.

    Raises:
        json.JSONDecodeError: If ``s`` is not valid JSON.
    """
    decoded = json.loads(s)
    return decoded
def gen_url(path: str, query: str = "", fragment: str = "") -> str:
    """Assemble an ``https`` URL on the JMA host.

    Args:
        path: Path component of the URL.
        query: Query string without the leading ``?``; empty by default.
        fragment: Fragment without the leading ``#``; empty by default.

    Returns:
        The URL produced by ``urlunsplit`` from the scheme ``https``, the
        host ``JMA`` and the given components.
    """
    components = ("https", JMA, path, query, fragment)
    return urlunsplit(components)
def search_observatory_number(name: str) -> Optional[int]:
    """Return the AMeDAS station number registered under ``name``.

    The in-memory ``Observatories`` cache is consulted first. On a miss,
    ``amedastable.json`` is downloaded from JMA, every station is indexed by
    its ``enName``, ``kjName`` and ``knName`` fields, and the merged cache is
    written back to ``OBSERVATORY_LIST``.

    Args:
        name: Station name; it must match one of the three name fields
            exactly.

    Returns:
        The station number, or ``None`` when the name is unknown or the
        downloaded table could not be decoded.

    Raises:
        requests.RequestException: If the download fails or times out.
    """
    if name not in Observatories:
        # URL paths always use "/"; os.path.join would insert a backslash on
        # Windows. The timeout keeps a stalled connection from hanging the
        # lookup forever.
        response = requests.get(
            gen_url(f"{PATH_AMEDAS_CONST}/amedastable.json"), timeout=30)
        try:
            amedastable = load_json(response.text)
        except json.JSONDecodeError:
            # Best effort: an undecodable response leaves the cache as it is.
            amedastable = None

        if isinstance(amedastable, dict):
            english: dict[str, int] = {}
            kanji: dict[str, int] = {}
            kana: dict[str, int] = {}
            indexes = (
                ("enName", english),
                ("kjName", kanji),
                ("knName", kana),
            )
            for number, detail in amedastable.items():
                if not isinstance(detail, dict):
                    continue
                try:
                    num = int(number)
                except ValueError:
                    continue
                for field, index in indexes:
                    label = detail.get(field)
                    # Missing or empty names are skipped so that "" never
                    # becomes a key pointing at an arbitrary station.
                    if isinstance(label, str) and label:
                        index[label] = num

            # Merge order matters on name clashes: kana overrides kanji,
            # which overrides English.
            Observatories.update(english)
            Observatories.update(kanji)
            Observatories.update(kana)
            with open(OBSERVATORY_LIST, "w") as f:
                json.dump(Observatories, f, ensure_ascii=False, indent=4)
    return Observatories.get(name)
def get_latest_two_filenames() -> dict[str, str]:
    """Name the data files for the current and the previous three-hour block.

    A file name has the form ``YYYYMMDD_HH.json``, where ``HH`` is the hour
    rounded down to a multiple of three. Both date and hour are taken from
    the current time converted to ``JST``.

    Returns:
        A dict with the key ``"later"`` (block containing the current time)
        and the key ``"earlier"`` (block containing the time three hours ago).
    """
    def block_filename(moment: dt.datetime) -> str:
        # Floor the hour to the start of its three-hour block.
        block_start = moment.hour // 3 * 3
        return moment.strftime("%Y%m%d") + f"_{block_start:02d}.json"

    current = dt.datetime.now().astimezone(JST)
    previous = current - dt.timedelta(hours=3)
    return {
        "later": block_filename(current),
        "earlier": block_filename(previous),
    }
def get_amedas_data(
    observatory: int,
) -> dict[dt.datetime, dict[str, JsonValue]]:
    """Download the two most recent data files of one station and merge them.

    Args:
        observatory: Station number; it is interpolated into the URL path.

    Returns:
        A dict mapping each observation time (parsed from the
        ``YYYYmmddHHMMSS`` key and tagged with ``JST``) to that observation's
        dict of values. Files that are not answered with HTTP 200 or are not
        JSON objects are ignored, as are entries whose key is not a timestamp
        or whose value is not a dict.

    Raises:
        requests.RequestException: If a download fails or times out.
    """
    filenames = get_latest_two_filenames()
    merged: dict[str, JsonValue] = {}
    # The older file is merged first so that the newer file wins when both
    # contain the same key.
    for slot in ("earlier", "later"):
        # URL paths always use "/"; os.path.join would insert a backslash on
        # Windows.
        url = gen_url(
            f"{PATH_AMEDAS_DATA_POINT}/{observatory}/{filenames[slot]}")
        response = requests.get(url, timeout=30)
        if response.status_code != 200:
            continue
        try:
            parsed = load_json(response.text)
        except json.JSONDecodeError:
            # Best effort: skip a file that cannot be decoded.
            continue
        if isinstance(parsed, dict):
            merged.update(parsed)

    ret: dict[dt.datetime, dict[str, JsonValue]] = {}
    for key, values in merged.items():
        if not isinstance(values, dict):
            continue
        try:
            timestamp = dt.datetime.strptime(key, "%Y%m%d%H%M%S")
        except ValueError:
            # One unexpected key must not discard every other observation.
            continue
        ret[timestamp.replace(tzinfo=JST)] = values
    return ret
def flatten(
    data: dict[dt.datetime, dict[str, JsonValue]],
) -> Tuple[list[str], list[list[Any]]]:
    """Turn timestamped observation dicts into column headers and table rows.

    Args:
        data: Mapping from observation time to that observation's values.

    Returns:
        A ``(headers, rows)`` pair. ``headers`` lists every key that occurs
        in any observation, in first-seen order. Each row starts with the
        timestamp, which has no entry in ``headers``, followed by one cell
        per header:

        * a list value is reduced to its first element (``None`` if empty);
        * a dict holding ``hour`` and ``minute`` is rendered as ``H:MM``;
        * a key absent from the observation yields ``None``;
        * any other value is passed through unchanged.
    """
    # Collect the headers from every record, not just the first one, so that
    # fields appearing only in later records still get a column and all rows
    # end up the same length. dict.fromkeys de-duplicates while keeping
    # first-seen order.
    headers = list(
        dict.fromkeys(key for values in data.values() for key in values))

    rows: list[list[Any]] = []
    for timestamp, values in data.items():
        record: list[Any] = [timestamp]
        for header in headers:
            cell = values.get(header)
            if isinstance(cell, list):
                # An empty list has no first element to show.
                cell = cell[0] if cell else None
            elif isinstance(cell, dict) and "hour" in cell and "minute" in cell:
                cell = "{hour:2d}:{minute:02d}".format(**cell)
            record.append(cell)
        rows.append(record)
    return headers, rows
def main() -> None:
    """Print the latest AMeDAS observations of one station as a table.

    The station name is read from the first command-line argument and
    defaults to ``"Tokyo"``. The process exits with an error message when
    the name cannot be resolved to a station number.
    """
    location = sys.argv[1] if len(sys.argv) > 1 else "Tokyo"
    code = search_observatory_number(location)
    if code is None:
        # Without this guard the literal "None" would be requested as a
        # station number and an empty table printed with no explanation.
        sys.exit(f"Unknown observatory: {location}")
    data = get_amedas_data(code)
    headers, flatten_data = flatten(data)
    print(tabulate(flatten_data, headers=headers))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment