Python code to extract HDF5 chunk locations and add them to Zarr metadata.
# Requirements:
#   HDF5 library version 1.10.5 or later
#   h5py version 3.0 or later
#   pip install git+https://github.com/HDFGroup/zarr-python.git@hdf5
import logging
from urllib.parse import urlparse, urlunparse

import numpy as np
import h5py
import zarr
from zarr.storage import FileChunkStore
from zarr.meta import encode_fill_value
from numcodecs import Zlib
import fsspec

lggr = logging.getLogger('h5-to-zarr')
lggr.addHandler(logging.NullHandler())

class Hdf5ToZarr:
    """Translate the content of one HDF5 file into Zarr metadata.

    HDF5 groups become Zarr groups. HDF5 datasets become Zarr arrays. Zarr
    array chunks remain in the HDF5 file.

    Parameters
    ----------
    h5f : file-like or str
        Input HDF5 file as a string or file-like Python object.
    store : MutableMapping
        Zarr store.
    xarray : bool, optional
        Produce attributes required by the `xarray <http://xarray.pydata.org>`_
        package to correctly identify dimensions (HDF5 dimension scales) of a
        Zarr array. Default is ``False``.
    """
    def __init__(self, h5f, store, xarray=False):
        # Open HDF5 file in read mode...
        lggr.debug(f'HDF5 file: {h5f}')
        lggr.debug(f'Zarr store: {store}')
        lggr.debug(f'xarray: {xarray}')
        self._h5f = h5py.File(h5f, mode='r')
        self._xr = xarray

        # Create Zarr store's root group...
        self._zroot = zarr.group(store=store, overwrite=True)

        # Figure out HDF5 file's URI...
        if hasattr(h5f, 'name'):
            self._uri = h5f.name
        elif hasattr(h5f, 'url'):
            parts = urlparse(h5f.url())
            self._uri = urlunparse(parts[:3] + ('',) * 3)
        else:
            self._uri = None
        lggr.debug(f'Source URI: {self._uri}')
    def translate(self):
        """Translate content of one HDF5 file into Zarr storage format.

        No data is copied out of the HDF5 file.
        """
        lggr.debug('Translation begins')
        self.transfer_attrs(self._h5f, self._zroot)
        self._h5f.visititems(self.translator)

    def transfer_attrs(self, h5obj, zobj):
        """Transfer attributes from an HDF5 object to its equivalent Zarr object.

        Parameters
        ----------
        h5obj : h5py.Group or h5py.Dataset
            An HDF5 group or dataset.
        zobj : zarr.hierarchy.Group or zarr.core.Array
            An equivalent Zarr group or array to the HDF5 group or dataset
            with attributes.
        """
        for n, v in h5obj.attrs.items():
            if n in ('REFERENCE_LIST', 'DIMENSION_LIST'):
                continue

            # Fix some attribute values to avoid JSON encoding exceptions...
            if isinstance(v, bytes):
                v = v.decode('utf-8')
            elif isinstance(v, (np.ndarray, np.number)):
                if n == '_FillValue':
                    v = encode_fill_value(v, v.dtype)
                elif v.size == 1:
                    v = v.flatten()[0].tolist()
                else:
                    v = v.tolist()
            if self._xr and v == 'DIMENSION_SCALE':
                continue
            try:
                zobj.attrs[n] = v
            except TypeError:
                print(f'Caught TypeError: {n}@{h5obj.name} = {v} ({type(v)})')
    def translator(self, name, h5obj):
        """Produce Zarr metadata for all groups and datasets in the HDF5 file."""
        if isinstance(h5obj, h5py.Dataset):
            lggr.debug(f'Dataset: {h5obj.name}')
            if h5obj.id.get_create_plist().get_layout() == h5py.h5d.COMPACT:
                raise RuntimeError(
                    f'Compact HDF5 datasets not yet supported: <{h5obj.name} '
                    f'{h5obj.shape} {h5obj.dtype} {h5obj.nbytes} bytes>')
            if (h5obj.scaleoffset or h5obj.fletcher32 or h5obj.shuffle or
                    h5obj.compression in ('szip', 'lzf')):
                raise RuntimeError(
                    f'{h5obj.name} uses unsupported HDF5 filters')

            if h5obj.compression == 'gzip':
                compression = Zlib(level=h5obj.compression_opts)
            else:
                compression = None

            # Get storage info of this HDF5 dataset...
            cinfo = self.storage_info(h5obj)
            if self._xr and h5py.h5ds.is_scale(h5obj.id) and not cinfo:
                return

            # Create a Zarr array equivalent to this HDF5 dataset...
            za = self._zroot.create_dataset(h5obj.name, shape=h5obj.shape,
                                            dtype=h5obj.dtype,
                                            chunks=h5obj.chunks or False,
                                            fill_value=h5obj.fillvalue,
                                            compression=compression,
                                            overwrite=True)
            lggr.debug(f'Created Zarr array: {za}')
            self.transfer_attrs(h5obj, za)

            if self._xr:
                # Do this for xarray...
                adims = self._get_array_dims(h5obj)
                za.attrs['_ARRAY_DIMENSIONS'] = adims
                lggr.debug(f'_ARRAY_DIMENSIONS = {adims}')

            # Store chunk location metadata...
            if cinfo:
                cinfo['source'] = {'uri': self._uri,
                                   'array_name': h5obj.name}
                FileChunkStore.chunks_info(za, cinfo)

        elif isinstance(h5obj, h5py.Group):
            lggr.debug(f'Group: {h5obj.name}')
            zgrp = self._zroot.create_group(h5obj.name)
            self.transfer_attrs(h5obj, zgrp)
    def _get_array_dims(self, dset):
        """Get a list of dimension scale names attached to input HDF5 dataset.

        This is required by the xarray package to work with Zarr arrays. Only
        one dimension scale per dataset dimension is allowed. If the dataset
        is itself a dimension scale, it is used as its own dimension.

        Parameters
        ----------
        dset : h5py.Dataset
            HDF5 dataset.

        Returns
        -------
        list
            List with HDF5 path names of dimension scales attached to input
            dataset.
        """
        dims = list()
        rank = len(dset.shape)
        if rank:
            for n in range(rank):
                num_scales = len(dset.dims[n])
                if num_scales == 1:
                    dims.append(dset.dims[n][0].name[1:])
                elif h5py.h5ds.is_scale(dset.id):
                    dims.append(dset.name[1:])
                elif num_scales > 1:
                    raise RuntimeError(
                        f'{dset.name}: {len(dset.dims[n])} '
                        f'dimension scales attached to dimension #{n}')
        return dims
    def storage_info(self, dset):
        """Get storage information of an HDF5 dataset in the HDF5 file.

        Storage information consists of file offset and size (length) for
        every chunk of the HDF5 dataset.

        Parameters
        ----------
        dset : h5py.Dataset
            HDF5 dataset for which to collect storage information.

        Returns
        -------
        dict
            HDF5 dataset storage information. Dict keys are chunk array
            offsets as tuples. Dict values are pairs with chunk file offset
            and size integers.
        """
        # Empty (null) dataset...
        if dset.shape is None:
            return dict()

        dsid = dset.id
        if dset.chunks is None:
            # Contiguous dataset...
            if dsid.get_offset() is None:
                # No data ever written...
                return dict()
            else:
                key = (0,) * (len(dset.shape) or 1)
                return {key: {'offset': dsid.get_offset(),
                              'size': dsid.get_storage_size()}}
        else:
            # Chunked dataset...
            num_chunks = dsid.get_num_chunks()
            if num_chunks == 0:
                # No data ever written...
                return dict()

            # Go over all the dataset chunks...
            stinfo = dict()
            chunk_size = dset.chunks
            for index in range(num_chunks):
                blob = dsid.get_chunk_info(index)
                key = tuple(
                    [a // b for a, b in zip(blob.chunk_offset, chunk_size)])
                stinfo[key] = {'offset': blob.byte_offset,
                               'size': blob.size}
            return stinfo
if __name__ == '__main__':
    lggr.setLevel(logging.DEBUG)
    lggr_handler = logging.StreamHandler()
    lggr_handler.setFormatter(logging.Formatter(
        '%(levelname)s:%(name)s:%(funcName)s:%(message)s'))
    lggr.addHandler(lggr_handler)

    with fsspec.open('s3://pangeo-data-uswest2/esip/adcirc/adcirc_01d.nc',
                     mode='rb', anon=False, requester_pays=True,
                     default_fill_cache=False) as f:
        store = zarr.DirectoryStore('../adcirc_01d.nc.chunkstore')
        h5chunks = Hdf5ToZarr(f, store, xarray=True)
        h5chunks.translate()

        # Consolidate Zarr metadata...
        lggr.info('Consolidating Zarr dataset metadata')
        zarr.convenience.consolidate_metadata(store)
        lggr.info('Done')
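For completeness, here is a rough sketch of reading the data back through the chunk store this script creates. It assumes the same experimental HDFGroup zarr-python branch; in particular, the FileChunkStore constructor arguments shown (a metadata store plus a file-like chunk source) are an assumption based on how this script records chunk info, so check that branch for the exact signature. The 'zeta' variable name is taken from the comments below.

# Read-back sketch (assumed API of the experimental zarr branch, not verified):
import zarr
import fsspec
from zarr.storage import FileChunkStore

store = zarr.DirectoryStore('../adcirc_01d.nc.chunkstore')
with fsspec.open('s3://pangeo-data-uswest2/esip/adcirc/adcirc_01d.nc',
                 mode='rb', anon=False, requester_pays=True) as f:
    # Assumed constructor: metadata store + file-like object with the HDF5 bytes.
    chunk_store = FileChunkStore(store, chunk_source=f)
    zgroup = zarr.open_consolidated(store, chunk_store=chunk_store, mode='r')
    za = zgroup['zeta']
    print(za.shape, za.dtype)
    print(za[0])  # chunk bytes are fetched straight from the HDF5 file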
@ajelenak. Hello, I tried to use your code, and after getting past the initial errors (use of the modified zarr package, etc.), I get the following error:
module 'zarr' has no attribute 'DirectoryStore'
Thanks in advance
It was solved by re-installing git+https://github.com/HDFGroup/zarr-python.git@hdf5.
@Haris-auth Glad that you finally made it work. This idea/code now lives in the kerchunk package, which also enables using official zarr package releases.
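For anyone reading this now, a minimal sketch of the kerchunk-based equivalent, assuming recent kerchunk, fsspec, s3fs, and xarray releases (argument names may differ between versions):

# Sketch of the kerchunk equivalent of this gist (no data copied out of the HDF5 file).
import fsspec
import xarray as xr
from kerchunk.hdf import SingleHdf5ToZarr

url = 's3://pangeo-data-uswest2/esip/adcirc/adcirc_01d.nc'
with fsspec.open(url, mode='rb', anon=False, requester_pays=True) as f:
    # Scan the HDF5 file and build a dict of chunk references.
    refs = SingleHdf5ToZarr(f, url).translate()

# Open the references as a read-only Zarr dataset backed by the original HDF5 file.
fs = fsspec.filesystem('reference', fo=refs, remote_protocol='s3',
                       remote_options={'anon': False, 'requester_pays': True})
ds = xr.open_dataset(fs.get_mapper(''), engine='zarr',
                     backend_kwargs={'consolidated': False})
print(ds)

The references dict can also be saved as JSON and reused later without rescanning the HDF5 file.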
Hello @rsignell-usgs. Thank you very much for sharing the binder. It is great to be able to read a chunked netCDF file as if it were Zarr.
I tried the Pangeo binder, but at the line
cluster = gateway.new_cluster()
I get the following error:
GatewayClusterError: Cluster 'prod.1716d3af703f48bbb8c7428b13823e65' failed to start, see logs for more information
Sorry, I could not find the logs, so I couldn't fix it myself.
I skipped the Dask Gateway cluster and shortened the computation to
#max_var = ds['zeta'].max(dim='time')
max_var = ds['zeta'].sel(node=slice(0,425919)).max(dim='time').compute()
and I could verify the result, so it is OK for me.
If I should report this somewhere else (Pangeo Gitter?), please let me know.
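In case it helps anyone hitting the same Gateway error: a minimal sketch of running the same reduction on a local Dask cluster instead of the Gateway one (assumes dask.distributed is installed; the worker count is only illustrative, and ds is the dataset opened earlier in the notebook):

# Fallback: local Dask cluster instead of the Pangeo Dask Gateway cluster.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4)  # illustrative size
client = Client(cluster)
max_var = ds['zeta'].max(dim='time').compute()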