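# Distributed Parquet read benchmark (generalized version of the Dask-CUDA
# read_parquet.py benchmark). Reads a remote Parquet dataset with
# dask.dataframe using either the pandas or cudf backend, and reports the
# wall-clock time and effective read bandwidth for each trial.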
import argparse
import random
from functools import partial
from time import perf_counter as clock

import fsspec
import numpy as np
import pandas as pd

import dask
import dask.dataframe as dd
from dask.base import tokenize
from dask.utils import format_bytes, format_time, parse_bytes
from distributed import performance_report

DISK_SIZE_CACHE = {}
OPTIONS_CACHE = {}
parser = argparse.ArgumentParser(
    prog="Distributed Parquet Benchmark",
    description="Generalized version of Dask-CUDA read_parquet.py benchmark.",
)
parser.add_argument(
    "path",
    type=str,
    help="Remote Parquet directory to read from.",
)
parser.add_argument(
    "-b",
    "--backend",
    choices=["cudf", "pandas"],
    default="cudf",
    type=str,
    help="DataFrame backend to use.",
)
parser.add_argument(
    "--columns",
    type=str,
    help="Columns to read/select from data.",
)
parser.add_argument(
    "--filesystem",
    type=str,
    default="arrow",
    help="Filesystem backend.",
)
parser.add_argument(
    "-r",
    "--runs",
    default=3,
    type=int,
    help="Number of trials to run (1-10).",
)
parser.add_argument(
    "-w",
    "--workers",
    default=0,
    type=int,
    help="Number of dask workers to use (0 disables dask).",
)
parser.add_argument(
    "--worker-threads",
    default=0,
    type=int,
    help="Number of threads per worker to use with dask.",
)
parser.add_argument(
    "--file-count",
    default=0,
    type=int,
    help="Number of files to read from the dataset.",
)
parser.add_argument(
    "--blocksize",
    default="1GiB",
    type=str,
    help="Approx. partition size.",
)
parser.add_argument(
    "--hybrid",
    default=False,
    action="store_true",
    help="Use a hybrid (CPU/GPU) dask cluster.",
)
args = parser.parse_args()
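# Example invocation (script name and bucket path are hypothetical):
#   python parquet_read_benchmark.py s3://my-bucket/my-dataset \
#       --backend pandas --workers 8 --blocksize 256MiB --runs 5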

def _noop(df):
    return df


def read_data(reader, paths, columns, backend, **kwargs):
    with dask.config.set({"dataframe.backend": backend}):
        return reader(
            paths,
            columns=columns,
            **kwargs,
        )

def get_fs_paths_kwargs(args):
    kwargs = {}
    storage_options = {}
    # TODO: Handle key/secret later?
    # if args.key:
    #     storage_options["key"] = args.key
    # if args.secret:
    #     storage_options["secret"] = args.secret

    filesystem = args.filesystem
    if filesystem == "arrow":
        import pyarrow.fs as pa_fs
        from fsspec.implementations.arrow import ArrowFSWrapper

        _mapping = {
            "key": "access_key",
            "secret": "secret_key",
        }  # See: pyarrow.fs.S3FileSystem docs
        s3_args = {}
        for k, v in storage_options.items():
            s3_args[_mapping[k]] = v

        fs = pa_fs.FileSystem.from_uri(args.path)[0]
        try:
            region = {"region": fs.region}
        except AttributeError:
            region = {}

        kwargs["filesystem"] = type(fs)(**region, **s3_args)
        fsspec_fs = ArrowFSWrapper(kwargs["filesystem"])
    else:
        fsspec_fs = fsspec.core.get_fs_token_paths(
            args.path, mode="rb", storage_options=storage_options
        )[0]
        kwargs["filesystem"] = fsspec_fs
    kwargs["blocksize"] = args.blocksize
    # kwargs["aggregate_files"] = True

    # Collect list of paths
    stripped_url_path = fsspec_fs._strip_protocol(args.path)
    if stripped_url_path.endswith("/"):
        stripped_url_path = stripped_url_path[:-1]
    paths = fsspec_fs.glob(f"{stripped_url_path}/*.parquet")
    paths = random.sample(paths, len(paths))
    if args.file_count:
        paths = paths[: args.file_count]

    return fsspec_fs, paths, kwargs

def bench_once(client, reader, args):
    global OPTIONS_CACHE
    global DISK_SIZE_CACHE

    # Construct kwargs
    token = tokenize(args)
    try:
        fsspec_fs, paths, kwargs = OPTIONS_CACHE[token]
    except KeyError:
        fsspec_fs, paths, kwargs = get_fs_paths_kwargs(args)
        OPTIONS_CACHE[token] = (fsspec_fs, paths, kwargs)

    t1 = clock()
    with performance_report(filename="pq_perf_report.html"):
        df = read_data(
            reader,
            paths,
            columns=args.columns,
            backend=args.backend,
            **kwargs,
        )
        num_rows = len(
            # Use opaque `map_partitions` call to "block"
            # dask-expr from using pq metadata to get length
            df.map_partitions(
                _noop,
                meta=df._meta,
                enforce_metadata=False,
            )
        )
    t2 = clock()

    # Extract total size of files on disk
    token = tokenize(paths)
    try:
        disk_size = DISK_SIZE_CACHE[token]
    except KeyError:
        disk_size = sum(fsspec_fs.sizes(paths))
        DISK_SIZE_CACHE[token] = disk_size

    return (disk_size, num_rows, t2 - t1)

def run(args):
    from distributed import Client

    kwargs = {"dashboard_address": ":8585"}
    if args.workers:
        kwargs["n_workers"] = args.workers
    if args.worker_threads:
        kwargs["threads_per_worker"] = args.worker_threads

    reader = dd.read_parquet
    if args.backend == "cudf":
        if args.hybrid:
            # NOTE: assumes a dask-cuda build that provides the hybrid-cluster module.
            from dask_cuda.hybrid.cluster import LocalHybridCluster as Cluster

            kwargs["rmm_pool_size"] = 0.9
            cluster = Cluster(**kwargs)
            reader = partial(cluster.read_parquet, agg_factor=10)
        else:
            from dask_cuda import LocalCUDACluster as Cluster

            kwargs["rmm_pool_size"] = 0.9
            cluster = Cluster(**kwargs)
    else:
        from distributed import LocalCluster as Cluster

        cluster = Cluster(**kwargs)
    client = Client(cluster)

    times = []
    bws = []
    for i in range(args.runs):
        with dask.config.set({'dataframe.parquet.minimum-partition-size': args.blocksize}):
            size, nrows, dt = bench_once(client, reader, args)
        bw = size / dt
        times.append(dt)
        bws.append(bw)

    print("======================================================")
    print("Dask Parquet Read Benchmark")
    print("======================================================")
    print(f"Disk-size: {format_bytes(size)}")
    print(f"Row-count: {nrows}")
    print("------------------------------------------------------")
    for i in range(args.runs):
        print(f"Trial-{i}: {format_time(times[i])} - {format_bytes(bws[i])}/s")
    if args.runs > 1:
        avg_wct = np.mean(times)
        std_wct = np.std(times)
        avg_bw = np.mean(bws)
        std_bw = np.std(bws)
        print("------------------------------------------------------")
        print(f"Average-WCT: {format_time(avg_wct)} (std: {format_time(std_wct)})")
        print(f"Average-BW: {format_bytes(avg_bw)}/s (std: {format_bytes(std_bw)}/s)")
    print("======================================================")


if __name__ == "__main__":
    run(args)