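"""Benchmark remote Parquet read performance.

Compares cudf, pandas (fastparquet/pyarrow), and their Dask counterparts
across several file-opening strategies ("open_parquet_file", "default-cache",
"none-cache") for a single Parquet file stored in GCS or S3.
"""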
import importlib
import time

import numpy as np
import pandas as pd

import dask.dataframe as dd
from dask.distributed import LocalCluster, Client

try:
    from dask_cuda import LocalCUDACluster
except ImportError:
    LocalCUDACluster = None

try:
    import cudf
    import dask_cudf
except ImportError:
    cudf = dask_cudf = None

def run_all_benchmarks(
    path,
    columns,
    engines=("cudf", "fastparquet", "pyarrow", "cudf-dask", "fastparquet-dask", "pyarrow-dask"),
    cases=("open_parquet_file", "default-cache", "none-cache"),
    trials=5,
    storage_options=None,
    dask_kwargs=None,
):
    # Returns a nested dict: results[case][engine] -> list of per-trial times (seconds)
    if cudf is None and "cudf" in engines:
        raise ValueError("Cannot benchmark cudf - It is not installed!")
    storage_options = storage_options or {}
    results = {}
    for case in cases:
        results[case] = {}
        for engine in engines:
            if case.startswith("pyarrow") and not (
                engine.startswith("pyarrow") or engine == "cudf"
            ):
                # Skip pyarrow-specific cases (e.g. "pyarrow-s3") for other engines
                continue
            results[case][engine] = run_benchmark(
                path,
                columns,
                case,
                engine,
                trials,
                storage_options,
                (dask_kwargs or {}),
            )
    return results
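

# Illustrative sketch (not executed by the benchmark): how run_all_benchmarks
# might be called against a local Parquet file with CPU engines only. The
# column name matches the one used in __main__ below; the file path is a
# hypothetical placeholder.
def _example_local_run():
    return run_all_benchmarks(
        "/tmp/example.parquet",  # hypothetical local file
        columns=["int10"],
        engines=("fastparquet", "pyarrow"),
        cases=("default-cache", "none-cache"),
        trials=2,
    )
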
def refresh_fs(path=None, storage_options=None, return_fs=True):
    # Reload fsspec and clear its filesystem-instance cache so that every
    # trial starts without previously cached connections or data.
    import fsspec

    importlib.reload(fsspec)
    fsspec.spec.AbstractFileSystem.clear_instance_cache()
    if return_fs:
        fs = fsspec.get_fs_token_paths(
            path,
            storage_options=(storage_options or {}),
        )[0]
        return fsspec, fs
    return None

def open_file(path, columns, engine, case, storage_options):
    fsspec, fs = refresh_fs(path=path, storage_options=storage_options)
    if case == "open_parquet_file":
        import fsspec.parquet as fsspec_parquet

        importlib.reload(fsspec_parquet)
        use_engine = engine.split("-")[0] if not engine.startswith("cudf") else "pyarrow"
        return fsspec_parquet.open_parquet_file(
            path, fs=fs, columns=columns, engine=use_engine
        )
    elif case == "none-cache":
        return fs.open(path, mode="rb", cache_type="none")
    elif case == "default-cache":
        return fs.open(path, mode="rb")
    elif case == "pyarrow-s3":
        from pyarrow import fs as pafs

        # Drop the protocol prefix; pyarrow's S3FileSystem expects "bucket/key"
        return pafs.S3FileSystem().open_input_file(path.replace("s3://", "", 1))
    else:
        raise ValueError(f"Case {case} not recognized!")
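

# Illustrative sketch (not executed by the benchmark): the "open_parquet_file"
# case above relies on fsspec.parquet.open_parquet_file, which fetches only the
# byte ranges needed for the requested columns. A minimal stand-alone use with
# pandas, assuming a hypothetical local path, could look like this:
def _example_open_parquet_file(path="/tmp/example.parquet"):
    import fsspec.parquet

    with fsspec.parquet.open_parquet_file(path, columns=["int10"], engine="pyarrow") as f:
        return pd.read_parquet(f, columns=["int10"], engine="pyarrow")
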
def read_parquet(f, columns, engine, case, use_dask, **dask_kwargs):
    if use_dask:
        refresh_fs(return_fs=False)
        # Supported cases: ("open_parquet_file", "default-cache", "none-cache")
        kwargs = dask_kwargs.copy()
        if case == "open_parquet_file":
            kwargs["open_options"] = {"file_format": "parquet"}
        elif case == "pyarrow-s3":
            from pyarrow import fs as pafs

            kwargs["open_options"] = {"open_cb": pafs.S3FileSystem().open_input_file}
        else:
            kwargs["open_options"] = {
                "file_format": None,
                "cache_type": "none" if case == "none-cache" else "readahead",
            }
        if engine == "cudf":
            return dask_cudf.read_parquet(f, columns=columns, **kwargs).compute()
        else:
            return dd.read_parquet(f, columns=columns, engine=engine, **kwargs).compute()
    else:
        if engine == "cudf":
            # use_python_file_object lets cudf read through the Python (fsspec)
            # file object rather than copying the whole file into memory first
            return cudf.read_parquet(f, columns=columns, use_python_file_object=True)
        else:
            return pd.read_parquet(f, columns=columns, engine=engine)
def run_benchmark(path, columns, case, engine, trials, storage_options, dask_kwargs):
    split_engine = engine.split("-")
    io_engine = split_engine[0]
    use_dask = len(split_engine) > 1 and split_engine[1] == "dask"
    results = []
    for trial in range(trials):
        t0 = time.time()
        if use_dask:
            df = read_parquet(
                path,
                columns,
                io_engine,
                case,
                use_dask,
                storage_options=storage_options,
                **dask_kwargs,
            )
        else:
            with open_file(path, columns, engine, case, storage_options) as f:
                df = read_parquet(f, columns, io_engine, case, use_dask)
        results.append(time.time() - t0)
    print(f"Case {case} results for {engine}: {results}")
    return results
if __name__ == "__main__":

    # Deploy local Dask cluster
    client = Client(LocalCluster())
    dask_kwargs = {"split_row_groups": 2}

    # Choose engines and cases
    engines = ("cudf", "fastparquet", "pyarrow", "cudf-dask", "fastparquet-dask", "pyarrow-dask")
    cases = ("open_parquet_file", "none-cache", "default-cache")  # , "pyarrow-s3",)
    trials = 3

    # Choose path and protocol
    path = "dask-cudf-parquet-testing/large_file.parquet"
    columns = ["int10"]
    protocol = "gs"  # "s3"
    if protocol == "gs":
        storage_options = {
            "project": "NV-AI-Infra",
            "token": "/home/rzamora/workspace/cudf-21.10b/adc.json",
        }
    elif protocol == "s3":
        storage_options = {}  # Use aws config
    path = protocol + "://" + path

    results = run_all_benchmarks(
        path,
        columns,
        engines=engines,
        cases=cases,
        trials=trials,
        storage_options=storage_options,
        dask_kwargs=dask_kwargs,
    )
    print(f"Result: {results}")