Last active
August 2, 2021 17:46
-
-
Save rabernat/39d8b6a396e076d168c24167b8871c4b to your computer and use it in GitHub Desktop.
Test case for dask distributed scheduler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Xarray / Dask Climatology Benchmark\n", | |
"\n", | |
"Notebook designed to debug the issue described in <https://github.com/dask/distributed/issues/2602>\n", | |
"\n", | |
"This has been tested with Dask 2020.12.0 and Dask 2021.07.1." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import dask\n", | |
"dask.__version__" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Dask Cluster Settings" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"nworkers = 30\n", | |
"worker_memory = 8\n", | |
"worker_cores = 1\n", | |
"use_MALLOC_TRIM_THRESHOLD = True" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from dask_gateway import Gateway\n", | |
"g = Gateway()\n", | |
"options = g.cluster_options()\n", | |
"# set the options programmatically, or through their HTML repr\n", | |
"options.worker_memory = worker_memory\n", | |
"options.worker_cores = worker_cores\n", | |
"if use_MALLOC_TRIM_THRESHOLD:\n", | |
" options.environment = {\"MALLOC_TRIM_THRESHOLD_\": \"0\"}\n", | |
"\n", | |
"display(options)\n", | |
"cluster = g.new_cluster(options)\n", | |
"cluster" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"cluster.scale(nworkers)\n", | |
"client = cluster.get_client()\n", | |
"client.wait_for_workers(nworkers)\n", | |
"client" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Synthetic Data Example 1\n", | |
"\n", | |
"Representative of computing the standard deviation of a climatological anomaly." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import dask.array as dsa\n", | |
"import numpy as np\n", | |
"import xarray as xr\n", | |
"\n", | |
"data = dsa.random.random((10000, 1000000), chunks=(1, 1000000))\n", | |
"da = xr.DataArray(data, dims=['time', 'x'],\n", | |
" coords={'day': ('time', np.arange(10000) % 100)})\n", | |
"clim = da.groupby('day').mean(dim='time')\n", | |
"anom = da.groupby('day') - clim\n", | |
"anom_std = anom.std(dim='time')" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# without use_MALLOC_TRIM_THRESHOLD, workers die\n", | |
"# with use_MALLOC_TRIM_THRESHOLD:\n", | |
"# Dask 2020.12.0: 2min 17s\n", | |
"# Dask 2021.07.1: 2min 21s\n", | |
"\n", | |
"%time anom_std.load()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Synthetic Data Example 2\n", | |
"\n", | |
"Representative of calculating forecast skill from an ensemble of weather predictions. " | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import pandas as pd\n", | |
"\n", | |
"size = (28, 237, 48, 21, 90, 144)\n", | |
"chunks = (1, 1, 48, 21, 90, 144)\n", | |
"arr = dsa.random.random(size, chunks=chunks)\n", | |
"arr\n", | |
"\n", | |
"items = dict(\n", | |
" ensemble = np.arange(size[0]),\n", | |
" init_date = pd.date_range(start='1960', periods=size[1]),\n", | |
" lat = np.arange(size[2]).astype(float),\n", | |
" lead_time = np.arange(size[3]),\n", | |
" level = np.arange(size[4]).astype(float),\n", | |
" lon = np.arange(size[5]).astype(float),\n", | |
")\n", | |
"dims, coords = zip(*list(items.items()))\n", | |
"\n", | |
"array = xr.DataArray(arr, coords=coords, dims=dims)\n", | |
"dset = xr.Dataset({'data': array})\n", | |
"display(dset)\n", | |
"\n", | |
"result = dset['data'].groupby(\"init_date.month\").mean(dim=[\"init_date\", \"ensemble\"])\n", | |
"result" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Dask 2020.12.0: 6min 49s\n", | |
"# Dask 2021.07.1: 4min 13s\n", | |
"\n", | |
"%time result.compute();" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": { | |
"tags": [] | |
}, | |
"source": [ | |
"## Real Data Example" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from intake import open_catalog\n", | |
"cat = open_catalog(\"https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/ocean.yaml\")\n", | |
"ds = cat[\"sea_surface_height\"].to_dask()\n", | |
"ds" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"sla = ds.sla\n", | |
"sla_gb = sla.groupby('time.dayofyear')\n", | |
"sla_clim = sla_gb.mean(dim='time')\n", | |
"sla_anom = sla_gb - sla_clim\n", | |
"sla_anom_std = sla_anom.std(dim='time')\n", | |
"sla_anom_std" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# With Dask 2020.12.0: 2min 58s\n", | |
"# With Dask 2021.07.1: 2min 6s\n", | |
"\n", | |
"%time sla_anom_std.load()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"sla_anom_std.plot(figsize=(20, 12))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"client.close()\n", | |
"cluster.scale(0)\n", | |
"cluster.close()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"cluster" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3 (ipykernel)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.8.10" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 4 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment