Last active
February 5, 2025 10:26
-
-
Save maxiimilian/f6d0d5081a310cc102266f8089d86105 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Download data from KNMI API. | |
Copyright (C) 2024 Maximilian Pierzyna, except when indicated otherwise | |
in the docstring of the functions. | |
This program is free software: you can redistribute it and/or modify | |
it under the terms of the GNU General Public License as published by | |
the Free Software Foundation, either version 3 of the License, or | |
(at your option) any later version. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License | |
along with this program. If not, see <https://www.gnu.org/licenses/>. | |
""" | |
from __future__ import annotations | |
import concurrent.futures | |
import pathlib | |
import os | |
import requests | |
import warnings | |
# Set API key
# Read the KNMI Open Data API token from the environment; falls back to an
# empty string if KNMI_API_KEY is unset (requests will then be rejected).
API_KEY = os.environ.get("KNMI_API_KEY", "")
# Define available datasets (name, version)
# Each constant is a (dataset_name, dataset_version) pair as expected by the
# KNMI API endpoints below. Per the variable names: UNVAL = unvalidated (la1),
# VAL = validated (lc1) data streams — presumably CESAR/Cabauw 10-min products;
# confirm against the KNMI data platform catalogue.
DS_TOWER_UNVAL = ("cesar_tower_meteo_la1_t10", "v1.2")
DS_TOWER_VAL = ("cesar_tower_meteo_lc1_t10", "v1.0")
DS_SURFACE_UNVAL = ("cesar_surface_meteo_la1_t10", "v1.0")
DS_FLUX_UNVAL = ("cesar_surface_flux_la1_t10", "v1.0")
DS_CLOUD = ("cesar_nubiscope_cldcov_la1_t10", "v1.0")
DS_RAD = ("cesar_surface_radiation_la1_t10", "v1.0")
class OpenDataAPI:
    """Thin wrapper around the KNMI Open Data REST API.

    Adapted from the KNMI tutorial:
    https://developer.dataplatform.knmi.nl/open-data-api#example-last
    """

    def __init__(self, api_token: str):
        # All endpoints live under this versioned base URL.
        self.base_url = "https://api.dataplatform.knmi.nl/open-data/v1"
        # KNMI expects the raw token in the Authorization header.
        self.headers = {"Authorization": api_token}

    def __get_data(self, url, params=None):
        """GET `url` with the auth headers and return the decoded JSON body."""
        return requests.get(url, headers=self.headers, params=params).json()

    def list_files(
        self, dataset_name: str, dataset_version: str, begin: str, end: str, **params
    ):
        """Return metadata dicts for all dataset files between `begin` and `end`.

        `begin`/`end` are date strings (e.g. "20211221") that are expanded to
        the dataset's filename pattern, which the API filters lexicographically.
        Pagination is followed transparently via `nextPageToken`.
        """
        # Translate begin and end dates into filenames for API filtering
        params = {
            **params,
            "begin": f"{dataset_name}_{dataset_version}_{begin}.nc",
            "end": f"{dataset_name}_{dataset_version}_{end}.nc",
        }
        # Query API and get file list
        res = self.__get_data(
            f"{self.base_url}/datasets/{dataset_name}/versions/{dataset_version}/files",
            params=params,
        )
        try:
            files = res["files"]
        except KeyError:
            # Dump the raw API payload (usually an error message) before re-raising.
            print(res)
            raise
        # If result is truncated, submit again with nextPageToken to retrieve
        # the remaining pages recursively.
        if res["isTruncated"]:
            files += self.list_files(
                dataset_name,
                dataset_version,
                begin,
                end,
                nextPageToken=res["nextPageToken"],
            )
        # Return full file list
        return files

    def get_file_url(self, dataset_name: str, dataset_version: str, filename: str):
        """Request a temporary download URL for one dataset file.

        Bug fix: `filename` is now interpolated into the endpoint path —
        previously the argument was ignored, so the request could not address
        a specific file.
        """
        return self.__get_data(
            f"{self.base_url}/datasets/{dataset_name}/versions/{dataset_version}/files/{filename}/url"
        )
def download_file(url: str, local_dst: pathlib.Path, chunk_size: int = 16384) -> None:
    """Stream the resource at `url` into `local_dst` in fixed-size chunks.

    Source: https://stackoverflow.com/questions/16694907/download-large-file-in-python-with-requests.
    """
    with requests.get(url, stream=True) as response:
        # Fail fast on HTTP errors before touching the local file.
        response.raise_for_status()
        with local_dst.open("wb") as sink:
            for piece in response.iter_content(chunk_size=chunk_size):
                sink.write(piece)
def download_dataset_subset(
    api: OpenDataAPI,
    dataset_name: str,
    dataset_version: str,
    begin: str,
    end: str,
    output_path: pathlib.Path | str,
):
    """Download all files of a dataset between `begin` and `end` into `output_path`.

    Files already present in `output_path` are skipped. Download URLs are
    fetched concurrently, then the files themselves are downloaded with a
    thread pool.

    Bug fix: worker exceptions are now surfaced. Previously `executor.submit`
    was wrapped in try/except, but `submit` does not raise download errors —
    `Future.result()` was never called, so failed downloads were silently
    dropped.
    """
    # Make sure that output_path is a pathlib.Path object
    output_path = pathlib.Path(output_path)
    # The API filter compares filenames lexicographically, so dates must be
    # compact (YYYYMMDD) without dashes for the filtering to work.
    if "-" in begin or "-" in end:
        warnings.warn("Begin and end dates should not contain dashes.")
    # Retrieve file list
    print("Retrieving file list...")
    files = api.list_files(dataset_name, dataset_version, begin, end)
    filenames = [f["filename"] for f in files]
    print(f"-> {len(filenames)} files.")
    # Check if files are already downloaded
    filenames = [f for f in filenames if not (output_path / f).exists()]
    if len(filenames) == 0:
        print("No new files to download. ")
        print("-> Done.")
        return
    # Get download urls
    print("Retrieving download urls for each file...")
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [
            executor.submit(
                api.get_file_url,
                dataset_name=dataset_name,
                dataset_version=dataset_version,
                filename=f,
            )
            for f in filenames
        ]
        file_urls = [f.result()["temporaryDownloadUrl"] for f in futures]
    print("-> Done")
    # Download files
    print("Downloading files...")
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        future_to_name = {}
        for f, u in zip(filenames, file_urls):
            print(f"Downloading {f}...")
            future_to_name[
                executor.submit(download_file, url=u, local_dst=output_path / f)
            ] = f
        # Collect results so that exceptions raised in worker threads are
        # reported instead of silently discarded.
        for fut in concurrent.futures.as_completed(future_to_name):
            f = future_to_name[fut]
            try:
                fut.result()
            except Exception as e:
                print(f"Error downloading {f}: {e}")
    print(f"-> Done. Check {output_path}.")
if __name__ == "__main__":
    # Datasets to fetch (chosen from the available datasets defined above)
    datasets = [DS_TOWER_UNVAL, DS_SURFACE_UNVAL, DS_FLUX_UNVAL, DS_RAD]
    # Measurement periods as (begin, end, datasets) triples.
    periods_datasets = [
        ("20211221", "20211224", datasets),
        ("20220418", "20220423", datasets),
        ("20220726", "20220729", datasets),
        ("20221007", "20221010", datasets),
    ]
    # Ensure the output directory exists
    output = pathlib.Path("knmi_data")
    output.mkdir(exist_ok=True)
    # Query the API and download every dataset for every period
    api = OpenDataAPI(API_KEY)
    for begin, end, period_datasets in periods_datasets:
        for name_and_version in period_datasets:
            download_dataset_subset(
                api, *name_and_version, begin=begin, end=end, output_path=output
            )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment