Create zarr metadata to read arbitrary binary file
import numpy as np
import json
import base64
import xarray as xr


def _add_dataset_to_rfs(
    rfs: dict,
    shape: list[int],
    dtype: np.dtype,
    dset_name: str,
    chunk_len: int = None,
    dims: list[str] = None,
    units: str = None,
    order: str = "C",
    description: str = None,
    scale_factor: float = None,
    add_offset: float = None,
    fill_value=None,
) -> None:
    """
    Add a dataset to the reference filesystem.

    Parameters
    ----------
    rfs : dict
        Reference file system dictionary.
    shape : list
        The shape of the dataset.
    dtype : np.dtype
        The data type of the dataset.
    dset_name : str
        The name of the dataset.
    chunk_len : int, optional
        The length of the chunks used to divide the dataset along the first
        dimension. If None, the dataset is unchunked.
    dims : list, optional
        Labels for the dimensions of the dataset. If None, phony dimension labels
        will be generated.
    units : str, optional
        Units of the dataset values, stored as an attribute.
    order : {"C", "F"}
        Order of values in the array. Default is "C".
    description : str, optional
        A description of the dataset, stored as an attribute.
    scale_factor : float, optional
        If the dataset is read with mask_and_scale=True, the dataset will be scaled
        by this factor. If None, no scaling is applied.
    add_offset : float, optional
        If the dataset is read with mask_and_scale=True, this value will be added to
        the scaled dataset. If None, no offset is added.
    fill_value : optional
        This value denotes a missing value in the dataset. When reading with
        mask_and_scale=True, these values will be converted to NaNs. If None, no fill
        value is used. Note that this is a different usage from a Zarr or HDF5 fill
        value.
    """
    if dims is None:
        dims = [f"phony_dim_{i}" for i in range(len(shape))]
    if chunk_len is None:
        chunk_len = shape[0]
    dset_shape = list(shape)
    chunk_shape = [chunk_len] + dset_shape[1:]
    zarray = dict(
        chunks=chunk_shape,
        compressor=None,
        dtype=dtype.str,
        fill_value=fill_value,
        filters=None,
        order=order,
        shape=dset_shape,
        zarr_format=2,
    )
    attrs = dict(_ARRAY_DIMENSIONS=dims)
    if units is not None:
        attrs.update(units=units)
    if description is not None:
        attrs.update(description=description)
    if scale_factor is not None:
        attrs.update(scale_factor=scale_factor)
    if add_offset is not None:
        attrs.update(add_offset=add_offset)
    rfs["refs"].update(
        {
            f"{dset_name}/.zarray": json.dumps(zarray),
            f"{dset_name}/.zattrs": json.dumps(attrs),
        }
    )


def add_internal_dataset_to_rfs(
    rfs: dict,
    data: np.ndarray,
    dset_name: str,
    chunk_len: int = None,
    dims: list = None,
    units: str = None,
    order: str = "C",
    description: str = None,
    scale_factor=None,
    add_offset=None,
    fill_value=None,
) -> None:
    """
    Add a dataset to the reference filesystem inline as base64.

    Parameters
    ----------
    rfs : dict
        Reference file system dictionary.
    data : np.ndarray
        The dataset to be added to the reference file system.
    dset_name : str
        The name of the dataset.
    chunk_len : int, optional
        The length of the chunks used to divide the dataset along the first
        dimension. If None, the dataset is unchunked.
    dims : list, optional
        Labels for the dimensions of the dataset. If None, phony dimension labels
        will be generated.
    units : str, optional
        Units of the dataset values, stored as an attribute.
    order : {"C", "F"}
        Order of values in the array. Default is "C".
    description : str, optional
        A description of the dataset, stored as an attribute.
    scale_factor : float, optional
        If the dataset is read with mask_and_scale=True, the dataset will be scaled
        by this factor. If None, no scaling is applied.
    add_offset : float, optional
        If the dataset is read with mask_and_scale=True, this value will be added to
        the scaled dataset. If None, no offset is added.
    fill_value : optional
        This value denotes a missing value in the dataset. When reading with
        mask_and_scale=True, these values will be converted to NaNs. If None, no fill
        value is used. Note that this is a different usage from a Zarr or HDF5 fill
        value.
    """
    _add_dataset_to_rfs(
        rfs=rfs,
        shape=list(data.shape),
        dtype=data.dtype,
        dset_name=dset_name,
        chunk_len=chunk_len,
        dims=dims,
        units=units,
        order=order,
        description=description,
        scale_factor=scale_factor,
        add_offset=add_offset,
        fill_value=fill_value,
    )
    # store the whole array inline as a single base64-encoded chunk
    # (the chunk key written here is the one for a 1-D, single-chunk dataset)
    base64_encoded = base64.b64encode(data.tobytes())
    rfs["refs"][f"{dset_name}/0"] = "base64:" + base64_encoded.decode()


def add_external_dataset_to_rfs(
    rfs: dict,
    filepath: str,
    shape: tuple,
    dtype: np.dtype,
    offset: int = 0,
    dset_name: str = "data",
    chunk_len: int = None,
    dims: list = None,
    units: str = None,
    order: str = "C",
    description: str = None,
    scale_factor: float = None,
    add_offset: float = None,
    fill_value=None,
) -> None:
    """
    Add a dataset to the reference filesystem as byte-range references into an
    external file.

    Parameters
    ----------
    rfs : dict
        Reference file system dictionary.
    filepath : str
        The path of the file where the dataset is stored.
    shape : tuple
        The shape of the dataset.
    dtype : np.dtype
        The data type of the dataset.
    offset : int, default=0
        The initial byte offset in the file where the dataset starts.
    dset_name : str, default="data"
        The name of the dataset.
    chunk_len : int, optional
        The length of the chunks used to divide the dataset along the first
        dimension. If None, the dataset is unchunked.
    dims : list, optional
        Labels for the dimensions of the dataset. If None, phony dimension labels
        will be generated.
    units : str, optional
        Units of the dataset values, stored as an attribute.
    order : {"C", "F"}
        Order of values in the array. Default is "C".
    description : str, optional
        A description of the dataset, stored as an attribute.
    scale_factor : float, optional
        If the dataset is read with mask_and_scale=True, the dataset will be scaled
        by this factor. If None, no scaling is applied.
    add_offset : float, optional
        If the dataset is read with mask_and_scale=True, this value will be added to
        the scaled dataset. If None, no offset is added.
    fill_value : optional
        This value denotes a missing value in the dataset. When reading with
        mask_and_scale=True, these values will be converted to NaNs. If None, no fill
        value is used. Note that this is a different usage from a Zarr or HDF5 fill
        value.
    """
    _add_dataset_to_rfs(
        rfs=rfs,
        shape=list(shape),
        dtype=dtype,
        dset_name=dset_name,
        chunk_len=chunk_len,
        dims=dims,
        units=units,
        order=order,
        description=description,
        scale_factor=scale_factor,
        add_offset=add_offset,
        fill_value=fill_value,
    )
    if chunk_len is None:
        chunk_len = shape[0]
    # one byte-range reference per chunk; assumes the data is stored contiguously
    # in the file and that shape[0] is evenly divisible by chunk_len
    n_chunks = shape[0] // chunk_len
    chunk_shape = [chunk_len] + list(shape[1:])
    chunk_size = int(np.prod(chunk_shape) * dtype.itemsize)
    for i_chunk in range(n_chunks):
        key = f"{dset_name}/{i_chunk}" + ".0" * (len(shape) - 1)
        rfs["refs"][key] = [filepath, offset, chunk_size]
        offset += chunk_size


def create_rfs() -> dict:
    """
    Create an empty reference filesystem dictionary containing the root Zarr group.

    Datasets can then be added with add_internal_dataset_to_rfs or
    add_external_dataset_to_rfs.

    Returns
    -------
    dict
        A dictionary of Zarr-like metadata and chunk references.
    """
    rfs = {"version": 1, "refs": {".zgroup": json.dumps(dict(zarr_format=2))}}
    return rfs


def read_xarray_from_rfs(
    rfs: dict, remote_protocol: str = "file", mask_and_scale: bool = False
) -> xr.Dataset:
    """
    Read an xarray dataset from a reference filesystem.

    Parameters
    ----------
    rfs : dict
        The reference filesystem containing the dataset.
    remote_protocol : str, default="file"
        The protocol to use for remote references, e.g. "file" or "http".
    mask_and_scale : bool, default=False
        If True, apply the fill_value, scale_factor, and add_offset attributes
        when reading the dataset.

    Returns
    -------
    xr.Dataset
    """
    ds = xr.open_dataset(
        "reference://",
        mask_and_scale=mask_and_scale,
        engine="zarr",
        backend_kwargs={
            "storage_options": dict(
                fo=rfs,
                remote_protocol=remote_protocol,
            ),
            "consolidated": False,
        },
    )
    return ds
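

# Example (a minimal sketch, not part of the original tests): the same machinery
# can map a raw, headerless binary file, which is the "arbitrary binary file" case
# named in the title. The file name "example_signal.bin" and the (50, 4) int16
# layout are made up for illustration.
raw = np.arange(200, dtype=np.int16).reshape(50, 4)
raw.tofile("example_signal.bin")  # contiguous, C-order bytes

rfs_bin = create_rfs()
add_external_dataset_to_rfs(
    rfs_bin,
    filepath="example_signal.bin",
    shape=(50, 4),
    dtype=raw.dtype,
    offset=0,  # a raw file has no header, so the data starts at byte 0
    chunk_len=10,
    dims=["time", "channels"],
)
ds_bin = read_xarray_from_rfs(rfs_bin)
assert np.array_equal(ds_bin["data"].data, raw)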


# tests
from numpy.testing import assert_array_equal
import h5py

# Define the path for the example HDF5 file
hdf5_file_path = "example4.h5"
dset_name = "example_dataset"

# Create some example data
data = np.random.randint(5, size=(100, 100))

# Create the HDF5 file and dataset
with h5py.File(hdf5_file_path, "w") as f:
    f.create_dataset(dset_name, data=data)

rfs = create_rfs()
add_external_dataset_to_rfs(
    rfs,
    shape=(100, 100),
    chunk_len=10,
    dtype=data.dtype,
    dims=["time", "channels"],
    offset=2048,  # byte offset where the contiguous dataset begins in this HDF5 file
    filepath=hdf5_file_path,
    units="uV",
    scale_factor=10,
    add_offset=100,
)

channel_names = np.array([f"chan{x}" for x in range(100)])
add_internal_dataset_to_rfs(
    rfs,
    channel_names,
    dset_name="channels",
    dims=["channels"],
)

rfs2 = create_rfs()
add_external_dataset_to_rfs(
    rfs2,
    shape=(100, 100),
    dtype=data.dtype,
    dims=["a", "b"],
    offset=2048,
    filepath=hdf5_file_path,
)

# the same bytes should be readable regardless of chunking and dimension labels
assert_array_equal(
    read_xarray_from_rfs(rfs)["data"].data, read_xarray_from_rfs(rfs2)["data"].data
)

# the data read through the reference filesystem should match h5py's view
with h5py.File(hdf5_file_path, mode="r") as file:
    dset = file[dset_name][:]
assert_array_equal(dset, read_xarray_from_rfs(rfs)["data"].data)

read_xarray_from_rfs(rfs)["data"]  # display the resulting DataArray (e.g. in a notebook)