Simple multi-file parquet benchmark for S3 storage
# # KvikIO S3 Env Variables
# os.environ["CUDF_NATIVE_S3_IO"] = "True"
# if os.environ.get("CUDF_NATIVE_S3_IO", "False") == "True":
#     os.environ["KVIKIO_NTHREADS"] = f"{os.cpu_count()}"
import os
import math
import argparse
import random
import time
import numpy as np
from fsspec.core import get_fs_token_paths
import cudf
import pandas as pd
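
# Optional readers: ``daft`` and ``pyarrow`` are only imported if available;
# the corresponding --reader choices raise an ImportError otherwise.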
try:
    import daft
except ImportError:
    daft = None

try:
    import pyarrow as pa
    import pyarrow.dataset as pa_ds
    import pyarrow.fs as pa_fs
except ImportError:
    pa = None

#
# Usage: python parquet_bench.py -k <s3-key> -s <s3-secret-key> -r daft
#
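# A multi-GPU example (hypothetical flag values, assuming dask-cuda is installed
# and 8 GPUs are available):
#
#   python parquet_bench.py -k <s3-key> -s <s3-secret-key> -r cudf \
#       -w 8 --worker_threads 1 --cuda_cluster --stride 10
#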
parser = argparse.ArgumentParser(
    prog="Multi-file Parquet Benchmark",
    description="Read multiple parquet files into a DataFrame and collect runtimes.",
)
parser.add_argument(
    "-r",
    "--reader",
    choices=[
        "cudf",
        "daft",
        "daft[to_arrow]",
        "cudf[daft]",
        "arrow",
        "cudf[arrow]",
        "cudf[fsspec]",
    ],
    default="cudf",
    type=str,
    help="DataFrame reader to use.",
)
parser.add_argument(
    "-p",
    "--path",
    default="s3://dask-cudf-parquet-testing/dedup_parquet",
    type=str,
    help="Remote Parquet directory to read from.",
)
parser.add_argument(
    "--columns",
    type=str,
    help="Comma-separated list of columns to read/select from the data.",
)
parser.add_argument(
    "-t",
    "--trials",
    default=10,
    type=int,
    help="Number of trials to run (1-10).",
)
parser.add_argument(
    "-w",
    "--workers",
    default=0,
    type=int,
    help="Number of dask workers to use (0 disables dask).",
)
parser.add_argument(
    "--cuda_cluster",
    action="store_true",
    help="Use dask_cuda.LocalCUDACluster for the dask workers.",
)
parser.add_argument(
    "--worker_threads",
    default=1,
    type=int,
    help="Number of threads per worker to use with dask.",
)
parser.add_argument(
    "--stride",
    default=10,
    type=int,
    help="Number of files to read at once.",
)
parser.add_argument(
    "-k",
    "--key",
    type=str,
    help="S3 access key ID.",
)
parser.add_argument(
    "-s",
    "--secret",
    type=str,
    help="S3 secret access key.",
)
args = parser.parse_args()
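

# Each reader class below is a callable with the signature ``reader(sources, columns)``,
# where ``sources`` is a list of parquet paths and ``columns`` is an optional list of
# column names. This is also the callable passed to ``dd.from_map`` in ``run()`` when
# dask workers are enabled.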
class CudfReader:
    def __init__(self, kind, storage_options=None):
        import s3fs

        self.kind = kind
        self.fs = s3fs.S3FileSystem(**(storage_options or {}))

    def __call__(self, sources, columns):
        if self.kind == "cudf[fsspec]":
            # Fetch raw bytes with fsspec, then parse with cudf
            return cudf.read_parquet(
                self.fs.cat_ranges(sources, None, None),
                columns=columns,
            )
        else:
            # Let cudf handle IO through the filesystem object
            return cudf.read_parquet(
                sources,
                columns=columns,
                filesystem=self.fs,
            )


class DaftReader:
    def __init__(self, kind, storage_options=None):
        if daft is None:
            raise ImportError(
                "Could not import daft - Please use `pip install getdaft[ray]`."
            )
        self.kind = kind
        _mapping = {
            "key": "key_id",
            "secret": "access_key",
        }  # See: daft.io.S3Config docs
        s3_args = {}
        for k, v in (storage_options or {}).items():
            s3_args[_mapping[k]] = v
        self.io_config = None
        if s3_args:
            self.io_config = daft.io.IOConfig(s3=daft.io.S3Config(**s3_args))

    def __call__(self, sources, columns):
        df = daft.read_parquet(sources, io_config=self.io_config)
        df = df.select(*columns) if columns else df
        if self.kind == "cudf[daft]":
            # return cudf.DataFrame.from_arrow(df.to_arrow())
            return cudf.DataFrame.from_arrow(
                pa.Table.from_batches(df.to_arrow_iter())
            )
        elif self.kind == "daft[to_arrow]":
            # return df.to_arrow()
            return pa.Table.from_batches(df.to_arrow_iter())
        else:
            return df.collect()


class ArrowReader:
    def __init__(self, kind, storage_options=None):
        if pa is None:
            raise ImportError("Could not import pyarrow")
        self.kind = kind
        _mapping = {
            "key": "access_key",
            "secret": "secret_key",
        }  # See: pyarrow.fs.S3FileSystem docs
        s3_args = {}
        for k, v in (storage_options or {}).items():
            s3_args[_mapping[k]] = v
        self.arrow_fs = pa_fs.S3FileSystem(**s3_args)
        self.scan_options = pa_ds.ParquetFragmentScanOptions(
            pre_buffer=True,
            cache_options=pa.CacheOptions(
                hole_size_limit=4_194_304,  # 4 MiB
                range_size_limit=33_554_432,  # 32 MiB
            ),
        )
        pa.set_cpu_count(os.cpu_count())

    def __call__(self, sources, columns):
        ds = pa_ds.dataset(
            # Strip "s3://" for the arrow filesystem
            [source[5:] for source in sources],
            filesystem=self.arrow_fs,
            format="parquet",
        )
        table = ds.to_table(
            columns=columns,
            batch_size=10_000_000,
            fragment_scan_options=self.scan_options,
            use_threads=True,
        )
        if self.kind == "cudf[arrow]":
            return cudf.DataFrame.from_arrow(table.replace_schema_metadata())
        else:
            return table


def get_reader(kind, storage_options):
    if kind in ("cudf", "cudf[fsspec]"):
        reader = CudfReader(kind, storage_options)
    elif "daft" in kind:
        reader = DaftReader(kind, storage_options)
    elif "arrow" in kind:
        reader = ArrowReader(kind, storage_options)
    else:
        raise NotImplementedError()
    return reader
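

# Benchmark driver: glob the parquet files under --path, shuffle them, and time
# batches of ``--stride`` files (scaled by workers and threads when dask is used),
# reporting the effective read bandwidth of each trial in MB/s.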
def run(args):
    # Get reader
    storage_options = {}
    if args.key:
        storage_options["key"] = args.key
    if args.secret:
        storage_options["secret"] = args.secret
    reader = get_reader(args.reader, storage_options)
    print(f"Using {args.reader} Reader...")

    # Collect dataset information
    columns = None
    if args.columns:
        columns = args.columns.split(",")
    url_path = args.path
    fs = get_fs_token_paths(
        url_path,
        mode="rb",
        storage_options=storage_options,
    )[0]
    all_paths = fs.glob(f"{url_path}/*.parquet")

    # Choose stride and path count to use
    worker_stride = args.stride
    n_workers = args.workers
    worker_threads = args.worker_threads
    total_stride = worker_stride * max(n_workers, 1) * max(worker_threads, 1)
    assert total_stride < len(all_paths), (
        f"Total stride {total_stride} must be smaller than total file count {len(all_paths)}"
    )
    n_files_total = args.trials * total_stride
    all_paths = random.sample(all_paths, len(all_paths))
    if n_files_total > len(all_paths):
        multiple = math.ceil(n_files_total / len(all_paths))
        all_paths = all_paths * multiple
    sizes = fs.sizes(all_paths[:n_files_total])
    paths = ["s3://" + p for p in all_paths[:n_files_total]]

    # Dask prep
    if n_workers and worker_threads:
        import dask.dataframe as dd
        from dask.distributed import Client

        if args.cuda_cluster:
            from dask_cuda import LocalCUDACluster as LocalCluster

            kwargs = {"rmm_pool_size": 0.9}
        else:
            from dask.distributed import LocalCluster

            kwargs = {}
        client = Client(
            LocalCluster(
                n_workers=n_workers,
                threads_per_worker=worker_threads,
                dashboard_address=":8585",
                **kwargs,
            ),
        )
        meta = reader(paths[:1], columns=columns).to_pandas()
    else:
        client, meta = None, None

    # Collect perf data
    count = 0
    times, file_sizes = [], []
    for i in range(0, len(paths), total_stride):
        count += 1
        batch = paths[i:i+total_stride]
        if client is None:
            t0 = time.perf_counter()
            reader(batch, columns=columns)
            t1 = time.perf_counter()
        else:
            ddf = dd.from_map(
                reader,
                [
                    batch[j:j+worker_stride]
                    for j in range(0, total_stride, worker_stride)
                ],
                meta=meta,
                enforce_metadata=False,
                columns=columns,
            )
            t0 = time.perf_counter()
            len(ddf)
            t1 = time.perf_counter()
        t = t1 - t0
        file_size = sum(sizes[i:i+total_stride]) / 1e6
        times.append(t)
        file_sizes.append(file_size)
        print(f"...Read {file_size} MB in {t} s...")

    # Summarize
    file_sizes = np.array(file_sizes)
    times = np.array(times)
    bwths = file_sizes / times
    avg = np.mean(bwths)
    std = np.std(bwths)
    mx = np.max(bwths)
    mn = np.min(bwths)
    print(f"With {args.reader} Reader...")
    print(f"Summary ({count} trials): avg={avg}, std={std}, min={mn}, max={mx} (MB/s)\n")


if __name__ == "__main__":
    run(args)