Skip to content

Instantly share code, notes, and snippets.

@ajelenak
Last active September 23, 2024 20:43
Show Gist options
  • Save ajelenak/9f2a634c47426f35cdb897d6a64052ef to your computer and use it in GitHub Desktop.
Save ajelenak/9f2a634c47426f35cdb897d6a64052ef to your computer and use it in GitHub Desktop.
Additional HDF5 dataset chunk statistics
import argparse
import json
import operator
from collections import defaultdict
from dataclasses import dataclass
from functools import partial, reduce
import os
from typing import Any, Union
from configparser import ConfigParser
from pathlib import Path
import h5py
import numpy as np
from tabulate import tabulate
if h5py.h5.get_libversion() < (1, 14, 3):
raise RuntimeError("Requires HDF5 library 1.14.3 or later")
elif not h5py.h5.get_config().ros3:
raise RuntimeError("HDF5 library must be built with ROS3 virtual file driver")
# ---------------------------------------------------------------------------- #
MiB = 1024 * 1024
def get_cli_args():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="Provide contiguous and chunked dataset statistics that h5stat does not do.",
epilog="Developed by The HDF Group. This work was supported by NASA/GSFC under Raytheon Company contract 80GSFC21CA001.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("h5file", help="Input HDF5 file name.", type=str)
parser.add_argument("--show", help="Print individual dataset stats", action="store_true")
parser.add_argument(
"--json", help="Format individual dataset stats in JSON", action="store_true"
)
return parser.parse_args()
def get_s3_params(need_region: bool = False) -> dict[str, bytes]:
"""Collect AWS-like S3 connection parameters."""
s3p = dict()
# Read AWS credentials and config files...
home = Path.home()
creds = ConfigParser()
creds.read(os.getenv("AWS_SHARED_CREDENTIALS_FILE", home.joinpath(".aws", "credentials")))
config = ConfigParser()
config.read(os.getenv("AWS_CONFIG_FILE", home.joinpath(".aws", "config")))
profile = os.getenv("AWS_PROFILE", "default")
s3p["secret_id"] = os.getenv(
"AWS_ACCESS_KEY_ID", creds.get(profile, "aws_access_key_id", fallback="")
).encode("ascii")
s3p["secret_key"] = os.getenv(
"AWS_SECRET_ACCESS_KEY",
creds.get(profile, "aws_secret_access_key", fallback=""),
).encode("ascii")
s3p["session_token"] = os.getenv(
"AWS_SESSION_TOKEN",
creds.get(profile, "aws_session_token", fallback=""),
).encode("ascii")
if need_region:
s3p["aws_region"] = os.getenv("AWS_REGION", config.get(profile, "region")).encode("ascii")
return s3p
@dataclass(slots=True, frozen=True)
class ContStats:
"""Stats for one contiguous HDF5 dataset."""
name: str
stor_size: int
page_bins: dict
out_of_page: bool
def to_dict(self) -> dict[str, Any]:
d = {
"dataset": self.name,
"stored_size": self.stor_size,
}
if len(self.page_bins) or self.out_of_page:
d.update(
{
"file_pages": self.page_bins,
"out_of_page": self.out_of_page,
}
)
return d
def to_print(self) -> str:
if len(self.page_bins):
return (
f"dataset={_.name} stored_size={_.stor_size}"
f" file_pages={len(_.page_bins)} out_of_page={_.out_of_page}"
)
else:
return f"dataset={_.name} stored_size={_.stor_size}"
@dataclass(slots=True, frozen=True)
class ChunkStats:
"""Stats for one chunked HDF5 dataset."""
name: str
num_stored: int
size: int
stor_size: int
min_size: int
max_size: int
extent_ratio: float
page_bins: dict
page_spread_anomaly: int
def __post_init__(self):
if self.extent_ratio > 1:
raise ValueError(f"Chunk shape ratio greater than 1 for {self.name}")
if self.page_spread_anomaly < 0:
raise ValueError(f"Chunks file page spread anomaly negative for {self.name}")
def to_dict(self) -> dict[str, Any]:
d = {
"dataset": self.name,
"chunks_stored": self.num_stored,
"chunk_size": self.size,
"stored_size": self.stor_size,
"min_stored_chunk_size": self.min_size,
"max_stored_chunk_size": self.max_size,
"chunk_shape_ratio": self.extent_ratio,
}
if len(self.page_bins):
d.update(
{
"file_pages": self.page_bins,
"page_spread_anomaly": self.page_spread_anomaly,
}
)
return d
def to_print(self) -> str:
if len(self.page_bins):
return (
f"dataset={_.name} stored_size={_.stor_size} chunks_stored={_.num_stored}"
f" chunk_size={_.size} min_stored_chunk_size={_.min_size} max_stored_chunk_size={_.max_size}"
f" chunk_shape_ratio={_.extent_ratio:.6g} file_pages={len(_.page_bins)}"
f" page_spread_anomaly={_.page_spread_anomaly}"
)
else:
return (
f"dataset={_.name} stored_size={_.stor_size} chunks_stored={_.num_stored}"
f" chunk_size={_.size} min_stored_chunk_size={_.min_size} max_stored_chunk_size={_.max_size}"
f" chunk_shape_ratio={_.extent_ratio:.6g}"
)
def chunk_to_shape_ratio(chunk: tuple, shape: tuple) -> float:
"""Ratio of chunk to dataset shape extent."""
ratio = 1
for c, s in zip(chunk, shape):
try:
ratio *= min(1, c / s)
except ZeroDivisionError:
# Deal with 1D datasets without data...
continue
return ratio
def chunk_info(dset: h5py.Dataset, page_size: int) -> tuple[dict[int, int], int, int]:
"""Determine file page and the smallest and largest chunk size of a chunked
dataset.
Only for files with "PAGE" file space strategy.
"""
stinfo = defaultdict(int)
chunk_sizes = list()
def chunk_info(chunk_stor) -> None:
chunk_sizes.append(chunk_stor.size)
if chunk_stor.size <= page_size:
start_page = np.floor(chunk_stor.byte_offset / page_size).astype(int).item() + 1
end_page = (
np.floor((chunk_stor.byte_offset + chunk_stor.size - 1) / page_size)
.astype(int)
.item()
+ 1
)
if start_page != end_page:
raise ValueError(f"Chunk crosses file page boundary: {chunk_stor}")
stinfo[start_page] += 1
dset.id.chunk_iter(chunk_info)
return stinfo, min(chunk_sizes), max(chunk_sizes)
def cont2page(dset: h5py.Dataset, page_size: int) -> dict[int, int]:
"""Determine file page of a contiguous dataset."""
stinfo = defaultdict(int)
offs = dset.id.get_offset()
size = dset.id.get_storage_size()
if offs is not None and size <= page_size:
start_page = np.floor(offs / page_size).astype(int).item() + 1
end_page = np.floor((offs + size - 1) / page_size).astype(int).item() + 1
if start_page != end_page:
raise ValueError(f"Contiguous dataset crosses file page boundary: {dset.name}")
stinfo[start_page] += 1
return stinfo
def dset_stats(
name: str,
h5obj: Union[h5py.Group, h5py.Dataset],
dset_list: list[Union[ChunkStats, ContStats]],
page_size: int = 0,
) -> None:
if isinstance(h5obj, h5py.Dataset):
chunk_shape = h5obj.chunks
if chunk_shape:
chunk_nelem = reduce(operator.mul, chunk_shape, 1)
chunk_pages, min_size, max_size = chunk_info(h5obj, page_size)
if page_size:
num_chunks = reduce(operator.add, chunk_pages.values(), 0)
stored_size = h5obj.id.get_storage_size()
if max_size > page_size:
page_spread = 0
else:
page_spread = (
len(chunk_pages) - np.ceil(stored_size / page_size).astype(int).item()
)
else:
num_chunks = h5obj.id.get_num_chunks()
stored_size = h5obj.id.get_storage_size()
page_spread = 0
dset_list.append(
ChunkStats(
name=h5obj.name,
num_stored=num_chunks,
extent_ratio=chunk_to_shape_ratio(chunk_shape, h5obj.shape),
stor_size=stored_size,
min_size=min_size,
max_size=max_size,
size=h5obj.id.get_type().get_size() * chunk_nelem,
page_bins=chunk_pages,
page_spread_anomaly=page_spread,
)
)
else:
if h5obj.id.get_create_plist().get_layout() == h5py.h5d.COMPACT:
# Compact datasets are not included due to their specific storage...
return
stored_size = h5obj.id.get_storage_size()
dset_list.append(
ContStats(
name=h5obj.name,
stor_size=stored_size,
page_bins=cont2page(h5obj, page_size),
out_of_page=True if page_size and stored_size > page_size else False,
)
)
def stats_table(
bin_hdr: str,
bins: list,
bin_fmt: Union[str, list[str]],
stats_hdr: str,
data: np.ndarray,
) -> str:
"""Prepare and print a table with data."""
# Calculate the histograms...
hist, bins_ = np.histogram(data, bins=bins)
bin_prcnt = 100 * hist / np.sum(hist)
bin_cumsum_prcnt = 100 * np.cumsum(hist) / np.sum(hist)
# Headers...
prcnt_hdr = "% of\ntotal datasets"
cumcum_prcnt_hdr = "cusum % of\ntotal datasets"
tablefmt = "grid"
if isinstance(bin_fmt, list):
return tabulate(
{
bin_hdr: bin_fmt,
stats_hdr: hist,
prcnt_hdr: np.round(bin_prcnt, decimals=2),
cumcum_prcnt_hdr: np.round(bin_cumsum_prcnt, decimals=2),
},
headers="keys",
tablefmt=tablefmt,
)
else:
return tabulate(
{
bin_hdr: [
f"{bins_[i]:{bin_fmt}} ≤ # < {bins[i+1]:{bin_fmt}}"
for i in range(len(bins_) - 1)
],
stats_hdr: hist,
prcnt_hdr: np.round(bin_prcnt, decimals=2),
cumcum_prcnt_hdr: np.round(bin_cumsum_prcnt, decimals=2),
},
headers="keys",
tablefmt=tablefmt,
)
# ---------------------------------------------------------------------------- #
cli = get_cli_args()
if cli.h5file.startswith(("https://", "s3://")):
driver = "ros3"
page_buf_size = 64 * MiB
s3params = get_s3_params(need_region=cli.h5file.startswith("s3://"))
else:
driver = None
page_buf_size = 0
s3params = dict()
dset_info: list[Union[ChunkStats, ContStats]] = list()
with h5py.File(cli.h5file, mode="r", driver=driver, **s3params) as f:
fcpl = f.id.get_create_plist()
page = fcpl.get_file_space_strategy()[0] == h5py.h5f.FSPACE_STRATEGY_PAGE
if page:
page_size = fcpl.get_file_space_page_size()
else:
f.visititems(partial(dset_stats, dset_list=dset_info, page_size=0))
if page and page_size:
with h5py.File(
cli.h5file, mode="r", driver=driver, page_buf_size=page_buf_size, **s3params
) as f:
f.visititems(partial(dset_stats, dset_list=dset_info, page_size=page_size))
if cli.show:
if cli.json:
print(json.dumps([_.to_dict() for _ in sorted(dset_info, key=lambda d: d.name)]))
else:
for _ in sorted(dset_info, key=lambda d: d.name):
print(_.to_print())
raise SystemExit()
# Split dataset info into chunked and contiguous...
cont_info: list[ContStats] = list()
chunked_info: list[ChunkStats] = list()
for _ in dset_info:
if isinstance(_, ChunkStats):
chunked_info.append(_)
else:
cont_info.append(_)
del dset_info
print(f"\nDataset statistics for {cli.h5file}")
print("Compact datasets in the file, if they exist, are excluded.")
print(f"Contiguous datasets in the file: {len(cont_info)}")
print(f"Chunked datasets in the file: {len(chunked_info)}")
if page:
print(f'"PAGE" file space strategy with page size of {page_size:,} bytes.')
print("\n")
if cont_info:
print(
stats_table(
"Contiguous dataset size\nin bytes",
[0, 1_000_000, 4_000_000, 8_000_000, 16_000_000, np.inf],
".1e",
"# contiguous\ndatasets",
[_.stor_size for _ in cont_info],
),
end="\n\n\n",
)
if page:
print(
stats_table(
"Contiguous dataset",
[0, 1, 2],
["In a file page", "Out of file pages"],
"# contiguous\ndatasets",
[int(_.out_of_page) for _ in cont_info],
),
end="\n\n\n",
)
if chunked_info:
print(
stats_table(
"Chunked dataset total\nstored size in bytes",
[0, 1_000_000, 4_000_000, 8_000_000, 16_000_000, 32_000_000, 64_000_000, np.inf],
".1e",
"# chunked\ndatasets",
[_.stor_size for _ in chunked_info],
),
end="\n\n\n",
)
print(
stats_table(
"Chunk size in bytes",
[0, 10, 1000, 10000, 100_000, 1_000_000, 8_000_000, 16_000_000, np.inf],
".1e",
"# chunked\ndatasets",
[_.size for _ in chunked_info],
),
end="\n\n\n",
)
print(
stats_table(
"Chunk to dataset\nshape ratio",
[
0,
0.001,
0.002,
0.003,
0.004,
0.005,
0.01,
0.02,
0.03,
0.04,
0.05,
0.1,
0.25,
1,
],
".3f",
"# chunked\ndatasets",
[_.extent_ratio for _ in chunked_info],
),
end="\n\n\n",
)
print(
stats_table(
"Chunks stored",
[0, 1, 2, 10, 100, 1000, 10000, 100_000, np.inf],
[
"No chunks",
"1 chunk",
"2-9 chunks",
"10-99 chunks",
"100-999 chunks",
"1000-9999 chunks",
"10,000-99,999 chunks",
"100,000 or more chunks",
],
"# chunked\ndatasets",
[_.num_stored for _ in chunked_info],
),
end="\n\n\n",
)
print(
stats_table(
"Chunk cache size",
[0, 1 * MiB, 4 * MiB, 8 * MiB, 16 * MiB, np.inf],
["1 MiB", "4 MiB", "8 MiB", "16 MiB", "> 16 MiB"],
"# chunked\ndatasets",
[_.size * _.num_stored for _ in chunked_info],
),
end="\n\n\n" if page else "\n",
)
if page:
print(
stats_table(
"Chunk size vs file page size",
[0, 1, 2],
["All chunks in file pages", "Some chunks out of file pages"],
"# chunked\ndatasets",
[1 if _.max_size > page_size else 0 for _ in chunked_info],
),
end="\n\n\n",
)
# Remove all chunked datasets with chunks bigger than one file page
# because they are going to mess up the following stats...
cleaned_chunked_info = [_ for _ in chunked_info if _.max_size <= page_size]
if len(cleaned_chunked_info) < len(chunked_info):
print(
f"*** Removed {len(chunked_info) - len(cleaned_chunked_info)} chunked datasets "
"with chunks stored outside of file pages because ***\n*** they are not applicable "
"to following stats. ***",
end="\n\n\n",
)
chunked_info = cleaned_chunked_info
del cleaned_chunked_info
print(
stats_table(
"# of file pages\nholding all chunks",
[1, 2, 3, 4, 5, 6, 10, 15, 20, 25, 30, np.inf],
[
"1 page",
"2 pages",
"3 pages",
"4 pages",
"5 pages",
"6 - 9 pages",
"10 - 14 pages",
"15 - 19 pages",
"20 - 24 pages",
"25 - 29 pages",
"30 or more pages",
],
"# chunked\ndatasets",
[len(_.page_bins) for _ in chunked_info],
),
end="\n\n\n",
)
print(
stats_table(
"# file pages anomaly",
[0, 1, 2, 3, 4, 5, np.inf],
[
"No extra file pages",
"1 extra file page",
"2 extra file pages",
"3 extra file pages",
"4 extra file pages",
"5 or more extra file pages",
],
"# chunked\ndatasets",
[_.page_spread_anomaly for _ in chunked_info],
),
end="\n\n\n",
)
print(
stats_table(
"Max % of chunks\nin one file page",
[0, 20, 40, 60, 80, 100],
".0f",
"# chunked\ndatasets",
[
max(map(lambda x: 100 * x / _.num_stored, _.page_bins.values()))
for _ in chunked_info
],
),
)
@ajelenak
Copy link
Author

Changes:

  • Added stats for total stored size of chunked datasets.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment