Created: November 6, 2023 20:18
Create a Parquet store for SARAH3 products in the form of NetCDF files, using Kerchunk
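The script below works in two stages: it first writes one Kerchunk JSON reference per source NetCDF file, recording an MD5 checksum alongside so unchanged files are skipped on re-runs, and then combines those single references into one Parquet reference store that xarray can open through the Zarr engine. For a quick end-to-end check, the Typer CLI can be driven programmatically; a minimal sketch, assuming the gist is saved as kerchunk_to_parquet.py (the module name and all paths are illustrative, not part of the gist):

from typer.testing import CliRunner
from kerchunk_to_parquet import app  # hypothetical module name for this gist

runner = CliRunner()
# --dry-run only reports the operations that would be performed
result = runner.invoke(
    app,
    ["/data/sarah3/netcdf", "/data/sarah3/references", "--dry-run"],
)
print(result.output)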
import typer
import hashlib
from pathlib import Path
from functools import partial
import multiprocessing
from fsspec.implementations.reference import LazyReferenceMapper, ReferenceFileSystem
import fsspec
from kerchunk.hdf import SingleHdf5ToZarr
import ujson
from kerchunk.combine import MultiZarrToZarr
import xarray as xr
from rich import print

VERBOSE_LEVEL_DEFAULT = 0

app = typer.Typer(
    add_completion=True,
    add_help_option=True,
    rich_markup_mode="rich",
    help='Create Kerchunk references',
)

def generate_file_md5(file_path):
    """Return the MD5 checksum of a file, or None if it is missing or empty."""
    if not file_path.exists() or file_path.stat().st_size == 0:
        return None
    md5 = hashlib.md5()
    with open(file_path, 'rb') as f:
        # Hash in chunks so large NetCDF files are not read into memory at once
        while chunk := f.read(2 ** 20):
            md5.update(chunk)
    return md5.hexdigest()

def create_single_reference(file_path, output_directory):
    """Create a Kerchunk JSON reference for a single NetCDF file, skipping
    files whose recorded MD5 hash has not changed."""
    filename = file_path.stem
    output_file = f"{output_directory}/{filename}.json"
    hash_file = f"{output_directory}/{filename}.json.hash"
    generated_hash = generate_file_md5(file_path)
    local_fs = fsspec.filesystem('file')
    if local_fs.exists(output_file) and local_fs.exists(hash_file):
        print(f"Found a reference file '{output_file}' and a hash '{hash_file}'")
        with local_fs.open(hash_file, 'r') as hf:
            existing_hash = hf.read().strip()
        if existing_hash == generated_hash:
            return  # the reference is up to date, nothing to do
    print(f"Creating reference file '{output_file}' with hash '{generated_hash}'")
    file_url = f"file://{file_path}"
    with fsspec.open(file_url, mode='rb') as input_file:
        h5chunks = SingleHdf5ToZarr(input_file, file_url, inline_threshold=0)
        json = ujson.dumps(h5chunks.translate()).encode()
        with local_fs.open(output_file, 'wb') as f:
            f.write(json)
        with local_fs.open(hash_file, 'w') as hf:
            hf.write(generated_hash)

def create_kerchunk_reference(
    source_directory: Path,
    output_directory: Path,
    pattern: str = "*.nc",
    workers: int = 4,
    dry_run: bool = False,
):
    """Reference local NetCDF files using Kerchunk"""
    file_paths = list(source_directory.glob(pattern))
    if not file_paths:
        print("No files found in the source directory matching the pattern.")
        return
    if dry_run:
        print(
            "[bold]Dry run[/bold] of [bold]operations that would be performed[/bold]:"
        )
        print(
            f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
        )
        print(f"> Number of files matched: {len(file_paths)}")
        print(f"> Creating single reference files to [code]{output_directory}[/code]")
        return  # Exit for a dry run
    output_directory.mkdir(parents=True, exist_ok=True)
    # Create one reference per file, in parallel
    with multiprocessing.Pool(processes=workers) as pool:
        print(f'Creating single references to [code]{output_directory}[/code]')
        partial_create_single_reference = partial(
            create_single_reference, output_directory=output_directory
        )
        pool.map(partial_create_single_reference, file_paths)

def combine_kerchunk_references_to_parquet(
    reference_directory: Path,
    pattern: str = "*.json",
    combined_reference: Path = Path("combined_kerchunk.parq"),
    dry_run: bool = False,
):
    """Combine multiple Kerchunked datasets into a single logical Parquet
    aggregate dataset using fsspec.implementations.reference.LazyReferenceMapper"""
    reference_file_paths = list(map(str, reference_directory.glob(pattern)))
    if not reference_file_paths:
        print("No reference files found in the reference directory matching the pattern.")
        return None
    if dry_run:
        print(
            "[bold]Dry run[/bold] of [bold]operations that would be performed[/bold]:"
        )
        print(
            f"> Reading files in [code]{reference_directory}[/code] matching the pattern [code]{pattern}[/code]"
        )
        print(f"> Number of files matched: {len(reference_file_paths)}")
        print(f"> Writing combined reference store to [code]{combined_reference}[/code]")
        return None  # Exit for a dry run
    # Create a LazyReferenceMapper to pass to MultiZarrToZarr
    filesystem = fsspec.filesystem("file")
    combined_reference.mkdir(parents=True, exist_ok=True)
    # record_size=1000 : number of chunk references per Parquet record file
    output_lazy = LazyReferenceMapper.create(1000, str(combined_reference), filesystem)
    # Combine the single references
    print(f'Combining single references to [code]{combined_reference}[/code]')
    mzz = MultiZarrToZarr(
        reference_file_paths,
        remote_protocol="file",
        concat_dims=["time"],
        identical_dims=["lat", "lon"],
        out=output_lazy,
    )
    multifile_kerchunk = mzz.translate()
    # Write any remaining non-full reference batches to disk
    output_lazy.flush()
    return multifile_kerchunk

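# Note on the resulting layout: LazyReferenceMapper.create(1000, ...) sets
# record_size=1000, so each variable's chunk references are written to
# Parquet files of up to 1000 entries each. The store is a directory that
# looks roughly like this (variable names are illustrative for SARAH3):
#
#   combined_kerchunk.parq/
#   ├── .zmetadata
#   ├── SID/refs.0.parq
#   └── time/refs.0.parq
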
@app.command()
def main(
    source_directory: Path,
    reference_directory: Path,
    source_pattern: str = '*.nc',
    reference_pattern: str = '*.json',
    workers: int = 4,
    dry_run: bool = False,
    combined_reference: Path = Path("combined_kerchunk.parq"),
):
    """Create single Kerchunk references for local NetCDF files, combine them
    into a Parquet store, and verify the result by opening it with xarray."""
    create_kerchunk_reference(
        source_directory=source_directory,
        pattern=source_pattern,
        output_directory=reference_directory,
        workers=workers,
        dry_run=dry_run,
    )
    multifile_kerchunk = combine_kerchunk_references_to_parquet(
        reference_directory=reference_directory,  # directory with single references
        pattern=reference_pattern,
        combined_reference=combined_reference,
        dry_run=dry_run,
    )
    if multifile_kerchunk is None:
        return  # dry run, or no reference files: nothing was written
    # The LazyReferenceMapper has already written the Parquet store;
    # re-open it to verify that the combined dataset is readable.
    filesystem = ReferenceFileSystem(
        fo=str(combined_reference),
        remote_protocol="file",
        target_protocol="file",
        lazy=True,
    )
    print('Reading from the Parquet storage...')
    ds = xr.open_dataset(
        filesystem.get_mapper(),
        engine="zarr",
        backend_kwargs={"consolidated": False},
    )
    print(ds)


if __name__ == "__main__":
    app()
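Once the combined store exists, it can be reopened lazily in a later session without re-running the pipeline. A minimal sketch that mirrors the calls at the end of main() above (the store path is the script's default; the SARAH3 variable name and timestamp are hypothetical):

import xarray as xr
from fsspec.implementations.reference import ReferenceFileSystem

# Open the Parquet reference store written by the script above
fs = ReferenceFileSystem(
    fo="combined_kerchunk.parq",
    remote_protocol="file",
    target_protocol="file",
    lazy=True,
)
ds = xr.open_dataset(
    fs.get_mapper(),
    engine="zarr",
    backend_kwargs={"consolidated": False},
)
# Hypothetical SARAH3 variable and timestamp, purely illustrative:
print(ds["SID"].sel(time="2021-06-01 12:00"))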