Created
April 2, 2026 20:32
-
-
Save d-v-b/c3563c7c650f2e83a7e53392cebc167f to your computer and use it in GitHub Desktop.
Scale-offset encoding for Xarray via codecs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = [ | |
| # "zarr==3.1.6", | |
| # "xarray>=2025.7.1", | |
| # "cast-value>=0.2.1", | |
| # "cast-value-rs>=0.4.0", | |
| # "numpy>=2.4", | |
| # ] | |
| # /// | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import TYPE_CHECKING, Self | |
| import numpy as np | |
| import xarray as xr | |
| import zarr | |
| import zarr.storage | |
| from cast_value import CastValueRustV1 | |
| from zarr.abc.codec import ArrayArrayCodec | |
| from zarr.core.common import JSON, parse_named_configuration | |
| from zarr.registry import register_codec | |
| if TYPE_CHECKING: | |
| from zarr.core.array_spec import ArraySpec | |
| from zarr.core.chunk_grids import ChunkGrid | |
| from zarr.core.dtype import ZDType | |
| from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar | |
| from zarr.core.ndbuffer import NDBuffer | |
| # --------------------------------------------------------------------------- | |
| # ScaleOffset codec (inline definition — no external dependency) | |
| # --------------------------------------------------------------------------- | |
@dataclass(frozen=True)
class ScaleOffset(ArrayArrayCodec):
    """Zarr v3 array-to-array codec applying a linear scale and offset.

    Encode: out = (in - offset) * scale
    Decode: out = (in / scale) + offset

    Attributes:
        offset: Subtracted before scaling on encode, added back on decode.
        scale: Multiplier on encode, divisor on decode. Must be nonzero.
    """

    is_fixed_size = True
    offset: float
    scale: float

    def __init__(self, *, offset: float = 0.0, scale: float = 1.0) -> None:
        scale = float(scale)
        if scale == 0.0:
            # A zero scale would make decode divide by zero and destroy data.
            raise ValueError("scale must be nonzero")
        # object.__setattr__ is required because the dataclass is frozen.
        object.__setattr__(self, "offset", float(offset))
        object.__setattr__(self, "scale", scale)

    @classmethod
    def from_dict(cls, data: dict[str, JSON]) -> Self:
        """Reconstruct the codec from its zarr v3 metadata dictionary."""
        _, configuration_parsed = parse_named_configuration(data, "scale_offset")
        # Fall back to the field defaults when metadata has no configuration.
        return cls(**(configuration_parsed or {}))

    def to_dict(self) -> dict[str, JSON]:
        """Serialize the codec to its zarr v3 metadata dictionary."""
        return {
            "name": "scale_offset",
            "configuration": {"offset": self.offset, "scale": self.scale},
        }

    def validate(self, shape: tuple[int, ...], dtype: ZDType[TBaseDType, TBaseScalar], chunk_grid: ChunkGrid) -> None:
        """No constraints: the transform works for any shape/dtype/grid."""

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
        # Nothing about the codec depends on the array spec.
        return self

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        # A linear transform changes neither shape nor (declared) dtype.
        return chunk_spec

    async def _decode_single(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        return self._decode_sync(chunk_array, chunk_spec)

    def _decode_sync(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        data = chunk_array.as_numpy_array()
        decoded = (data / self.scale) + self.offset
        return chunk_spec.prototype.nd_buffer.from_numpy_array(decoded)

    async def _encode_single(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer | None:
        return self._encode_sync(chunk_array, chunk_spec)

    def _encode_sync(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer | None:
        data = chunk_array.as_numpy_array()
        encoded = (data - self.offset) * self.scale
        # Build the result from the spec's buffer prototype, mirroring
        # _decode_sync, rather than from whatever concrete buffer class the
        # incoming chunk happens to be.
        return chunk_spec.prototype.nd_buffer.from_numpy_array(encoded)

    def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        # Element count is preserved, so the byte length is too.
        return input_byte_length
# Register the codec under its metadata name so any zarr v3 reader in this
# process can look up "scale_offset" and reconstruct a ScaleOffset instance
# from array metadata on read.
register_codec("scale_offset", ScaleOffset)
def main() -> None:
    """Write identical float data via CF attribute encoding and via a zarr
    codec pipeline, then print both round-trips and whether they match."""
    # CF packing parameters (as you'd find on a netCDF / zarr variable).
    scale_factor = 0.01
    add_offset = 273.15
    packed_dtype = np.dtype("int16")

    # Source data: the float values a user would actually work with.
    raw_ints = np.arange(-1000, 1001, dtype=packed_dtype)
    float_data = raw_ints * scale_factor + add_offset

    # -----------------------------------------------------------------------
    # Approach 1: xarray CF encoding (the status quo).
    #
    # xarray writes packed integers to disk and attaches scale_factor /
    # add_offset attributes; on read it decodes them back to floats.
    # -----------------------------------------------------------------------
    cf_store = zarr.storage.MemoryStore()
    cf_encoding = {
        "temperature": {
            "dtype": packed_dtype,
            "scale_factor": scale_factor,
            "add_offset": add_offset,
        }
    }
    cf_dataset = xr.Dataset({"temperature": xr.DataArray(float_data, dims=["x"])})
    cf_dataset.to_zarr(cf_store, mode="w", zarr_format=3, encoding=cf_encoding)
    cf_result = xr.open_dataset(cf_store, engine="zarr")["temperature"].values

    # -----------------------------------------------------------------------
    # Approach 2: zarr codec pipeline (the replacement).
    #
    # The same float data goes through ScaleOffset + CastValue codecs.
    # No CF attributes — the codec pipeline handles encoding/decoding, so
    # any zarr v3 reader with these codecs gets correct float values.
    # -----------------------------------------------------------------------
    so_codec = ScaleOffset(offset=add_offset, scale=1.0 / scale_factor)
    nan_sentinel = int(np.iinfo(packed_dtype).min)
    cv_codec = CastValueRustV1(
        data_type=packed_dtype.name,
        rounding="nearest-even",
        scalar_map={
            "encode": [["NaN", nan_sentinel]],
            "decode": [[nan_sentinel, "NaN"]],
        },
    )
    codec_store = zarr.storage.MemoryStore()
    codec_dataset = xr.Dataset({"temperature": xr.DataArray(float_data, dims=["x"])})
    codec_dataset.to_zarr(
        codec_store,
        mode="w",
        zarr_format=3,
        encoding={"temperature": {"filters": (so_codec, cv_codec)}},
    )
    codec_result = xr.open_dataset(codec_store, engine="zarr")["temperature"].values

    # -----------------------------------------------------------------------
    # Compare the two round-trips element for element.
    # -----------------------------------------------------------------------
    print(f"Attributes encoding: {cf_result}")
    print(f"Codec encoding: {codec_result}")
    print(f"Identical: {np.array_equal(cf_result, codec_result)}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment