Created
March 26, 2024 15:09
-
-
Save manzt/8cae6696b9982e66a39bb1217ec19639 to your computer and use it in GitHub Desktop.
Map HiGlass tilesets to a multiscale Zarr store
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import dataclasses
import logging

import httpx
import zarr
import zarr.storage
import zarr.util
def get_tile_size(info: dict) -> int:
    """Return the edge length, in bins, of a single tile.

    Args:
        info: Tileset-info mapping for one tileset.

    Returns:
        ``info["bins_per_dimension"]`` when that key is present,
        otherwise the fallback value 256.
    """
    return info.get("bins_per_dimension", 256)
def get_max_zoom(info: dict) -> int:
    """Return the deepest zoom level described by a tileset-info mapping.

    Args:
        info: Tileset-info mapping for one tileset.

    Returns:
        ``info["max_zoom"]`` when present (including 0), otherwise the
        number of entries in ``info["resolutions"]``.

    Raises:
        KeyError: If ``info`` has neither ``"max_zoom"`` nor
            ``"resolutions"``.
    """
    # An explicit membership test (not a truthiness test) so that a
    # legitimate ``max_zoom`` of 0 is honoured.
    if "max_zoom" in info:
        return info["max_zoom"]
    # NOTE(review): this uses the *count* of resolutions as the deepest zoom
    # index; confirm against the tile server whether ``len(...) - 1`` is meant.
    return len(info["resolutions"])
def create_meta(info: dict) -> dict:
    """Build the zarr metadata for a multiscale pyramid of a tileset.

    A root group is initialised together with one square 2-D array per zoom
    level ``0..max_zoom``. Array ``"0"`` spans exactly one tile and every
    following level doubles the edge length. All arrays are declared as
    uncompressed ``float32`` with one chunk per tile and ``"/"`` as the
    chunk-key separator.

    Args:
        info: Tileset-info mapping for one tileset (read through
            ``get_tile_size`` and ``get_max_zoom``).

    Returns:
        A plain ``dict`` used as an in-memory zarr store; it holds only the
        metadata entries written by zarr plus a root ``".zattrs"`` entry
        with a ``"multiscales"`` listing.
    """
    tile_size = get_tile_size(info)
    max_zoom = get_max_zoom(info)

    store = {}
    zarr.storage.init_group(store=store)

    for level in range(max_zoom + 1):
        # Level 0 is a single tile; each deeper level doubles the extent.
        extent = tile_size * 2**level
        zarr.storage.init_array(
            store=store,
            path=str(level),
            shape=(extent, extent),
            chunks=(tile_size, tile_size),
            dtype="float32",
            compressor=None,
            dimension_separator="/",
        )

    # The multiscales listing goes from the largest array down to the smallest.
    datasets: list[dict] = [
        {"path": str(level)} for level in range(max_zoom, -1, -1)
    ]
    store[".zattrs"] = zarr.util.json_dumps(
        {"multiscales": [{"datasets": datasets}]}
    )
    return store
def is_meta_key(key: str) -> bool:
    """Return True when ``key`` names a zarr metadata document.

    A key counts as metadata when it ends with ``.zarray``, ``.zattrs`` or
    ``.zgroup``; anything else is treated by callers as a chunk key.
    """
    return key.endswith((".zarray", ".zattrs", ".zgroup"))
@dataclasses.dataclass
class RemoteTileset:
    """Handle to one tileset served over HTTP by a HiGlass-style tile server.

    Attributes:
        server: Base URL of the tile API; a trailing ``/`` is tolerated.
        uid: Identifier of the tileset on that server.
    """

    server: str
    uid: str

    def _get_json(self, endpoint: str, query: str):
        """GET ``<server>/<endpoint>/?d=<query>`` and return the JSON body.

        Raises:
            httpx.HTTPStatusError: If the final response has an error status.
        """
        with httpx.Client() as client:
            r = client.get(
                url=f"{self.server.rstrip('/')}/{endpoint}/?d={query}",
                follow_redirects=True,
            )
            # Surface HTTP failures directly instead of as an obscure JSON
            # decoding or key error further down.
            r.raise_for_status()
            return r.json()

    def info(self) -> dict:
        """Fetch the tileset-info entry for this tileset.

        Returns:
            The value stored under ``self.uid`` in the ``tileset_info``
            response.
        """
        return self._get_json("tileset_info", self.uid)[self.uid]

    def tiles(self, tile_ids: list[str]) -> list[tuple[str, dict]]:
        """Fetch several tiles in a single request.

        Args:
            tile_ids: Tile identifiers; each is sent as its own ``d=``
                query parameter.

        Returns:
            ``(tile_id, tile)`` pairs taken from the JSON response, or an
            empty list when no ids were requested.
        """
        if not tile_ids:
            # Nothing to ask for; skip the network round trip.
            return []
        return list(self._get_json("tiles", "&d=".join(tile_ids)).items())
def process_tile(tile) -> bytes | None: | |
bdata = base64.b64decode(tile["dense"]) | |
# TODO: configuration to compress / not compress | |
return bdata | |
class TilesetStore(zarr.storage.BaseStore):
    """Read-only zarr store that exposes a tileset's tiles as array chunks.

    Metadata keys are answered from a dict built once by ``create_meta``.
    Every other key is interpreted as ``"<zoom>/<i>/<j>"`` and fetched from
    the wrapped tileset as tile ``"<uid>.<zoom>.<j>.<i>"``.

    The tileset object must provide ``uid``, ``info()`` and
    ``tiles(tile_ids)``.
    """

    # Upper bound on the number of tiles requested from the tileset per call.
    max_request_size = 5

    def __init__(self, tileset):
        self._ts = tileset
        self._meta = create_meta(tileset.info())

    def __getitem__(self, key: str):
        """Return metadata or chunk bytes for ``key``.

        Raises:
            KeyError: If the key is unknown, malformed, or its tile could
                not be fetched.
        """
        if is_meta_key(key):
            return self._meta[key]
        return self._getitems([key])[key]

    def getitems(self, keys: list[str], *, contexts=None):
        """Return a dict with the subset of ``keys`` that could be resolved.

        ``contexts`` is accepted for compatibility with zarr's store
        interface and is not used.
        """
        # Metadata never needs a tile request; answer it locally.
        out = {
            key: self._meta[key]
            for key in keys
            if is_meta_key(key) and key in self._meta
        }
        tile_keys = [key for key in keys if not is_meta_key(key)]

        # Guard against a zero/negative override, which would break range().
        step = max(1, self.max_request_size)
        for start in range(0, len(tile_keys), step):
            out.update(self._getitems(tile_keys[start : start + step]))
        return out

    def _getitems(self, keys: list[str]):
        """Fetch one batch of chunk keys; unresolved keys are left out."""
        wanted = {}
        for key in keys:
            try:
                zoom_level, i, j = map(int, key.split("/"))
            except ValueError:
                # Not a "<zoom>/<i>/<j>" key: treat it as absent so callers
                # see a KeyError / missing entry rather than a ValueError.
                continue
            # The tile id lists the second array index before the first.
            wanted[key] = f"{self._ts.uid}.{zoom_level}.{j}.{i}"
        if not wanted:
            return {}

        try:
            tiles = dict(self._ts.tiles(list(wanted.values())))
        except Exception:
            # Best effort: a failed batch is reported as missing chunks,
            # but the failure is logged instead of vanishing silently.
            logging.getLogger(__name__).warning(
                "Failed to fetch tiles %s", list(wanted.values()), exc_info=True
            )
            return {}

        data = {}
        for key, tile_id in wanted.items():
            tile = tiles.get(tile_id)
            if tile is None:
                # The tileset did not return this tile; skip it without
                # failing the rest of the batch.
                continue
            chunk = process_tile(tile)
            if chunk is not None:
                data[key] = chunk
        return data

    def __setitem__(self, key: str, value: bytes):
        raise NotImplementedError()

    def __delitem__(self, key: str):
        raise NotImplementedError()

    def __iter__(self):
        raise NotImplementedError()

    def __len__(self):
        raise NotImplementedError()
if __name__ == "__main__":
    # Demo entry point: wrap a local .mcool file as a tileset, expose it as
    # a zarr group and serve that group over HTTP.
    import sys

    import higlass.tilesets
    from simple_zarr_server import serve

    # The first command-line argument overrides the original demo file.
    mcool_path = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "/Users/manzt/Downloads/4DNFI8FMG978.mcool"
    )

    ts = higlass.tilesets.cooler(mcool_path)
    store = TilesetStore(ts)
    grp = zarr.open(store)
    serve(grp, allowed_origins=["*"])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment