Last active
September 23, 2024 20:43
-
-
Save ajelenak/9f2a634c47426f35cdb897d6a64052ef to your computer and use it in GitHub Desktop.
Additional HDF5 dataset chunk statistics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import json
import operator
import os
from collections import defaultdict
from configparser import ConfigParser
from dataclasses import dataclass
from functools import partial, reduce
from pathlib import Path
from typing import Any, Union

import h5py
import numpy as np
from tabulate import tabulate
# Refuse to run unless the HDF5 library behind h5py is at least 1.14.3 and
# was built with the ROS3 virtual file driver.
if h5py.h5.get_libversion() < (1, 14, 3):
    raise RuntimeError("Requires HDF5 library 1.14.3 or later")
if not h5py.h5.get_config().ros3:
    raise RuntimeError("HDF5 library must be built with ROS3 virtual file driver")

# ---------------------------------------------------------------------------- #

# Number of bytes in one mebibyte.
MiB = 1 << 20
def get_cli_args():
    """Build the command-line interface and parse ``sys.argv``.

    Returns the ``argparse.Namespace`` with three attributes: ``h5file``
    (input file name or URL), ``show`` and ``json`` (boolean flags).
    """
    cli_parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Provide contiguous and chunked dataset statistics that h5stat does not do.",
        epilog="Developed by The HDF Group. This work was supported by NASA/GSFC under Raytheon Company contract 80GSFC21CA001.",
    )
    cli_parser.add_argument("h5file", type=str, help="Input HDF5 file name.")

    # Both optional switches are plain on/off flags.
    flags = (
        ("--show", "Print individual dataset stats"),
        ("--json", "Format individual dataset stats in JSON"),
    )
    for flag, help_text in flags:
        cli_parser.add_argument(flag, action="store_true", help=help_text)

    return cli_parser.parse_args()
def get_s3_params(need_region: bool = False) -> dict[str, bytes]:
    """Collect AWS-like S3 connection parameters.

    Values come from the standard AWS environment variables first and then
    from the shared credentials/config files. The file locations honor
    ``AWS_SHARED_CREDENTIALS_FILE`` and ``AWS_CONFIG_FILE``; the profile is
    taken from ``AWS_PROFILE`` (``"default"`` when unset).

    Args:
        need_region: When true, also resolve the AWS region and include it
            under the ``"aws_region"`` key.

    Returns:
        ASCII-encoded values keyed ``"secret_id"``, ``"secret_key"`` and
        ``"session_token"`` (empty bytes when a value is not configured),
        plus ``"aws_region"`` when ``need_region`` is true.

    Raises:
        ValueError: If ``need_region`` is true and no region is configured.
    """
    # Read AWS credentials and config files. ConfigParser.read() silently
    # skips files that do not exist.
    home = Path.home()
    creds = ConfigParser()
    creds.read(os.getenv("AWS_SHARED_CREDENTIALS_FILE", home.joinpath(".aws", "credentials")))
    config = ConfigParser()
    config.read(os.getenv("AWS_CONFIG_FILE", home.joinpath(".aws", "config")))
    profile = os.getenv("AWS_PROFILE", "default")

    def credential(env_var: str, option: str) -> bytes:
        """Return one credential: environment first, then the credentials file."""
        value = os.getenv(env_var)
        if value is None:
            value = creds.get(profile, option, fallback="")
        return value.encode("ascii")

    s3p = {
        "secret_id": credential("AWS_ACCESS_KEY_ID", "aws_access_key_id"),
        "secret_key": credential("AWS_SECRET_ACCESS_KEY", "aws_secret_access_key"),
        "session_token": credential("AWS_SESSION_TOKEN", "aws_session_token"),
    }

    if need_region:
        region = os.getenv("AWS_REGION") or os.getenv("AWS_DEFAULT_REGION")
        if not region:
            # In the AWS config file a named profile lives in a
            # "[profile NAME]" section; only the default profile uses a bare
            # "[default]" section. The bare name is tried last for leniency.
            for section in (f"profile {profile}", profile):
                region = config.get(section, "region", fallback=None)
                if region:
                    break
        if not region:
            raise ValueError(
                f"AWS region not found for profile {profile!r}: set AWS_REGION "
                "or add a region to the AWS config file"
            )
        s3p["aws_region"] = region.encode("ascii")

    return s3p
@dataclass(slots=True, frozen=True) | |
class ContStats: | |
"""Stats for one contiguous HDF5 dataset.""" | |
name: str | |
stor_size: int | |
page_bins: dict | |
out_of_page: bool | |
def to_dict(self) -> dict[str, Any]: | |
d = { | |
"dataset": self.name, | |
"stored_size": self.stor_size, | |
} | |
if len(self.page_bins) or self.out_of_page: | |
d.update( | |
{ | |
"file_pages": self.page_bins, | |
"out_of_page": self.out_of_page, | |
} | |
) | |
return d | |
def to_print(self) -> str: | |
if len(self.page_bins): | |
return ( | |
f"dataset={_.name} stored_size={_.stor_size}" | |
f" file_pages={len(_.page_bins)} out_of_page={_.out_of_page}" | |
) | |
else: | |
return f"dataset={_.name} stored_size={_.stor_size}" | |
@dataclass(slots=True, frozen=True) | |
class ChunkStats: | |
"""Stats for one chunked HDF5 dataset.""" | |
name: str | |
num_stored: int | |
size: int | |
stor_size: int | |
min_size: int | |
max_size: int | |
extent_ratio: float | |
page_bins: dict | |
page_spread_anomaly: int | |
def __post_init__(self): | |
if self.extent_ratio > 1: | |
raise ValueError(f"Chunk shape ratio greater than 1 for {self.name}") | |
if self.page_spread_anomaly < 0: | |
raise ValueError(f"Chunks file page spread anomaly negative for {self.name}") | |
def to_dict(self) -> dict[str, Any]: | |
d = { | |
"dataset": self.name, | |
"chunks_stored": self.num_stored, | |
"chunk_size": self.size, | |
"stored_size": self.stor_size, | |
"min_stored_chunk_size": self.min_size, | |
"max_stored_chunk_size": self.max_size, | |
"chunk_shape_ratio": self.extent_ratio, | |
} | |
if len(self.page_bins): | |
d.update( | |
{ | |
"file_pages": self.page_bins, | |
"page_spread_anomaly": self.page_spread_anomaly, | |
} | |
) | |
return d | |
def to_print(self) -> str: | |
if len(self.page_bins): | |
return ( | |
f"dataset={_.name} stored_size={_.stor_size} chunks_stored={_.num_stored}" | |
f" chunk_size={_.size} min_stored_chunk_size={_.min_size} max_stored_chunk_size={_.max_size}" | |
f" chunk_shape_ratio={_.extent_ratio:.6g} file_pages={len(_.page_bins)}" | |
f" page_spread_anomaly={_.page_spread_anomaly}" | |
) | |
else: | |
return ( | |
f"dataset={_.name} stored_size={_.stor_size} chunks_stored={_.num_stored}" | |
f" chunk_size={_.size} min_stored_chunk_size={_.min_size} max_stored_chunk_size={_.max_size}" | |
f" chunk_shape_ratio={_.extent_ratio:.6g}" | |
) | |
def chunk_to_shape_ratio(chunk: tuple, shape: tuple) -> float:
    """Return the product of per-dimension chunk/dataset extent ratios.

    Each dimension contributes ``chunk / shape`` capped at 1. Dimensions with
    a zero dataset extent (datasets without data) are left out of the product.
    """
    capped = (min(1, c / s) for c, s in zip(chunk, shape) if s != 0)
    return reduce(operator.mul, capped, 1)
def chunk_info(dset: h5py.Dataset, page_size: int) -> tuple[dict[int, int], int, int]:
    """Collect file page and size information for the chunks of one dataset.

    Args:
        dset: Chunked dataset to inspect.
        page_size: File page size in bytes. A value of 0 (file without the
            "PAGE" file space strategy) disables the page binning; only the
            chunk sizes are collected then.

    Returns:
        A tuple ``(pages, min_size, max_size)``. ``pages`` maps a 1-based
        file page number to the number of chunks stored in that page; chunks
        larger than one page are not binned. ``min_size`` and ``max_size``
        are the smallest and largest stored chunk sizes in bytes, both 0
        when the dataset has no stored chunks.

    Raises:
        ValueError: If a chunk that fits into one page crosses a file page
            boundary.
    """
    pages: defaultdict[int, int] = defaultdict(int)
    chunk_sizes: list[int] = []

    def record_chunk(chunk_stor) -> None:
        """Record one stored chunk; invoked by ``chunk_iter`` per chunk."""
        chunk_sizes.append(chunk_stor.size)
        if page_size > 0 and chunk_stor.size <= page_size:
            # Integer floor division keeps the page numbers exact for any
            # file offset.
            start_page = chunk_stor.byte_offset // page_size + 1
            end_page = (chunk_stor.byte_offset + chunk_stor.size - 1) // page_size + 1
            if start_page != end_page:
                raise ValueError(f"Chunk crosses file page boundary: {chunk_stor}")
            pages[start_page] += 1

    dset.id.chunk_iter(record_chunk)

    # A chunked dataset may have no chunks stored yet.
    return pages, min(chunk_sizes, default=0), max(chunk_sizes, default=0)
def cont2page(dset: h5py.Dataset, page_size: int) -> dict[int, int]:
    """Determine the file page of a contiguous dataset.

    Args:
        dset: Contiguous dataset to inspect.
        page_size: File page size in bytes; 0 when the file does not use the
            "PAGE" file space strategy.

    Returns:
        A mapping with a single entry, 1-based file page number -> 1, or an
        empty mapping when the dataset has no file offset, is larger than
        one page, or ``page_size`` is not positive.

    Raises:
        ValueError: If a dataset that fits into one page crosses a file page
            boundary.
    """
    pages: defaultdict[int, int] = defaultdict(int)
    offs = dset.id.get_offset()
    size = dset.id.get_storage_size()
    if page_size > 0 and offs is not None and size <= page_size:
        # Integer floor division keeps the page numbers exact for any file
        # offset.
        start_page = offs // page_size + 1
        end_page = (offs + size - 1) // page_size + 1
        if start_page != end_page:
            raise ValueError(f"Contiguous dataset crosses file page boundary: {dset.name}")
        pages[start_page] += 1
    return pages
def dset_stats(
    name: str,
    h5obj: Union[h5py.Group, h5py.Dataset],
    dset_list: list[Union[ChunkStats, ContStats]],
    page_size: int = 0,
) -> None:
    """Append the storage stats of one HDF5 dataset to ``dset_list``.

    The first two parameters follow the ``visititems`` callback convention.

    Args:
        name: Object name supplied by ``visititems``; not used.
        h5obj: Visited object. Anything that is not a dataset is ignored.
        dset_list: Output list receiving one ``ChunkStats`` or ``ContStats``
            entry per dataset. Compact datasets are skipped.
        page_size: File page size in bytes, or 0 for files without the
            "PAGE" file space strategy.
    """
    if not isinstance(h5obj, h5py.Dataset):
        return

    chunk_shape = h5obj.chunks
    if chunk_shape:
        chunk_nelem = reduce(operator.mul, chunk_shape, 1)
        chunk_pages, min_size, max_size = chunk_info(h5obj, page_size)
        # Ask the library for the chunk count: the page bins leave out chunks
        # larger than one file page, so summing them can undercount.
        num_chunks = h5obj.id.get_num_chunks()
        stored_size = h5obj.id.get_storage_size()
        if page_size and max_size <= page_size:
            # Pages actually used minus the fewest pages that could hold the
            # stored bytes (integer ceiling division).
            page_spread = len(chunk_pages) - (-(-stored_size // page_size))
        else:
            page_spread = 0
        dset_list.append(
            ChunkStats(
                name=h5obj.name,
                num_stored=num_chunks,
                extent_ratio=chunk_to_shape_ratio(chunk_shape, h5obj.shape),
                stor_size=stored_size,
                min_size=min_size,
                max_size=max_size,
                size=h5obj.id.get_type().get_size() * chunk_nelem,
                page_bins=chunk_pages,
                page_spread_anomaly=page_spread,
            )
        )
        return

    if h5obj.id.get_create_plist().get_layout() == h5py.h5d.COMPACT:
        # Compact datasets are not included due to their specific storage...
        return

    stored_size = h5obj.id.get_storage_size()
    dset_list.append(
        ContStats(
            name=h5obj.name,
            stor_size=stored_size,
            page_bins=cont2page(h5obj, page_size),
            out_of_page=bool(page_size and stored_size > page_size),
        )
    )
def stats_table(
    bin_hdr: str,
    bins: list,
    bin_fmt: Union[str, list[str]],
    stats_hdr: str,
    data: np.ndarray,
) -> str:
    """Build a histogram table of ``data`` and return it as text.

    Args:
        bin_hdr: Header of the bin column.
        bins: Bin edges handed to ``numpy.histogram``.
        bin_fmt: Either a list with one ready-made label per bin, or a format
            spec used to render each bin as ``"low ≤ # < high"``.
        stats_hdr: Header of the count column.
        data: Values to bin.

    Returns:
        A grid-formatted table with the bin labels, the counts, the percent
        of the total and the cumulative percent of the total.
    """
    # Calculate the histograms...
    hist, edges = np.histogram(data, bins=bins)
    total = hist.sum()
    if total:
        bin_prcnt = 100 * hist / total
        bin_cumsum_prcnt = 100 * np.cumsum(hist) / total
    else:
        # Nothing fell into any bin: report zeros instead of dividing by zero.
        bin_prcnt = np.zeros(hist.shape)
        bin_cumsum_prcnt = np.zeros(hist.shape)

    if isinstance(bin_fmt, list):
        bin_labels = bin_fmt
    else:
        bin_labels = [
            f"{low:{bin_fmt}} ≤ # < {high:{bin_fmt}}" for low, high in zip(edges[:-1], edges[1:])
        ]

    return tabulate(
        {
            bin_hdr: bin_labels,
            stats_hdr: hist,
            "% of\ntotal datasets": np.round(bin_prcnt, decimals=2),
            "cusum % of\ntotal datasets": np.round(bin_cumsum_prcnt, decimals=2),
        },
        headers="keys",
        tablefmt="grid",
    )
# ---------------------------------------------------------------------------- #

cli = get_cli_args()

# Objects in cloud stores are read with the ROS3 driver and a page buffer;
# local files use the default driver.
if cli.h5file.startswith(("https://", "s3://")):
    driver = "ros3"
    page_buf_size = 64 * MiB
    s3params = get_s3_params(need_region=cli.h5file.startswith("s3://"))
else:
    driver = None
    page_buf_size = 0
    s3params = dict()

dset_info: list[Union[ChunkStats, ContStats]] = list()

# First open: find out the file space strategy. Files without the "PAGE"
# strategy are processed right away.
with h5py.File(cli.h5file, mode="r", driver=driver, **s3params) as f:
    fcpl = f.id.get_create_plist()
    page = fcpl.get_file_space_strategy()[0] == h5py.h5f.FSPACE_STRATEGY_PAGE
    if page:
        page_size = fcpl.get_file_space_page_size()
    else:
        f.visititems(partial(dset_stats, dset_list=dset_info, page_size=0))

# Paged files are opened again, this time with the page buffer setting, and
# processed with their file page size.
if page and page_size:
    with h5py.File(
        cli.h5file, mode="r", driver=driver, page_buf_size=page_buf_size, **s3params
    ) as f:
        f.visititems(partial(dset_stats, dset_list=dset_info, page_size=page_size))

if cli.show:
    ordered = sorted(dset_info, key=operator.attrgetter("name"))
    if cli.json:
        print(json.dumps([d.to_dict() for d in ordered]))
    else:
        # NOTE(review): `_` is kept as the module-level loop variable here;
        # confirm that no to_print() method reads it before renaming it.
        for _ in ordered:
            print(_.to_print())
    raise SystemExit()

# Split dataset info into chunked and contiguous...
cont_info: list[ContStats] = [d for d in dset_info if not isinstance(d, ChunkStats)]
chunked_info: list[ChunkStats] = [d for d in dset_info if isinstance(d, ChunkStats)]
del dset_info

print(f"\nDataset statistics for {cli.h5file}")
print("Compact datasets in the file, if they exist, are excluded.")
print(f"Contiguous datasets in the file: {len(cont_info)}")
print(f"Chunked datasets in the file: {len(chunked_info)}")
if page:
    print(f'"PAGE" file space strategy with page size of {page_size:,} bytes.')
print("\n")

if cont_info:
    print(
        stats_table(
            "Contiguous dataset size\nin bytes",
            [0, 1_000_000, 4_000_000, 8_000_000, 16_000_000, np.inf],
            ".1e",
            "# contiguous\ndatasets",
            [d.stor_size for d in cont_info],
        ),
        end="\n\n\n",
    )
    if page:
        print(
            stats_table(
                "Contiguous dataset",
                [0, 1, 2],
                ["In a file page", "Out of file pages"],
                "# contiguous\ndatasets",
                [int(d.out_of_page) for d in cont_info],
            ),
            end="\n\n\n",
        )

if chunked_info:
    print(
        stats_table(
            "Chunked dataset total\nstored size in bytes",
            [0, 1_000_000, 4_000_000, 8_000_000, 16_000_000, 32_000_000, 64_000_000, np.inf],
            ".1e",
            "# chunked\ndatasets",
            [d.stor_size for d in chunked_info],
        ),
        end="\n\n\n",
    )
    print(
        stats_table(
            "Chunk size in bytes",
            [0, 10, 1000, 10000, 100_000, 1_000_000, 8_000_000, 16_000_000, np.inf],
            ".1e",
            "# chunked\ndatasets",
            [d.size for d in chunked_info],
        ),
        end="\n\n\n",
    )
    print(
        stats_table(
            "Chunk to dataset\nshape ratio",
            [
                0,
                0.001,
                0.002,
                0.003,
                0.004,
                0.005,
                0.01,
                0.02,
                0.03,
                0.04,
                0.05,
                0.1,
                0.25,
                1,
            ],
            ".3f",
            "# chunked\ndatasets",
            [d.extent_ratio for d in chunked_info],
        ),
        end="\n\n\n",
    )
    print(
        stats_table(
            "Chunks stored",
            [0, 1, 2, 10, 100, 1000, 10000, 100_000, np.inf],
            [
                "No chunks",
                "1 chunk",
                "2-9 chunks",
                "10-99 chunks",
                "100-999 chunks",
                "1000-9999 chunks",
                "10,000-99,999 chunks",
                "100,000 or more chunks",
            ],
            "# chunked\ndatasets",
            [d.num_stored for d in chunked_info],
        ),
        end="\n\n\n",
    )
    print(
        stats_table(
            "Chunk cache size",
            [0, 1 * MiB, 4 * MiB, 8 * MiB, 16 * MiB, np.inf],
            ["1 MiB", "4 MiB", "8 MiB", "16 MiB", "> 16 MiB"],
            "# chunked\ndatasets",
            [d.size * d.num_stored for d in chunked_info],
        ),
        end="\n\n\n" if page else "\n",
    )

    if page:
        print(
            stats_table(
                "Chunk size vs file page size",
                [0, 1, 2],
                ["All chunks in file pages", "Some chunks out of file pages"],
                "# chunked\ndatasets",
                [1 if d.max_size > page_size else 0 for d in chunked_info],
            ),
            end="\n\n\n",
        )

        # Remove all chunked datasets with chunks bigger than one file page
        # because they are going to mess up the following stats...
        cleaned_chunked_info = [d for d in chunked_info if d.max_size <= page_size]
        if len(cleaned_chunked_info) < len(chunked_info):
            print(
                f"*** Removed {len(chunked_info) - len(cleaned_chunked_info)} chunked datasets "
                "with chunks stored outside of file pages because ***\n*** they are not applicable "
                "to following stats. ***",
                end="\n\n\n",
            )
        chunked_info = cleaned_chunked_info
        del cleaned_chunked_info

        # The removal above may leave nothing to report; skip the remaining
        # tables rather than compute percentages of an empty set.
        if chunked_info:
            print(
                stats_table(
                    "# of file pages\nholding all chunks",
                    [1, 2, 3, 4, 5, 6, 10, 15, 20, 25, 30, np.inf],
                    [
                        "1 page",
                        "2 pages",
                        "3 pages",
                        "4 pages",
                        "5 pages",
                        "6 - 9 pages",
                        "10 - 14 pages",
                        "15 - 19 pages",
                        "20 - 24 pages",
                        "25 - 29 pages",
                        "30 or more pages",
                    ],
                    "# chunked\ndatasets",
                    [len(d.page_bins) for d in chunked_info],
                ),
                end="\n\n\n",
            )
            print(
                stats_table(
                    "# file pages anomaly",
                    [0, 1, 2, 3, 4, 5, np.inf],
                    [
                        "No extra file pages",
                        "1 extra file page",
                        "2 extra file pages",
                        "3 extra file pages",
                        "4 extra file pages",
                        "5 or more extra file pages",
                    ],
                    "# chunked\ndatasets",
                    [d.page_spread_anomaly for d in chunked_info],
                ),
                end="\n\n\n",
            )
            print(
                stats_table(
                    "Max % of chunks\nin one file page",
                    [0, 20, 40, 60, 80, 100],
                    ".0f",
                    "# chunked\ndatasets",
                    [
                        # A dataset without stored chunks has no page bins;
                        # count it as 0 %.
                        max(
                            (100 * n / d.num_stored for n in d.page_bins.values()),
                            default=0,
                        )
                        for d in chunked_info
                    ],
                ),
            )
Fix JSON output to be compliant.
Changes in version 13b49856:
- Switch to numpy for all histogram calculations.
- Use tabulate package to pretty-print output.
- Added a statistic for the chunk cache size needed to fit all chunks of one dataset.
- Minimum required libhdf5 version is 1.14.3.
Changes in version 2c0e9427:
- Support for files in S3-compatible cloud stores. Both https:// and s3:// style object links can be used.
- libhdf5 with ROS3 virtual file driver required.
Only a few minor tweaks in version 835d936f.
Changes in 835d936f:
- New name: h5stat-extra.py
- Contiguous datasets are included.
- Two new stats for paged files: How many contiguous datasets or chunked datasets' chunks are stored outside of file pages (too large for one file page).
- Compact datasets are skipped due to their specific storage that does not influence the reported stats.
- A few changes to bin ranges to produce more relevant information.
- Support for AWS env. variables for configuration and credentials files.
- Chunked datasets with chunks outside of file pages are removed prior to some paged file related statistics.
- Code cleanup and optimization.
- Added stats for total stored size of chunked datasets.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Added JSON format output and a few bug fixes.