@goraj
Created February 4, 2019 18:58
dask.py
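"""Nested parallelism on SGE via dask-jobqueue: each dask task submitted to
the cluster fans out over a joblib/loky pool of N_CORES processes inside its
own job allocation."""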
import time
from math import sqrt

from dask.distributed import Client
from dask_jobqueue import SGECluster
from joblib import Parallel, delayed

N_CORES = 2

def do_process():
    # Nested parallelism: each dask task fans out over N_CORES local
    # processes via joblib's loky backend.
    return Parallel(n_jobs=N_CORES, backend='loky')(
        delayed(sqrt)(i ** 2) for i in range(100)
    )

def test():
    wait_for_complete_pool = False
    use_cluster = True
    # number of method calls
    n_workers = 12
    # number of cores available inside each call
    n_cores = N_CORES
    memory = '1GB'

    cluster = SGECluster(
        queue='default.q',
        processes=1,    # number of python processes per job
        cores=n_cores,  # cores per job
        memory=memory,
        # walltime='14400',
        walltime='500',
        env_extra=['export PYTHONPATH=/clscratch/goraja1/pycharm_project'],
        extra=['--resources processes=1'],
        resource_spec=f'm_mem_free={memory[:-1]}',  # SGE expects '1G', not '1GB'
    )
    # Request an SMP parallel environment with n_cores slots per job.
    cluster.job_header += f'\n#$ -pe smp {n_cores}\n'
    print(cluster.job_script())
    cluster.scale(n_workers)
    with Client(cluster) as client:
        pool = []
        if wait_for_complete_pool:
            # Optionally block until every requested worker has connected.
            while (use_cluster and client.status == 'running'
                   and len(client.scheduler_info()['workers']) < n_workers):
                time.sleep(1.0)
        for _ in range(n_workers):
            pool.append(
                client.submit(do_process, pure=False, resources={'processes': 1})
            )
        print('gathering results')
        result = client.gather(pool)
        print(f'result: {[e[:5] for e in result]}')
    try:
        cluster.close()
    except Exception:
        pass

if __name__ == '__main__':
    test()
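
For quick local testing without an SGE queue, the same submit/gather pattern can run on a LocalCluster (which the original imports but does not use). A minimal sketch, assuming LocalCluster forwards the resources worker keyword to each local worker; worker and pool counts here are illustrative:

from math import sqrt

from dask.distributed import Client, LocalCluster
from joblib import Parallel, delayed


def do_process():
    # Same nested-parallelism body as the SGE version above,
    # with a fixed 2-process loky pool.
    return Parallel(n_jobs=2, backend='loky')(
        delayed(sqrt)(i ** 2) for i in range(100)
    )


if __name__ == '__main__':
    # 'resources' is forwarded to each local worker, mirroring the
    # '--resources processes=1' flag passed to the SGE workers above.
    with LocalCluster(n_workers=2, threads_per_worker=1,
                      resources={'processes': 1}) as cluster:
        with Client(cluster) as client:
            futures = [
                client.submit(do_process, pure=False,
                              resources={'processes': 1})
                for _ in range(2)
            ]
            print([r[:5] for r in client.gather(futures)])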