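# The example command below is illustrative only: the script name, bucket
# path, and option values are placeholders, not part of the original gist.
"""Distributed Parquet Benchmark.

Generalized version of the Dask-CUDA ``read_parquet.py`` benchmark
(gist by @rjzamora, last active December 6, 2024).

Example invocation (placeholder script name and dataset path):

    python parquet_benchmark.py s3://my-bucket/my-dataset/ \
        --backend cudf --filesystem arrow --blocksize 1GiB --runs 3
"""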
import argparse
import random
from functools import partial
from time import perf_counter as clock
import fsspec
import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
from dask.base import tokenize
from dask.utils import format_bytes, format_time, parse_bytes
from distributed import performance_report
DISK_SIZE_CACHE = {}
OPTIONS_CACHE = {}
parser = argparse.ArgumentParser(
    prog="Distributed Parquet Benchmark",
    description="Generalized version of Dask-CUDA read_parquet.py benchmark.",
)
parser.add_argument(
    "path",
    type=str,
    help="Remote Parquet directory to read from.",
)
parser.add_argument(
    "-b",
    "--backend",
    choices=["cudf", "pandas"],
    default="cudf",
    type=str,
    help="DataFrame backend to use.",
)
parser.add_argument(
    "--columns",
    type=str,
    help="Columns to read/select from data.",
)
parser.add_argument(
    "--filesystem",
    type=str,
    default="arrow",
    help="Filesystem backend.",
)
parser.add_argument(
    "-r",
    "--runs",
    default=3,
    type=int,
    help="Number of trials to run (1-10).",
)
parser.add_argument(
    "-w",
    "--workers",
    default=0,
    type=int,
    help="Number of dask workers to use (0 disables dask).",
)
parser.add_argument(
    "--worker-threads",
    default=0,
    type=int,
    help="Number of threads per worker to use with dask.",
)
parser.add_argument(
    "--file-count",
    default=0,
    type=int,
    help="Number of files to read from the dataset.",
)
parser.add_argument(
    "--blocksize",
    default="1GiB",
    type=str,
    help="Approx. partition size.",
)
parser.add_argument(
    "--hybrid",
    default=False,
    action="store_true",
    help="Use a hybrid (CPU/GPU) dask cluster.",
)
args = parser.parse_args()


def _noop(df):
    # No-op mapper used to force a real partition-wise computation
    return df


def read_data(reader, paths, columns, backend, **kwargs):
    # Read the Parquet dataset with the requested DataFrame backend
    with dask.config.set({"dataframe.backend": backend}):
        return reader(
            paths,
            columns=columns,
            **kwargs,
        )


def get_fs_paths_kwargs(args):
    kwargs = {}
    storage_options = {}
    # TODO: Handle key/secret later?
    # if args.key:
    #     storage_options["key"] = args.key
    # if args.secret:
    #     storage_options["secret"] = args.secret
    filesystem = args.filesystem
    if filesystem == "arrow":
        import pyarrow.fs as pa_fs
        from fsspec.implementations.arrow import ArrowFSWrapper

        _mapping = {
            "key": "access_key",
            "secret": "secret_key",
        }  # See: pyarrow.fs.S3FileSystem docs
        s3_args = {}
        for k, v in storage_options.items():
            s3_args[_mapping[k]] = v

        fs = pa_fs.FileSystem.from_uri(args.path)[0]
        try:
            region = {"region": fs.region}
        except AttributeError:
            region = {}

        kwargs["filesystem"] = type(fs)(**region, **s3_args)
        fsspec_fs = ArrowFSWrapper(kwargs["filesystem"])
    else:
        fsspec_fs = fsspec.core.get_fs_token_paths(
            args.path, mode="rb", storage_options=storage_options
        )[0]
        kwargs["filesystem"] = fsspec_fs
    kwargs["blocksize"] = args.blocksize
    # kwargs["aggregate_files"] = True

    # Collect list of paths
    stripped_url_path = fsspec_fs._strip_protocol(args.path)
    if stripped_url_path.endswith("/"):
        stripped_url_path = stripped_url_path[:-1]
    paths = fsspec_fs.glob(f"{stripped_url_path}/*.parquet")
    paths = random.sample(paths, len(paths))
    if args.file_count:
        paths = paths[: args.file_count]
    return fsspec_fs, paths, kwargs


def bench_once(client, reader, args):
    global OPTIONS_CACHE
    global DISK_SIZE_CACHE

    # Construct kwargs
    token = tokenize(args)
    try:
        fsspec_fs, paths, kwargs = OPTIONS_CACHE[token]
    except KeyError:
        fsspec_fs, paths, kwargs = get_fs_paths_kwargs(args)
        OPTIONS_CACHE[token] = (fsspec_fs, paths, kwargs)

    t1 = clock()
    with performance_report(filename="pq_perf_report.html"):
        df = read_data(
            reader,
            paths,
            columns=args.columns,
            backend=args.backend,
            **kwargs,
        )
        num_rows = len(
            # Use opaque `map_partitions` call to "block"
            # dask-expr from using pq metadata to get length
            df.map_partitions(
                _noop,
                meta=df._meta,
                enforce_metadata=False,
            )
        )
    t2 = clock()

    # Extract total size of files on disk
    token = tokenize(paths)
    try:
        disk_size = DISK_SIZE_CACHE[token]
    except KeyError:
        disk_size = sum(fsspec_fs.sizes(paths))
        DISK_SIZE_CACHE[token] = disk_size

    return (disk_size, num_rows, t2 - t1)


def run(args):
    from distributed import Client

    kwargs = {"dashboard_address": ":8585"}
    if args.workers:
        kwargs["n_workers"] = args.workers
    if args.worker_threads:
        kwargs["threads_per_worker"] = args.worker_threads

    reader = dd.read_parquet
    if args.backend == "cudf":
        if args.hybrid:
            from dask_cuda.hybrid.cluster import LocalHybridCluster as Cluster

            kwargs["rmm_pool_size"] = 0.9
            cluster = Cluster(**kwargs)
            reader = partial(cluster.read_parquet, agg_factor=10)
        else:
            from dask_cuda import LocalCUDACluster as Cluster

            kwargs["rmm_pool_size"] = 0.9
            cluster = Cluster(**kwargs)
    else:
        from distributed import LocalCluster as Cluster

        cluster = Cluster(**kwargs)
    client = Client(cluster)

    times = []
    bws = []
    for i in range(args.runs):
        with dask.config.set(
            {"dataframe.parquet.minimum-partition-size": args.blocksize}
        ):
            size, nrows, dt = bench_once(client, reader, args)
        bw = size / dt
        times.append(dt)
        bws.append(bw)

    print("======================================================")
    print("Dask Parquet Read Benchmark")
    print("======================================================")
    print(f"Disk-size: {format_bytes(size)}")
    print(f"Row-count: {nrows}")
    print("------------------------------------------------------")
    for i in range(args.runs):
        print(f"Trial-{i}: {format_time(times[i])} - {format_bytes(bws[i])}/s")
    if args.runs > 1:
        avg_wct = np.mean(times)
        std_wct = np.std(times)
        avg_bw = np.mean(bws)
        std_bw = np.std(bws)
        print("------------------------------------------------------")
        print(f"Average-WCT: {format_time(avg_wct)} (std: {format_time(std_wct)})")
        print(f"Average-BW: {format_bytes(avg_bw)}/s (std: {format_bytes(std_bw)}/s)")
    print("======================================================")


if __name__ == "__main__":
    run(args)