Sorting on Dask
num_nodes | nbytes | npartitions | dask_tasks | dask_nprocs | dask_nthreads | dask_memlimit | duration (s)
---|---|---|---|---|---|---|---
1 | 1000000000 | 100 | False | 0 | 0 | 0 | 12.28133487701416
1 | 1000000000 | 100 | False | 0 | 0 | 0 | 11.294680833816528
1 | 1000000000 | 100 | False | 0 | 0 | 0 | 11.143301963806152
1 | 1000000000 | 100 | False | 0 | 0 | 0 | 10.956552743911743
1 | 1000000000 | 100 | False | 0 | 0 | 0 | 11.068711757659912
1 | 1000000000 | 100 | False | 0 | 0 | 0 | 11.079143285751343
1 | 10000000000 | 100 | False | 0 | 0 | 0 | 114.72856569290161
1 | 20000000000 | 100 | False | 0 | 0 | 0 | 258.343745470047
1 | 100000000000 | 100 | False | 0 | 0 | 0 | 1911.8010439872742
1 | 10000000000 | 100 | False | 1 | 32 | -1 | 437.6499922275543
1 | 1000000000 | 100 | False | 32 | 1 | -1 | 14.642318964004517
1 | 1000000000 | 100 | False | 32 | 1 | -1 | 14.439082622528076
1 | 1000000000 | 100 | False | 32 | 1 | -1 | 13.660181999206543
1 | 1000000000 | 100 | False | 32 | 1 | -1 | 13.599586963653564
1 | 1000000000 | 100 | False | 32 | 1 | -1 | 13.238817930221558
1 | 10000000000 | 100 | False | 32 | 1 | -1 | 121.73604011535645
1 | 20000000000 | 100 | False | 32 | 1 | -1 | x
1 | 20000000000 | 200 | False | 32 | 1 | -1 | 234.0025613307953
1 | 100000000000 | 100 | False | 32 | 1 | -1 | x
1 | 100000000000 | 1000 | False | 32 | 1 | -1 | x
1 | 1000000000 | 100 | False | 8 | 1 | 30_000_000_000 | 30.35613703727722
1 | 1000000000 | 100 | False | 8 | 1 | 30_000_000_000 | 25.762451887130737
1 | 1000000000 | 100 | False | 8 | 1 | 30_000_000_000 | 26.792430639266968
1 | 10000000000 | 100 | False | 8 | 1 | 30_000_000_000 | 323.0174648761749
1 | 20000000000 | 100 | False | 8 | 1 | 30_000_000_000 | 598.4368667602539
1 | 100000000000 | 100 | False | 8 | 1 | 30_000_000_000 | x
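The durations above are in seconds and appear to be the rows of `output.csv` written by `test_sort.py` (shown further down); runs marked `x` crashed before completing. A minimal sketch of how these rows could be loaded and summarized with pandas; the file name and column names come from the script, while the grouping keys are just one reasonable choice:

```python
# Sketch only: summarize output.csv (the results file appended to by test_sort.py).
import pandas as pd

df = pd.read_csv("output.csv")
# Failed runs may be recorded as "x"; coerce them to NaN so they drop out of the stats.
df["duration"] = pd.to_numeric(df["duration"], errors="coerce")
summary = (
    df.groupby(["nbytes", "npartitions", "dask_nprocs", "dask_nthreads", "dask_memlimit"])["duration"]
      .agg(["mean", "std", "count"])
)
print(summary)
```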
distributed.core - INFO - Event loop was unresponsive in Scheduler for 82.19s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.comm.tcp - INFO - Connection closed before handshake completed
... (the line above appears 35 times in a row)
distributed.core - INFO - Event loop was unresponsive in Scheduler for 6.21s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
...
distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.30.0.20:32773', name: tcp://172.30.0.20:32773, memory: 535, processing: 45104>
distributed.core - INFO - Removing comms to tcp://172.30.0.20:32773
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 592, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 322, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 154, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 810, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 896, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 253, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 964, 740) marked as failed because 3 workers died while trying to run it
distributed.scheduler - INFO - Task ('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 939, 740) marked as failed because 3 workers died while trying to run it
...
distributed.core - INFO - Event loop was unresponsive in Scheduler for 60.94s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
Traceback (most recent call last):
  File "test_sort.py", line 147, in <module>
    output = trial(client, args.data_dir, args.s3_bucket, args.nbytes, npartitions, args.generate_only)
  File "test_sort.py", line 73, in trial
    print(df.set_index('a', shuffle='tasks', max_branch=float('inf')).head(10, npartitions=-1))
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/dask/dataframe/core.py", line 1050, in head
    return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/dask/dataframe/core.py", line 1083, in _head
    result = result.compute()
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/client.py", line 2674, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/client.py", line 1983, in gather
    return self.sync(
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/client.py", line 851, in sync
    return sync(
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/utils.py", line 354, in sync
    raise exc.with_traceback(tb)
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/utils.py", line 337, in f
    result[0] = yield future
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/ubuntu/anaconda3/envs/dask-38/lib/python3.8/site-packages/distributed/client.py", line 1848, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('split-simple-shuffle-6a633cadf5a2c4f1600bc3d86e0694cc', 239, 740)", <WorkerState 'tcp://172.30.0.20:32773', name: tcp://172.30.0.20:32773, memory: 0, processing: 45104>)
import dask
import dask.dataframe as dd
import json
import pandas as pd
import numpy as np
import os.path
import csv
import boto3

from dask.distributed import Client
from dask.distributed import wait

import time


def load_dataset(client, data_dir, s3_bucket, nbytes, npartitions):
    num_bytes_per_partition = nbytes // npartitions
    filenames = []

    @dask.delayed
    def generate_s3_file(i, data_dir, s3_bucket):
        s3 = boto3.client('s3')
        key = "df-{}-{}.parquet.gzip".format(num_bytes_per_partition, i)
        # Skip partitions that already exist in the S3 bucket.
        contents = s3.list_objects(Bucket=s3_bucket)
        for obj in contents.get('Contents', []):
            if obj['Key'] == key:
                print(f"S3 partition {i} exists")
                return

        # Generate the partition locally if needed, then upload it.
        filename = os.path.join(data_dir, key)
        if not os.path.exists(filename):
            print("Generating partition", filename)
            nrows = num_bytes_per_partition // 8
            dataset = pd.DataFrame(
                np.random.randint(0, np.iinfo(np.int64).max, size=(nrows, 1), dtype=np.int64),
                columns=['a'])
            dataset.to_parquet(filename, compression='gzip')
        print("Writing partition to S3", filename)
        with open(filename, 'rb') as f:
            s3.put_object(Bucket=s3_bucket, Key=key, Body=f)

    #for i in range(npartitions):
    #    filenames.append(foo(i, data_dir))
    #filenames = dask.compute(filenames)[0]

    # Generate (or reuse) one parquet file per partition, in parallel via dask.delayed.
    x = []
    for i in range(npartitions):
        x.append(generate_s3_file(i, data_dir, s3_bucket))
    dask.compute(x)

    #filenames = []
    #for i in range(npartitions):
    #    filename = "df-{}-{}.parquet.gzip".format(num_bytes_per_partition, i)
    #    filenames.append(f"s3://dask-on-ray-data/{filename}")
    filenames = [f's3://{s3_bucket}/df-{num_bytes_per_partition}-{i}.parquet.gzip'
                 for i in range(npartitions)]
    df = dd.read_parquet(filenames)
    return df


def trial(client, data_dir, s3_bucket, nbytes, n_partitions, generate_only):
    df = load_dataset(client, data_dir, s3_bucket, nbytes, n_partitions)
    if generate_only:
        return

    times = []
    start = time.time()
    for i in range(10):
        print("Trial {} start".format(i))
        trial_start = time.time()
        # The benchmarked operation: sort on column 'a' via a full shuffle.
        print(df.set_index('a', shuffle='tasks', max_branch=float('inf')).head(10, npartitions=-1))
        trial_end = time.time()
        duration = trial_end - trial_start
        times.append(duration)
        print("Trial {} done after {}".format(i, duration))
        # Stop early once the trials have taken more than 60 seconds in total.
        if time.time() - start > 60:
            break
    return times


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--nbytes", type=int, default=1_000_000)
    parser.add_argument("--npartitions", type=int, default=100, required=False)
    # Max partition size is 1GB.
    parser.add_argument("--max-partition-size", type=int, default=1_000_000_000, required=False)
    parser.add_argument("--num-nodes", type=int, default=1)
    parser.add_argument("--dask-tasks", action="store_true")
    parser.add_argument("--generate-only", action="store_true")
    parser.add_argument("--ray", action="store_true")
    parser.add_argument("--data-dir", default="/home/ubuntu/dask-benchmarks")
    parser.add_argument("--s3-bucket", default="dask-on-ray-data")
    parser.add_argument("--dask-nprocs", type=int, default=0)
    parser.add_argument("--dask-nthreads", type=int, default=0)
    parser.add_argument("--dask-memlimit", type=int, default=0)
    args = parser.parse_args()

    if args.ray:
        import ray
        ray.init(address='auto')
        from ray.util.dask import ray_dask_get, dataframe_optimize
        dask.config.set(scheduler=ray_dask_get, dataframe_optimize=dataframe_optimize)
        client = None
    else:
        assert args.dask_nprocs != 0
        assert args.dask_nthreads != 0
        assert args.dask_memlimit != 0
        if args.dask_tasks:
            print("Using task-based Dask shuffle")
            dask.config.set(shuffle='tasks')
        else:
            print("Using disk-based Dask shuffle")
        #from dask.distributed import LocalCluster
        #kwargs = {
        #    'n_workers': args.dask_nprocs,
        #    'threads_per_worker': args.dask_nthreads,
        #    'memory_target_fraction': 0.2,
        #    'memory_spill_fraction': 0.4,
        #    'memory_pause_fraction': 0.6,
        #    'local_directory': '/data0',
        #    }
        #if args.dask_memlimit != -1:
        #    kwargs['memory_limit'] = args.dask_memlimit
        #print(kwargs)
        #cluster = LocalCluster(**kwargs)
        # Connect to an already-running Dask scheduler.
        client = Client('localhost:8786')

    # Warmup trial on a tiny dataset before the measured run.
    print(trial(client, args.data_dir, args.s3_bucket, 1000, 10, args.generate_only))
    print("WARMUP DONE")

    # Cap the partition size by increasing the number of partitions if needed.
    npartitions = args.npartitions
    if args.nbytes // npartitions > args.max_partition_size:
        npartitions = args.nbytes // args.max_partition_size

    output = trial(client, args.data_dir, args.s3_bucket, args.nbytes, npartitions, args.generate_only)
    print("mean over {} trials: {} +- {}".format(len(output), np.mean(output), np.std(output)))

    # Append one row per measured trial to output.csv.
    write_header = not os.path.exists("output.csv") or os.path.getsize("output.csv") == 0
    with open("output.csv", "a+") as csvfile:
        fieldnames = ["num_nodes", "nbytes", "npartitions", "dask_tasks", "dask_nprocs",
                      "dask_nthreads", "dask_memlimit", "duration"]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        row = {
            "num_nodes": args.num_nodes,
            "nbytes": args.nbytes,
            "npartitions": npartitions,
            "dask_tasks": args.dask_tasks,
            "dask_nprocs": args.dask_nprocs,
            "dask_nthreads": args.dask_nthreads,
            "dask_memlimit": args.dask_memlimit,
        }
        for duration in output:
            row["duration"] = duration
            writer.writerow(row)
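For context, the operation the script times is an out-of-core sort via `set_index` using Dask's task-based shuffle. The snippet below is a minimal standalone sketch of that operation (not part of the original gist); like `test_sort.py`, it assumes a distributed scheduler is already listening on `localhost:8786`:

```python
# Minimal sketch of the benchmarked operation: sort a Dask DataFrame on column 'a'
# with the task-based shuffle. Assumes a scheduler is running at localhost:8786.
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client("localhost:8786")
pdf = pd.DataFrame({"a": np.random.randint(0, np.iinfo(np.int64).max, size=1_000_000, dtype=np.int64)})
df = dd.from_pandas(pdf, npartitions=10)
# set_index triggers the shuffle; head() forces computation across all partitions.
print(df.set_index("a", shuffle="tasks").head(10, npartitions=-1))
```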