DaskMPIRUN
#!/bin/bash
#PBS -N dask-scheduler
#PBS -q economy
#PBS -A NIOW0001
#PBS -l select=1:ncpus=36:mpiprocs=6:ompthreads=6
#PBS -l walltime=00:30:00
#PBS -j oe
# module purge
module load gnu
module load mpt
module load conda
# Writes a scheduler.json file to /glade/scratch/$USER (see SCHEDULER below)
# Connect with
# >>> from dask.distributed import Client
# >>> client = Client(scheduler_file='/glade/scratch/<user>/scheduler.json')
# Setup Environment
source activate pangeo
SCHEDULER=/glade/scratch/$USER/scheduler.json
rm -f $SCHEDULER
mpirun -np 6 dask-mpi --nthreads 6 \
--memory-limit 12e9 \
--interface ib0 \
--local-directory /glade/scratch/$USER \
--scheduler-file=$SCHEDULER
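Once the job above is running and the scheduler file exists, a client can attach to it from any node that shares the filesystem. Below is a minimal sanity-check sketch, assuming the pangeo environment is active; the path mirrors the SCHEDULER variable above, with getpass used only as a convenience to fill in $USER.
# Minimal client-side check against the scheduler started by dask-mpi above.
import getpass

import dask.array as da
from dask.distributed import Client

scheduler_file = '/glade/scratch/%s/scheduler.json' % getpass.getuser()
client = Client(scheduler_file=scheduler_file)
print(client)  # should list the workers launched by the mpirun above

# Tiny computation to confirm the workers respond
x = da.ones((1000, 1000), chunks=(100, 100))
print(x.sum().compute())  # expected: 1000000.0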
#!/usr/bin/env python
from functools import partial
from mpi4py import MPI
from sys import exit
from tornado.ioloop import IOLoop
from tornado import gen
from distributed import Scheduler, Nanny, Worker
from distributed.bokeh.worker import BokehWorker
from distributed.cli.utils import check_python_3, uri_from_host_port
from distributed.utils import get_ip_interface
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
class SomeMPIClass(object):
    def __init__(self, scheduler_file='scheduler.json', interface=None, nthreads=0,
                 local_directory='', memory_limit='auto', scheduler=True,
                 bokeh_port=8787, bokeh_prefix=None, nanny=True, bokeh_worker_port=8789):
        if rank == 0:
            # Launch Dask Scheduler HERE
            if interface:
                host = get_ip_interface(interface)
            else:
                host = None
            s = Scheduler()
            addr = uri_from_host_port(host, None, 8786)
            s.start(addr)
            print(s)
        if rank > 0:
            # Launch Dask Workers HERE, then wait for the other ranks and exit
            comm.Barrier()
            exit(0)
        # Construct cluster object information

    def __del__(self):
        # Keep the scheduler rank alive until the worker ranks reach the barrier
        if rank == 0:
            comm.Barrier()

    def rank(self):
        return rank


if __name__ == "__main__":
    a = SomeMPIClass()
    print(a.rank())
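The "Launch Dask Workers HERE" branch above is a stub. Below is a rough sketch of what it could contain, modelled on the dask-mpi CLI of the same period; it assumes the distributed 1.x API (Worker/Nanny accepting scheduler_file=, ncores=, local_dir= and exposing a _start() coroutine), and launch_worker is a hypothetical helper name, so keyword names will differ on newer distributed releases.
# Hypothetical worker-launch helper (assumes the distributed 1.x API).
from mpi4py import MPI
from tornado import gen
from tornado.ioloop import IOLoop
from distributed import Nanny, Worker


def launch_worker(scheduler_file, nthreads=1, local_directory='',
                  memory_limit='auto', nanny=True):
    rank = MPI.COMM_WORLD.Get_rank()
    W = Nanny if nanny else Worker
    worker = W(scheduler_file=scheduler_file,
               ncores=nthreads,            # renamed to nthreads= in later releases
               local_dir=local_directory,  # renamed to local_directory= later
               memory_limit=memory_limit,
               name=rank)

    @gen.coroutine
    def run():
        yield worker._start()              # old-style coroutine start
        while worker.status != 'closed':
            yield gen.sleep(1)             # keep the event loop alive

    IOLoop.current().run_sync(run)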
#!/usr/bin/env python
from __future__ import print_function
from mpi4py import MPI
comm = MPI.COMM_WORLD
print("Hello! I'm rank %d from %d running in total..." % (comm.rank, comm.size))
comm.Barrier() # wait for everybody to synchronize _here_
#!/bin/bash
#PBS -N mpi4py-test
#PBS -q economy
#PBS -A NIOW0001
#PBS -l select=1:ncpus=36:mpiprocs=6:ompthreads=6
#PBS -l walltime=00:30:00
#PBS -j oe
module purge
module load gnu
module load mpt
module load conda
# Verifies the MPI + conda setup by running the mpi4py hello-world
# script above (dask-test-mpi.py) across all ranks; no Dask scheduler is started.
# Setup Environment
source activate pangeo
mpirun -np 36 which python
mpirun -np 36 python dask-test-mpi.py
#!/usr/bin/env bash
qsub -I -l select=2:ncpus=1:mpiprocs=1 -l walltime=00:20:00 -q economy -A NTDD0005