Simple multi-file parquet benchmark for S3 storage
# # KvikIO S3 Env Variables
# os.environ["CUDF_NATIVE_S3_IO"] = "True"
# if os.environ.get("CUDF_NATIVE_S3_IO", "False") == "True":
#     os.environ["KVIKIO_NTHREADS"] = f"{os.cpu_count()}"
import os
import math
import argparse
import random
import time
import numpy as np
from fsspec.core import get_fs_token_paths
import cudf
import pandas as pd
try:
    import daft
except ImportError:
    daft = None
try:
    import pyarrow as pa
    import pyarrow.dataset as pa_ds
    import pyarrow.fs as pa_fs
except ImportError:
    pa = None
#
# Usage: python parquet_bench.py -k <s3-key> -s <s3-secret-key> -r daft
#
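#
# Example with a local dask(-cuda) cluster (the bucket/prefix and credentials
# below are placeholders, not real values):
#
#   python parquet_bench.py -r cudf -p s3://<bucket>/<prefix> \
#       -w 4 --cuda_cluster --worker_threads 2 --stride 5 \
#       -k <s3-key> -s <s3-secret-key>
#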
parser = argparse.ArgumentParser(
    prog="Multi-file Parquet Benchmark",
    description="Read multiple parquet files into a DataFrame and collect runtimes.",
)
parser.add_argument(
    "-r",
    "--reader",
    choices=[
        "cudf",
        "daft",
        "daft[to_arrow]",
        "cudf[daft]",
        "arrow",
        "cudf[arrow]",
        "cudf[fsspec]",
    ],
    default="cudf",
    type=str,
    help="DataFrame reader to use.",
)
parser.add_argument(
    "-p",
    "--path",
    default="s3://dask-cudf-parquet-testing/dedup_parquet",
    type=str,
    help="Remote Parquet directory to read from.",
)
parser.add_argument(
    "--columns",
    type=str,
    help="Comma-separated list of column names to read/select from the data.",
)
parser.add_argument(
    "-t",
    "--trials",
    default=10,
    type=int,
    help="Number of trials to run (1-10).",
)
parser.add_argument(
    "-w",
    "--workers",
    default=0,
    type=int,
    help="Number of dask workers to use (0 disables dask).",
)
parser.add_argument(
    "--cuda_cluster",
    action="store_true",
    help="Use a dask_cuda LocalCUDACluster for the dask workers.",
)
parser.add_argument(
    "--worker_threads",
    default=1,
    type=int,
    help="Number of threads per worker to use with dask.",
)
parser.add_argument(
    "--stride",
    default=10,
    type=int,
    help="Number of files to read at once.",
)
parser.add_argument(
    "-k",
    "--key",
    type=str,
    help="S3 access key ID.",
)
parser.add_argument(
    "-s",
    "--secret",
    type=str,
    help="S3 secret access key.",
)
args = parser.parse_args()
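# Reader implementations. Each reader is built from the selected --reader
# "kind" plus S3 storage options, and is invoked as reader(sources, columns)
# to return an in-memory DataFrame/table.
#
# CudfReader: reads with cudf.read_parquet. For "cudf" it passes the s3fs
# filesystem object through to cudf; for "cudf[fsspec]" it first fetches whole
# files with fs.cat_ranges and hands the in-memory bytes to cudf.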
class CudfReader:
    def __init__(self, kind, storage_options=None):
        import s3fs
        self.kind = kind
        self.fs = s3fs.S3FileSystem(**(storage_options or {}))
    def __call__(self, sources, columns):
        if self.kind == "cudf[fsspec]":
            return cudf.read_parquet(
                self.fs.cat_ranges(sources, None, None),
                columns=columns,
            )
        else:
            return cudf.read_parquet(
                sources,
                columns=columns,
                filesystem=self.fs,
            )
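# DaftReader: reads with daft.read_parquet. "daft" materializes the result via
# collect(), "daft[to_arrow]" assembles the record batches into a pyarrow
# Table, and "cudf[daft]" converts those Arrow batches into a cudf DataFrame.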
class DaftReader:
    def __init__(self, kind, storage_options=None):
        if daft is None:
            raise ImportError(
                "Could not import daft - Please use `pip install getdaft[ray]`."
            )
        self.kind = kind
        _mapping = {
            "key": "key_id",
            "secret": "access_key",
        }  # See: daft.io.S3Config docs
        s3_args = {}
        for k, v in (storage_options or {}).items():
            s3_args[_mapping[k]] = v
        self.io_config = None
        if s3_args:
            self.io_config = daft.io.IOConfig(s3=daft.io.S3Config(**s3_args))
    def __call__(self, sources, columns):
        df = daft.read_parquet(sources, io_config=self.io_config)
        df = df.select(*columns) if columns else df
        if self.kind == "cudf[daft]":
            # return cudf.DataFrame.from_arrow(df.to_arrow())
            return cudf.DataFrame.from_arrow(
                pa.Table.from_batches(df.to_arrow_iter())
            )
        elif self.kind == "daft[to_arrow]":
            # return df.to_arrow()
            return pa.Table.from_batches(df.to_arrow_iter())
        else:
            return df.collect()
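# ArrowReader: builds a pyarrow dataset over the object paths and materializes
# it with to_table(). pre_buffer together with CacheOptions coalesces nearby
# column-chunk reads into fewer, larger ranged requests; the 4 MiB / 32 MiB
# limits below are tuning choices, not required values. "cudf[arrow]" converts
# the resulting Arrow table into a cudf DataFrame.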
class ArrowReader:
    def __init__(self, kind, storage_options=None):
        if pa is None:
            raise ImportError("Could not import pyarrow")
        self.kind = kind
        _mapping = {
            "key": "access_key",
            "secret": "secret_key",
        }  # See: pyarrow.fs.S3FileSystem docs
        s3_args = {}
        for k, v in (storage_options or {}).items():
            s3_args[_mapping[k]] = v
        self.arrow_fs = pa_fs.S3FileSystem(**s3_args)
        self.scan_options = pa_ds.ParquetFragmentScanOptions(
            pre_buffer=True,
            cache_options=pa.CacheOptions(
                hole_size_limit=4_194_304,  # 4 MiB
                range_size_limit=33_554_432,  # 32 MiB
            ),
        )
        pa.set_cpu_count(os.cpu_count())
    def __call__(self, sources, columns):
        ds = pa_ds.dataset(
            # Strip "s3://" for the arrow read
            [source[5:] for source in sources],
            filesystem=self.arrow_fs,
            format="parquet",
        )
        table = ds.to_table(
            columns=columns,
            batch_size=10_000_000,
            fragment_scan_options=self.scan_options,
            use_threads=True,
        )
        if self.kind == "cudf[arrow]":
            return cudf.DataFrame.from_arrow(table.replace_schema_metadata())
        else:
            return table
def get_reader(kind, storage_options):
    if kind in ("cudf", "cudf[fsspec]"):
        reader = CudfReader(kind, storage_options)
    elif "daft" in kind:
        reader = DaftReader(kind, storage_options)
    elif "arrow" in kind:
        reader = ArrowReader(kind, storage_options)
    else:
        raise NotImplementedError()
    return reader
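# Benchmark driver: glob the parquet files under --path, shuffle them, read
# `--stride` files per dask worker-thread (or per call when dask is disabled)
# in each trial, and report the effective read bandwidth in MB/s.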
def run(args):
    # Get reader
    storage_options = {}
    if args.key:
        storage_options["key"] = args.key
    if args.secret:
        storage_options["secret"] = args.secret
    reader = get_reader(args.reader, storage_options)
    print(f"Using {args.reader} Reader...")
    # Collect dataset information
    columns = None
    if args.columns:
        columns = args.columns.split(",")
    url_path = args.path
    fs = get_fs_token_paths(
        url_path,
        mode="rb",
        storage_options=storage_options,
    )[0]
    all_paths = fs.glob(f"{url_path}/*.parquet")
    # Choose stride and path count to use
    worker_stride = args.stride
    n_workers = args.workers
    worker_threads = args.worker_threads
    total_stride = worker_stride * max(n_workers, 1) * max(worker_threads, 1)
    assert total_stride < len(all_paths), (
        f"Total stride {total_stride} must be smaller than the total file count {len(all_paths)}"
    )
    n_files_total = args.trials * total_stride
    all_paths = random.sample(all_paths, len(all_paths))
    if n_files_total > len(all_paths):
        multiple = math.ceil(n_files_total / len(all_paths))
        all_paths = all_paths * multiple
    sizes = fs.sizes(all_paths[:n_files_total])
    paths = ["s3://" + p for p in all_paths[:n_files_total]]
    # Dask prep
    if n_workers and worker_threads:
        import dask.dataframe as dd
        from dask.distributed import Client
        if args.cuda_cluster:
            from dask_cuda import LocalCUDACluster as LocalCluster
            kwargs = {"rmm_pool_size": 0.9}
        else:
            from dask.distributed import LocalCluster
            kwargs = {}
        client = Client(
            LocalCluster(
                n_workers=n_workers,
                threads_per_worker=worker_threads,
                dashboard_address=":8585",
                **kwargs,
            ),
        )
        meta = reader(paths[:1], columns=columns).to_pandas()
    else:
        client, meta = None, None
    # Collect perf data
    count = 0
    times, file_sizes = [], []
    for i in range(0, len(paths), total_stride):
        count += 1
        batch = paths[i:i+total_stride]
        if client is None:
            t0 = time.perf_counter()
            reader(batch, columns=columns)
            t1 = time.perf_counter()
        else:
            ddf = dd.from_map(
                reader,
                [
                    batch[j:j+worker_stride]
                    for j in range(0, total_stride, worker_stride)
                ],
                meta=meta,
                enforce_metadata=False,
                columns=columns,
            )
            t0 = time.perf_counter()
            len(ddf)
            t1 = time.perf_counter()
        t = t1 - t0
        file_size = sum(sizes[i:i+total_stride]) / 1e6
        times.append(t)
        file_sizes.append(file_size)
        print(f"...Read {file_size} MB in {t} s...")
    # Summarize
    file_sizes = np.array(file_sizes)
    times = np.array(times)
    bwths = file_sizes / times
    avg = np.mean(bwths)
    std = np.std(bwths)
    mx = np.max(bwths)
    mn = np.min(bwths)
    print(f"With {args.reader} Reader...")
    print(f"Summary ({count} trials): avg={avg}, std={std}, min={mn}, max={mx} (MB/s)\n")
if __name__ == "__main__":
    run(args)