Skip to content

Instantly share code, notes, and snippets.

@TomNicholas
Created June 1, 2023 20:30
Show Gist options
  • Select an option

  • Save TomNicholas/7fd86a98c5eb80aac0ecb5efd3557d7b to your computer and use it in GitHub Desktop.

Select an option

Save TomNicholas/7fd86a98c5eb80aac0ecb5efd3557d7b to your computer and use it in GitHub Desktop.
Bug with xarray lazy indexing and cubed
import sys
import yaml
import logging
from tqdm.contrib.logging import logging_redirect_tqdm
import xarray as xr
import cubed
from cubed.extensions.history import HistoryCallback
from cubed.extensions.timeline import TimelineVisualizationCallback
from cubed.extensions.tqdm import TqdmProgressBar
from cubed.runtime.executors.lithops import LithopsDagExecutor
# Configure the root logger at INFO level for the whole script.
logging.basicConfig(level=logging.INFO)

# The urllib3 connection pool emits harmless warnings; only let ERROR and
# above through for that one logger.
_urllib3_pool_logger = logging.getLogger("urllib3.connectionpool")
_urllib3_pool_logger.setLevel(logging.ERROR)
if __name__ == "__main__":
    # Script entry point: open the ARCO-ERA5 single-level reanalysis store,
    # select two variables and the first 1000 time steps, chunk them as cubed
    # arrays, and compute the time-mean of their quadratic products with the
    # Lithops executor.
    #
    # Usage: <script> TMP_PATH RUNTIME
    tmp_path = sys.argv[1]
    # NOTE(review): argv[2] is read but never used -- compute() below passes the
    # literal "cubed-runtime". The assignment is kept so that a missing second
    # argument still fails exactly as before; confirm whether this value was
    # meant to be forwarded as `runtime=`.
    runtime = sys.argv[2]

    spec = cubed.Spec(tmp_path, allowed_mem='1GB')
    executor = LithopsDagExecutor()

    # Load the Lithops config inside a context manager so the file handle is
    # closed as soon as parsing finishes (previously it was left open).
    # NOTE(review): the path is machine-specific; consider making it an argument.
    with open('/home/tom/Documents/Work/Code/experimentation/cubed_xarray_blog/lithops_gcf/.lithops_config') as config_file:
        config = yaml.safe_load(config_file)
    print(config)

    # chunks=None opens the store without wrapping variables in chunked arrays;
    # chunking is applied explicitly further down, after indexing.
    single_reanalysis = xr.open_zarr(
        "gs://gcp-public-data-arco-era5/co/single-level-reanalysis.zarr",
        chunks=None,
        consolidated=True,
    )

    # optionally downsample here
    # Do indexing before chunking to avoid creating big empty arrays for cubed
    # to track (same trick works for dask)
    ds = single_reanalysis[['u100', 'v100']].isel(time=slice(0, 1000))
    print(ds)

    # One chunk per time step, backed by cubed arrays created with `spec`.
    ds = ds.chunk(
        chunks={'time': 1},
        chunked_array_type='cubed',
        from_array_kwargs={'spec': spec},
    )
    print(ds['u100'].chunksizes)
    print(f'size of dataset to process in GB = {ds.nbytes / 1e9}')  # size in GB

    # Squares of each variable plus the u100*v100 cross term.
    quadratic_products = ds**2
    quadratic_products["uv"] = ds['u100'] * ds['v100']
    # skipna=False to avoid eager load (see xarray issue #7243)
    mean = quadratic_products.mean("time", skipna=False)

    # Route log records through tqdm while the progress bar is active.
    with logging_redirect_tqdm():
        progress = TqdmProgressBar()
        history = HistoryCallback()
        timeline_viz = TimelineVisualizationCallback()
        result = mean.compute(
            callbacks=[progress, history, timeline_viz],
            executor=executor,
            config=config,
            runtime="cubed-runtime",
            runtime_memory=2048,
            #use_backups=True,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment