Created
June 1, 2023 20:30
-
-
Save TomNicholas/7fd86a98c5eb80aac0ecb5efd3557d7b to your computer and use it in GitHub Desktop.
Bug with xarray lazy indexing and cubed
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Reproducer script: bug with xarray lazy indexing and cubed.

Opens the ARCO-ERA5 single-level reanalysis Zarr store with xarray, selects
a subset, chunks it as cubed arrays, and computes the time-mean of the
quadratic products of two variables using the Lithops executor.

Usage:
    python <script> <tmp_path> <runtime>
"""
import logging
import sys

import yaml
from tqdm.contrib.logging import logging_redirect_tqdm
import xarray as xr

import cubed
from cubed.extensions.history import HistoryCallback
from cubed.extensions.timeline import TimelineVisualizationCallback
from cubed.extensions.tqdm import TqdmProgressBar
from cubed.runtime.executors.lithops import LithopsDagExecutor

logging.basicConfig(level=logging.INFO)
# suppress harmless connection pool warnings
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)

if __name__ == "__main__":
    # Both positional arguments are required; a missing one raises IndexError.
    tmp_path = sys.argv[1]
    # NOTE(review): `runtime` is read from the command line but never used;
    # the compute() call below passes the hard-coded "cubed-runtime" instead.
    # Confirm whether `runtime=runtime` was intended.
    runtime = sys.argv[2]

    spec = cubed.Spec(tmp_path, allowed_mem='1GB')
    executor = LithopsDagExecutor()

    # Use a context manager so the config file handle is closed promptly
    # instead of being leaked until garbage collection.
    with open('/home/tom/Documents/Work/Code/experimentation/cubed_xarray_blog/lithops_gcf/.lithops_config') as config_file:
        config = yaml.safe_load(config_file)
    print(config)

    # chunks=None: no chunking is requested at open time; chunking is applied
    # explicitly further down, after the subset has been selected.
    single_reanalysis = xr.open_zarr(
        "gs://gcp-public-data-arco-era5/co/single-level-reanalysis.zarr",
        chunks=None,
        consolidated=True,
    )

    # optionally downsample here
    # Do indexing before chunking to avoid creating big empty arrays for cubed
    # to track (same trick works for dask)
    ds = single_reanalysis[['u100', 'v100']].isel(time=slice(0, 1000))
    print(ds)

    # Convert to cubed-backed arrays, one chunk per time step.
    ds = ds.chunk(
        chunks={'time': 1},
        chunked_array_type='cubed',
        from_array_kwargs={'spec': spec},
    )
    print(ds['u100'].chunksizes)
    print(f'size of dataset to process in GB = {ds.nbytes / 1e9}')  # size in GB

    # Squares of each variable plus the cross term u100 * v100.
    quadratic_products = ds**2
    quadratic_products["uv"] = ds['u100'] * ds['v100']

    # skipna=False to avoid eager load (see xarray issue #7243)
    mean = quadratic_products.mean("time", skipna=False)

    # Route log records through tqdm so they do not break the progress bar.
    with logging_redirect_tqdm():
        progress = TqdmProgressBar()
        history = HistoryCallback()
        timeline_viz = TimelineVisualizationCallback()
        result = mean.compute(
            callbacks=[progress, history, timeline_viz],
            executor=executor,
            config=config,
            runtime="cubed-runtime",
            runtime_memory=2048,
            #use_backups=True,
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment