Create a Parquet store for SARAH3 products distributed as NetCDF files, using Kerchunk (after https://github.com/fsspec/kerchunk/pull/391)
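A minimal sketch of how the CLI below could be invoked, assuming the script is saved under the hypothetical name kerchunk_to_parquet.py (the paths, filenames, and pattern are likewise hypothetical):

    python kerchunk_to_parquet.py reference SARAH3_SID.nc --output-directory references/
    python kerchunk_to_parquet.py reference-multi data/ combined --output-directory references/ --pattern "*.nc" --workers 4
    python kerchunk_to_parquet.py select combined.parquet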
from pathlib import Path
from functools import partial
import typer
from typing import Optional
import xarray as xr
from rich import print
import fsspec
from fsspec.implementations.reference import LazyReferenceMapper
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
import multiprocessing
import json
app = typer.Typer(
    no_args_is_help=True,
    add_completion=True,
    add_help_option=True,
    rich_markup_mode="rich",
    help="Create Parquet references",
)
def create_parquet_references(
    input_file: Path,
    output_parquet_store: Path,
    record_size: int = 1000,
):
    """Create Parquet references for a single HDF5/NetCDF file."""
    output_parquet_store.mkdir(parents=True, exist_ok=True)
    filesystem = fsspec.filesystem("file")
    try:
        output = LazyReferenceMapper.create(
            root=str(output_parquet_store),  # does not handle Path
            fs=filesystem,
            record_size=record_size,
        )
        # Pass a string: SingleHdf5ToZarr treats a str input as the source URL
        # to record in the references.
        single_zarr = SingleHdf5ToZarr(str(input_file), out=output)
        single_zarr.translate()
        output.flush()
    except json.JSONDecodeError as e:
        print(f"JSONDecodeError: {e}")
        print(f"Failed processing file: {input_file}")
        return
    return output_parquet_store
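# A minimal sketch (hypothetical paths): the helper above can also be called
# directly from Python, bypassing the Typer CLI:
#
#     create_parquet_references(
#         Path("SARAH3_SID.nc"),
#         output_parquet_store=Path("references/SARAH3_SID.parquet"),
#         record_size=1000,
#     )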
def create_single_parquet_references(
    input_file_path,
    output_directory,
    record_size,
):
    """Create a single-file Parquet reference store named after the input file."""
    filename = input_file_path.stem
    single_parquet_store = output_directory / f"{filename}.parquet"
    create_parquet_references(
        input_file_path,
        output_parquet_store=single_parquet_store,
        record_size=record_size,
    )
def create_multi_parquet_references(
    source_directory: Path,
    output_parquet_store: Path,
    output_directory: Path,
    pattern: str = "*.nc",
    record_size: int = 1000,
    workers: int = 4,
):
    """Create single-file Parquet reference stores in parallel, then combine
    them into one Parquet store along the `time` dimension."""
    input_file_paths = list(source_directory.glob(pattern))
    if not input_file_paths:
        print(
            f"No files found in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]!"
        )
        return
    output_directory.mkdir(parents=True, exist_ok=True)
    with multiprocessing.Pool(processes=workers) as pool:
        print(f"Creating Parquet stores in [code]{output_directory}[/code]")
        partial_create_parquet_references = partial(
            create_single_parquet_references,
            output_directory=output_directory,
            record_size=record_size,
        )
        pool.map(partial_create_parquet_references, input_file_paths)
        print("Done creating single Parquet stores!")
    # Ensure the combined store carries a .parquet suffix
    output_parquet_store = output_parquet_store.parent / (
        output_parquet_store.name + ".parquet"
    )
    output_parquet_store.mkdir(parents=True, exist_ok=True)
    filesystem = fsspec.filesystem("file")
    try:
        output = LazyReferenceMapper.create(
            root=str(output_parquet_store),  # does not handle Path
            fs=filesystem,
            record_size=record_size,
        )
        reference_pattern = "*.parquet"
        input_references = list(output_directory.glob(reference_pattern))
        input_references = list(map(str, input_references))
        multi_zarr = MultiZarrToZarr(
            input_references,
            remote_protocol="memory",
            concat_dims=["time"],
            out=output,
        )
        multi_zarr.translate()
        output.flush()
    except Exception as e:
        print(f"Failed creating the [code]{output_parquet_store}[/code] : {e}!")
        return
@app.command(
    "reference",
    no_args_is_help=True,
    help="Create Parquet references to an HDF5/NetCDF file",
)
def reference(
    input_file: Path,
    output_directory: Optional[Path] = Path("."),
    record_size: int = 1000,
    dry_run: bool = False,
):
    """Create Parquet references from an HDF5/NetCDF file"""
    filename = input_file.stem
    output_parquet_store = output_directory / f"{filename}.parquet"
    if dry_run:
        print("[bold]Dry running operations that would be performed[/bold]:")
        print(
            f"> Creating Parquet references to [code]{input_file}[/code] in [code]{output_parquet_store}[/code]"
        )
        return  # Exit for a dry run
    create_parquet_references(
        input_file=input_file,
        output_parquet_store=output_parquet_store,
        record_size=record_size,
    )
@app.command(
    "reference-multi",
    no_args_is_help=True,
    help="Create Parquet references to multiple HDF5/NetCDF files",
)
def reference_multi(
    source_directory: Path,
    output_parquet_store: Path,
    output_directory: Optional[Path] = Path("."),
    pattern: str = "*.nc",
    record_size: int = 1000,
    workers: int = 4,
    dry_run: bool = False,
):
    """Create Parquet references from multiple HDF5/NetCDF files"""
    input_file_paths = list(source_directory.glob(pattern))
    if not input_file_paths:
        print("No files found in the source directory matching the pattern.")
        return
    if dry_run:
        print("[bold]Dry running operations that would be performed[/bold]:")
        print(
            f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
        )
        print(f"> Number of files matched : {len(input_file_paths)}")
        print(f"> Creating Parquet stores in [code]{output_directory}[/code]")
        return  # Exit for a dry run
    create_multi_parquet_references(
        source_directory=source_directory,
        output_parquet_store=output_parquet_store,
        output_directory=output_directory,
        pattern=pattern,
        record_size=record_size,
        workers=workers,
    )
@app.command(
    "select",
    no_args_is_help=True,
    help="Select data from a Parquet reference store",
)
def select(
    parquet_store: Path,
):
    """Select data from a Parquet store"""
    dataset = xr.open_dataset(
        str(parquet_store),  # does not handle Path
        engine="kerchunk",
        storage_options=dict(skip_instance_cache=True, remote_protocol="file"),
        # backend_kwargs={"consolidated": False},
    )
    print(dataset)
if __name__ == "__main__":
    app()
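As a sanity check, a combined store produced by reference-multi should also open through fsspec's generic reference filesystem, independently of the kerchunk xarray engine. A minimal sketch, assuming a combined store at the hypothetical local path combined.parquet:

    import fsspec
    import xarray as xr

    # Open the Parquet reference store with fsspec's reference filesystem;
    # the original NetCDF chunks are read from the local filesystem.
    fs = fsspec.filesystem(
        "reference",
        fo="combined.parquet",  # hypothetical path to the combined store
        remote_protocol="file",
        skip_instance_cache=True,
    )
    dataset = xr.open_dataset(
        fs.get_mapper(""),
        engine="zarr",
        backend_kwargs={"consolidated": False},
    )
    print(dataset)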