@stephanie-wang
Last active May 28, 2021 15:21
Sorting on Dask
num_nodes  nbytes        npartitions  dask_tasks  dask_nprocs  dask_nthreads  dask_memlimit   duration
1          1000000000    100          False       0            0              0               12.28133487701416
1          1000000000    100          False       0            0              0               11.294680833816528
1          1000000000    100          False       0            0              0               11.143301963806152
1          1000000000    100          False       0            0              0               10.956552743911743
1          1000000000    100          False       0            0              0               11.068711757659912
1          1000000000    100          False       0            0              0               11.079143285751343
1          10000000000   100          False       0            0              0               114.72856569290161
1          20000000000   100          False       0            0              0               258.343745470047
1          100000000000  100          False       0            0              0               1911.8010439872742
1          10000000000   100          False       1            32             -1              437.6499922275543
1          1000000000    100          False       32           1              -1              14.642318964004517
1          1000000000    100          False       32           1              -1              14.439082622528076
1          1000000000    100          False       32           1              -1              13.660181999206543
1          1000000000    100          False       32           1              -1              13.599586963653564
1          1000000000    100          False       32           1              -1              13.238817930221558
1          10000000000   100          False       32           1              -1              121.73604011535645
1          20000000000   100          False       32           1              -1              x
1          20000000000   200          False       32           1              -1              234.0025613307953
1          100000000000  100          False       32           1              -1              x
1          100000000000  1000         False       32           1              -1              x
1          1000000000    100          False       8            1              30_000_000_000  30.35613703727722
1          1000000000    100          False       8            1              30_000_000_000  25.762451887130737
1          1000000000    100          False       8            1              30_000_000_000  26.792430639266968
1          10000000000   100          False       8            1              30_000_000_000  323.0174648761749
1          20000000000   100          False       8            1              30_000_000_000  598.4368667602539
1          100000000000  100          False       8            1              30_000_000_000  x

Each row is one timed trial: duration is wall-clock seconds and nbytes is the total dataset size in bytes. The script repeats a configuration for at most 10 trials and stops once 60 s have elapsed, so small datasets have several rows and large ones have one. x marks a configuration that did not finish. Rows with dask_nprocs, dask_nthreads, and dask_memlimit all 0 are --ray (Dask-on-Ray) runs: those flags default to 0, and the plain-Dask path asserts they are nonzero.
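Because the script appends one row per trial to output.csv, the table is easier to compare after grouping trials by configuration. A minimal stdlib sketch, assuming output.csv has the header the script writes; `summarize` and `SAMPLE_CSV` are hypothetical names, and the sample rows are copied from the table above (dask_nprocs=32, dask_nthreads=1, dask_memlimit=-1). Throughput here is simply nbytes divided by mean duration.

```python
import csv
import io
import statistics
from collections import defaultdict

# Every output.csv column except "duration" identifies a configuration.
CONFIG_COLUMNS = ["num_nodes", "nbytes", "npartitions", "dask_tasks",
                  "dask_nprocs", "dask_nthreads", "dask_memlimit"]


def summarize(csv_file):
    """Group per-trial rows by configuration and compute summary statistics."""
    durations = defaultdict(list)
    for row in csv.DictReader(csv_file):
        config = tuple(row[col] for col in CONFIG_COLUMNS)
        durations[config].append(float(row["duration"]))

    summary = {}
    for config, times in durations.items():
        mean_s = statistics.mean(times)
        nbytes = int(config[1])
        summary[config] = {
            "trials": len(times),
            "mean_s": mean_s,
            # Population standard deviation, matching np.std in the script.
            "std_s": statistics.pstdev(times),
            # Throughput in MB/s (1 MB = 10**6 bytes).
            "mb_per_s": nbytes / mean_s / 1e6,
        }
    return summary


# Hypothetical sample: rows copied from the results table.
SAMPLE_CSV = """\
num_nodes,nbytes,npartitions,dask_tasks,dask_nprocs,dask_nthreads,dask_memlimit,duration
1,1000000000,100,False,32,1,-1,14.642318964004517
1,1000000000,100,False,32,1,-1,14.439082622528076
1,1000000000,100,False,32,1,-1,13.660181999206543
1,1000000000,100,False,32,1,-1,13.599586963653564
1,1000000000,100,False,32,1,-1,13.238817930221558
1,10000000000,100,False,32,1,-1,121.73604011535645
"""

for config, stats in summarize(io.StringIO(SAMPLE_CSV)).items():
    print(config, stats)
```

To summarize a real run, pass `open("output.csv", newline="")` instead of the in-memory sample.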
distributed.core - INFO - Event loop was unresponsive in Scheduler for 82.19s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.comm.tcp - INFO - Connection closed before handshake completed
...
distributed.core - INFO - Event loop was unresponsive in Scheduler for 6.21s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
...
distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.30.0.20:32773', name: tcp://172.30.0.20:32773, memory: 535, processing: 45104>
distributed.core - INFO - Removing comms to tcp://172.30.0.20:32773
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 592, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 322, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 154, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 810, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 896, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 253, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 964, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 939, 740) marked as failed because 3 workers died while trying to run it
...
distributed.core - INFO - Event loop was unresponsive in Scheduler for 60.94s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
Traceback (most recent call last):
  File "test_sort.py", line 147, in <module>
    output = trial(client, args.data_dir, args.s3_bucket, args.nbytes, npartitions, args.generate_only)
  File "test_sort.py", line 73, in trial
    print(df.set_index('a', shuffle='tasks', max_branch=float('inf')).head(10, npartitions=-1))
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/dask/dataframe/core.py", line 1050, in head
    return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/dask/dataframe/core.py", line 1083, in _head
    result = result.compute()
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/client.py", line 2674, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/client.py", line 1983, in gather
    return self.sync(
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/client.py", line 851, in sync
    return sync(
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/utils.py", line 354, in sync
    raise exc.with_traceback(tb)
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/utils.py", line 337, in f
    result[0] = yield future
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/client.py", line 1848, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 239, 740)", <WorkerState 'tcp://172.30.0.20:32773', name: tcp://172.30.0.20:32773, memory: 0, processing: 45104>)
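The failed tasks above are keyed ('split-simple-shuffle-…', i, j), and the script calls set_index with shuffle='tasks' and max_branch=float('inf'), which should keep Dask's task-based shuffle to a single all-to-all stage. A back-of-the-envelope sketch of how the number of split tasks grows, assuming a simplified model of a staged shuffle: stages = ceil(log n / log max_branch) (at least 1), each partition split into k = ceil(n^(1/stages)) pieces per stage, and the partition count padded to k^stages. `split_tasks` is a hypothetical helper, not part of Dask, and real graphs also contain grouping and concatenation tasks.

```python
import math


def split_tasks(npartitions, max_branch):
    """Approximate split-task count for a staged task shuffle (simplified model)."""
    # log(inf) is inf, so max_branch=float("inf") gives 0 here; clamp to one stage.
    stages = max(1, math.ceil(math.log(npartitions) / math.log(max_branch)))
    # Pieces each partition is split into per stage.
    k = math.ceil(npartitions ** (1 / stages))
    # Partitions are padded so that k ** stages covers all of them.
    padded = k ** stages
    return stages * padded * k


for n in (100, 200, 1000):
    print(n, "single stage:", split_tasks(n, float("inf")),
          "max_branch=32:", split_tasks(n, 32))
```

Under this model, a single stage grows as n²: about 10,000 split tasks for 100 partitions but 1,000,000 for 1000 partitions, versus roughly 65,000 for 1000 partitions with max_branch=32. This is an estimate rather than a measurement, but it may help explain the long scheduler event-loop stalls in the log.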
import dask
import dask.dataframe as dd
import json
import pandas as pd
import numpy as np
import os.path
import csv
import boto3
from dask.distributed import Client
from dask.distributed import wait
import time
def load_dataset(client, data_dir, s3_bucket, nbytes, npartitions):
    num_bytes_per_partition = nbytes // npartitions

    @dask.delayed
    def generate_s3_file(i, data_dir, s3_bucket):
        s3 = boto3.client('s3')
        key = "df-{}-{}.parquet.gzip".format(num_bytes_per_partition, i)
        # Skip this partition if it is already in the bucket. Filtering by Prefix
        # avoids list_objects' 1000-key page limit and the KeyError raised when
        # the listing has no 'Contents' (e.g. an empty bucket).
        response = s3.list_objects_v2(Bucket=s3_bucket, Prefix=key)
        if any(obj['Key'] == key for obj in response.get('Contents', [])):
            print(f"S3 partition {i} exists")
            return
        filename = os.path.join(data_dir, key)
        if not os.path.exists(filename):
            print("Generating partition", filename)
            # A single int64 column, so 8 bytes per row.
            nrows = num_bytes_per_partition // 8
            dataset = pd.DataFrame(np.random.randint(0, np.iinfo(np.int64).max, size=(nrows, 1), dtype=np.int64), columns=['a'])
            dataset.to_parquet(filename, compression='gzip')
        print("Writing partition to S3", filename)
        with open(filename, 'rb') as f:
            s3.put_object(Bucket=s3_bucket, Key=key, Body=f)

    # Generate (or reuse) all partitions in parallel on the cluster.
    tasks = [generate_s3_file(i, data_dir, s3_bucket) for i in range(npartitions)]
    dask.compute(tasks)
    filenames = [f's3://{s3_bucket}/df-{num_bytes_per_partition}-{i}.parquet.gzip' for i in range(npartitions)]
    df = dd.read_parquet(filenames)
    return df
def trial(client, data_dir, s3_bucket, nbytes, n_partitions, generate_only):
    df = load_dataset(client, data_dir, s3_bucket, nbytes, n_partitions)
    if generate_only:
        return
    times = []
    start = time.time()
    # Run up to 10 trials, stopping early once 60 s have elapsed in total.
    for i in range(10):
        print("Trial {} start".format(i))
        trial_start = time.time()
        # head(..., npartitions=-1) uses every output partition, so the whole sort runs.
        # The explicit shuffle='tasks' overrides the global dask.config 'shuffle' setting.
        print(df.set_index('a', shuffle='tasks', max_branch=float('inf')).head(10, npartitions=-1))
        trial_end = time.time()
        duration = trial_end - trial_start
        times.append(duration)
        print("Trial {} done after {}".format(i, duration))
        if time.time() - start > 60:
            break
    return times
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--nbytes", type=int, default=1_000_000)
    parser.add_argument("--npartitions", type=int, default=100, required=False)
    # Max partition size is 1GB.
    parser.add_argument("--max-partition-size", type=int, default=1_000_000_000, required=False)
    parser.add_argument("--num-nodes", type=int, default=1)
    parser.add_argument("--dask-tasks", action="store_true")
    parser.add_argument("--generate-only", action="store_true")
    parser.add_argument("--ray", action="store_true")
    parser.add_argument("--data-dir", default="/home/ubuntu/dask-benchmarks")
    parser.add_argument("--s3-bucket", default="dask-on-ray-data")
    parser.add_argument("--dask-nprocs", type=int, default=0)
    parser.add_argument("--dask-nthreads", type=int, default=0)
    parser.add_argument("--dask-memlimit", type=int, default=0)
    args = parser.parse_args()
    if args.ray:
        import ray
        ray.init(address='auto')
        from ray.util.dask import ray_dask_get, dataframe_optimize
        dask.config.set(scheduler=ray_dask_get, dataframe_optimize=dataframe_optimize)
        client = None
    else:
        # Worker settings must be passed explicitly; they are recorded in output.csv.
        assert args.dask_nprocs != 0
        assert args.dask_nthreads != 0
        assert args.dask_memlimit != 0
        if args.dask_tasks:
            print("Using task-based Dask shuffle")
            dask.config.set(shuffle='tasks')
        else:
            # Note: trial() passes shuffle='tasks' to set_index explicitly, so this
            # branch does not change the shuffle that is actually used.
            print("Using disk-based Dask shuffle")
        #from dask.distributed import LocalCluster
        #kwargs = {
        #    'n_workers': args.dask_nprocs,
        #    'threads_per_worker': args.dask_nthreads,
        #    'memory_target_fraction': 0.2,
        #    'memory_spill_fraction': 0.4,
        #    'memory_pause_fraction': 0.6,
        #    'local_directory': '/data0',
        #    }
        #if args.dask_memlimit != -1:
        #    kwargs['memory_limit'] = args.dask_memlimit
        #print(kwargs)
        #cluster = LocalCluster(**kwargs)
        client = Client('localhost:8786')
    # Warm-up run on a tiny dataset (1000 bytes, 10 partitions); not recorded.
    print(trial(client, args.data_dir, args.s3_bucket, 1000, 10, args.generate_only))
    print("WARMUP DONE")
    npartitions = args.npartitions
    # Use more partitions if each one would exceed --max-partition-size.
    if args.nbytes // npartitions > args.max_partition_size:
        npartitions = args.nbytes // args.max_partition_size
    output = trial(client, args.data_dir, args.s3_bucket, args.nbytes, npartitions, args.generate_only)
    if args.generate_only:
        # trial() returns None when only generating data, so there is nothing to record.
        raise SystemExit(0)
    print("mean over {} trials: {} +- {}".format(len(output), np.mean(output), np.std(output)))
    write_header = not os.path.exists("output.csv") or os.path.getsize("output.csv") == 0
    with open("output.csv", "a+") as csvfile:
        fieldnames = ["num_nodes", "nbytes", "npartitions", "dask_tasks", "dask_nprocs", "dask_nthreads", "dask_memlimit", "duration"]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        row = {
            "num_nodes": args.num_nodes,
            "nbytes": args.nbytes,
            "npartitions": npartitions,
            "dask_tasks": args.dask_tasks,
            "dask_nprocs": args.dask_nprocs,
            "dask_nthreads": args.dask_nthreads,
            "dask_memlimit": args.dask_memlimit,
        }
        # One CSV row per trial.
        for duration in output:
            row["duration"] = duration
            writer.writerow(row)
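The partition sizing in test_sort.py can be checked without a cluster or S3. A minimal sketch; `partition_plan` is a hypothetical helper that reproduces the script's arithmetic: the partition count is raised when a partition would exceed --max-partition-size, and each partition holds a single int64 column, so 8 bytes per row.

```python
def partition_plan(nbytes, npartitions, max_partition_size=1_000_000_000):
    """Return (npartitions, bytes_per_partition, rows_per_partition) as the script computes them."""
    # Same rule as the __main__ block: cap the size of each partition.
    if nbytes // npartitions > max_partition_size:
        npartitions = nbytes // max_partition_size
    # Same arithmetic as load_dataset; the S3 key embeds bytes_per_partition.
    bytes_per_partition = nbytes // npartitions
    rows_per_partition = bytes_per_partition // 8  # one int64 column 'a'
    return npartitions, bytes_per_partition, rows_per_partition


print(partition_plan(1000, 10))                # warm-up trial
print(partition_plan(1_000_000_000, 100))      # 1 GB rows in the table
print(partition_plan(100_000_000_000, 100))    # 100 GB: exactly at the 1 GB cap
print(partition_plan(200_000_000_000, 100))    # hypothetical 200 GB: cap applies
```

Since the S3 key includes the bytes per partition, runs with the same partition size reuse existing objects instead of regenerating them.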