-
-
Save poplarShift/a58dbcb4a3f8c798024fd3e553bd8297 to your computer and use it in GitHub Desktop.
Dask-Jobqueue SLURMCluster with Singularity
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Dask-Jobqueue with Singularity" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Need adapter for sbatch, scancel, squeue via SSH" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"mkdir -p bin" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Overwriting bin/sbatch\n" | |
] | |
} | |
], | |
"source": [ | |
"%%file bin/sbatch\n", | |
"#!/usr/bin/env bash\n", | |
"\n", | |
"# dirty hack: This cluster has single-tenant nodes only and hence\n", | |
"# does not accept users trying to specify memory requirements.\n", | |
"# So instead of teaching Dask-Jobqueue to make the memory spec optional, \n", | |
"# we remove the conflicting header line just before submitting the job.\n", | |
"grep -v '#SBATCH \\-\\-mem=' ${1} > ${1}.corr\n", | |
"\n", | |
"# SSH back to the host (var is passed through to singularity anyway).\n", | |
"# Then, source /etc/profile to make it feel like an interactive session, \n", | |
"# and submit the job.\n", | |
"ssh $(hostname) -q -t \". /etc/profile && sbatch ${1}.corr\"" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Overwriting bin/scancel\n" | |
] | |
} | |
], | |
"source": [ | |
"%%file bin/scancel\n", | |
"#!/usr/bin/env bash\n", | |
"\n", | |
"# See bin/sbatch for details\n", | |
"ssh $(hostname) -q -t \". /etc/profile && scancel $@\"" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Overwriting bin/squeue\n" | |
] | |
} | |
], | |
"source": [ | |
"%%file bin/squeue\n", | |
"#!/usr/bin/env bash\n", | |
"\n", | |
"# See bin/sbatch for details\n", | |
"ssh $(hostname) -q -t \". /etc/profile && squeue $@\"" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"!chmod -R 755 bin/s*" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import os\n", | |
"pwd = !echo ${PWD}\n", | |
"os.environ[\"PATH\"] = f\"{pwd[0]}/bin:{os.environ['PATH']}\"" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Set up dask jobqueue\n", | |
"\n", | |
"All we really need to do is make sure the generated job script loads the singularity module and uses a containered python executable." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from dask_jobqueue import SLURMCluster\n", | |
"from dask.distributed import Client\n", | |
"import os" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"containered_python_exe = f\"singularity run {os.environ['SINGULARITY_CONTAINER']} python\"" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"cluster = SLURMCluster(\n", | |
" cores=40, memory=\"170G\", processes=4,\n", | |
" project=\"shk00023\",\n", | |
" queue='medium40:test', interface=\"ib0\",\n", | |
" walltime=\"00:20:00\",\n", | |
" env_extra=['module load singularity',], # ensure singularity is loaded\n", | |
    "    python=containered_python_exe,  # use python in container\n",	|	|
")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"#!/usr/bin/env bash\n", | |
"\n", | |
"#SBATCH -J dask-worker\n", | |
"#SBATCH -p medium40:test\n", | |
"#SBATCH -A shk00023\n", | |
"#SBATCH -n 1\n", | |
"#SBATCH --cpus-per-task=40\n", | |
"#SBATCH --mem=159G\n", | |
"#SBATCH -t 00:20:00\n", | |
"module load singularity\n", | |
"singularity run /home/shkifmwr/.singularity/cache/oci-tmp/0d046f7b1e98ec23133a61a71d65f550f300c8928db0d0da5ec99d4d524e962e/esm-vfc-stacks_latest.sif python -m distributed.cli.dask_worker tcp://10.246.201.2:42036 --nthreads 10 --nprocs 4 --memory-limit 42.50GB --name name --nanny --death-timeout 60 --interface ib0\n", | |
"\n" | |
] | |
} | |
], | |
"source": [ | |
"print(cluster.job_script())" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<table style=\"border: 2px solid white;\">\n", | |
"<tr>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3 style=\"text-align: left;\">Client</h3>\n", | |
"<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n", | |
" <li><b>Scheduler: </b>tcp://10.246.201.2:42036</li>\n", | |
" <li><b>Dashboard: </b><a href='http://10.246.201.2:8787/status' target='_blank'>http://10.246.201.2:8787/status</a></li>\n", | |
"</ul>\n", | |
"</td>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3 style=\"text-align: left;\">Cluster</h3>\n", | |
"<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n", | |
" <li><b>Workers: </b>0</li>\n", | |
" <li><b>Cores: </b>0</li>\n", | |
" <li><b>Memory: </b>0 B</li>\n", | |
"</ul>\n", | |
"</td>\n", | |
"</tr>\n", | |
"</table>" | |
], | |
"text/plain": [ | |
"<Client: 'tcp://10.246.201.2:42036' processes=0 threads=0, memory=0 B>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
} | |
], | |
"source": [ | |
"client = Client(cluster)\n", | |
"display(client)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Let's scale the cluster to 12 nodes" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"cluster.scale(jobs=12)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 14, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
" JOBID PARTITION NAME USER ACCOUNT STATE TIME NODES NODELIST(REASON)\n", | |
" 2337935 medium40:tes dask-worker shkifmwr shk00023 PENDING 0:00 1 (None)\n", | |
" 2337931 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:01 1 gcn1276\n", | |
" 2337932 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:01 1 gcn1277\n", | |
" 2337933 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:01 1 gcn1033\n", | |
" 2337934 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:01 1 gcn1034\n", | |
" 2337928 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:04 1 gcn1108\n", | |
" 2337929 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:04 1 gcn1224\n", | |
" 2337930 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:04 1 gcn1225\n", | |
" 2337925 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:07 1 gcn1353\n", | |
" 2337926 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:07 1 gcn1404\n", | |
" 2337927 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:07 1 gcn1107\n", | |
" 2337924 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:10 1 gcn1141\n" | |
] | |
} | |
], | |
"source": [ | |
"!bin/squeue -u $USER" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 15, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<table style=\"border: 2px solid white;\">\n", | |
"<tr>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3 style=\"text-align: left;\">Client</h3>\n", | |
"<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n", | |
" <li><b>Scheduler: </b>tcp://10.246.201.2:42036</li>\n", | |
" <li><b>Dashboard: </b><a href='http://10.246.201.2:8787/status' target='_blank'>http://10.246.201.2:8787/status</a></li>\n", | |
"</ul>\n", | |
"</td>\n", | |
"<td style=\"vertical-align: top; border: 0px solid white\">\n", | |
"<h3 style=\"text-align: left;\">Cluster</h3>\n", | |
"<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n", | |
" <li><b>Workers: </b>40</li>\n", | |
" <li><b>Cores: </b>400</li>\n", | |
" <li><b>Memory: </b>1.70 TB</li>\n", | |
"</ul>\n", | |
"</td>\n", | |
"</tr>\n", | |
"</table>" | |
], | |
"text/plain": [ | |
"<Client: 'tcp://10.246.201.2:42036' processes=28 threads=280, memory=1.19 TB>" | |
] | |
}, | |
"execution_count": 15, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"client" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## A calculation\n", | |
"\n", | |
    "We use my [goto example for Dask array demos](https://nbviewer.jupyter.org/github/willirath/dask_jobqueue_workshop_materials/blob/15874fe653f2c129fd47b3a156d9a16292978505/notebooks/01_local_cluster_monte_carlo_estimate_of_pi.ipynb): An embarrassingly parallel Monte Carlo estimate of the number Pi.\n",	|	|
"\n", | |
"[<img src=\"https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif\" width=\"300px\" alt=\"PI monte-carlo estimate\">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from dask import array as darr\n", | |
"import numpy as np" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 17, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"from 1000.0 GB of random data:\tpi = 3.141594476352, err = 1.8227622069488802e-06\n", | |
"\n", | |
"CPU times: user 7.31 s, sys: 656 ms, total: 7.97 s\n", | |
"Wall time: 11.3 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"\n", | |
"xy = darr.random.uniform(\n", | |
" 0, 1,\n", | |
" size=(1e12 / 16, 2), chunks=(500e6 / 16, None)\n", | |
")\n", | |
"pi = 4 * ((xy ** 2).sum(-1) < 1).mean()\n", | |
"\n", | |
"pi = pi.compute()\n", | |
"\n", | |
"print(f\"from {xy.nbytes / 1e9} GB of random data:\\tpi = {pi}, err = {pi - np.pi}\\n\")" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Adaptive clusters\n", | |
"\n", | |
"We'll set a target duration of 2 minutes to perform the same calculation with 5 TB of random data." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 18, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"ca = cluster.adapt(\n", | |
" minimum=4, maximum=48,\n", | |
" target_duration=\"1200\", # measured in CPU time per worker\n", | |
" # -> 120 seconds at 10 cores / worker\n", | |
" wait_count=4 # scale down more gently\n", | |
")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 21, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
" JOBID PARTITION NAME USER ACCOUNT STATE TIME NODES NODELIST(REASON)\n", | |
" 2337927 medium40:tes dask-worker shkifmwr shk00023 RUNNING 1:26 1 gcn1107\n" | |
] | |
} | |
], | |
"source": [ | |
"!bin/squeue -u $USER" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 22, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"from 5000.0 GB of random data:\tpi = 3.1415927031936, err = 4.960380683982635e-08\n", | |
"\n", | |
"CPU times: user 1min 19s, sys: 8.31 s, total: 1min 27s\n", | |
"Wall time: 2min 53s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"\n", | |
"xy = darr.random.uniform(\n", | |
" 0, 1,\n", | |
" size=(5e12 / 16, 2), chunks=(500e6 / 16, None)\n", | |
")\n", | |
"pi = 4 * ((xy ** 2).sum(-1) < 1).mean()\n", | |
"\n", | |
"pi = pi.compute()\n", | |
"\n", | |
"print(f\"from {xy.nbytes / 1e9} GB of random data:\\tpi = {pi}, err = {pi - np.pi}\\n\")" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.7.6" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 4 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment