import importlib
import time

import dask.dataframe as dd
from dask.distributed import LocalCluster, Client

try:
    from dask_cuda import LocalCUDACluster
except ImportError:
    # Bind the imported name (not the package) so the fallback is consistent
    LocalCUDACluster = None

try:
    import cudf, dask_cudf
except ImportError:
    cudf = dask_cudf = None

import pandas as pd
import numpy as np
def run_all_benchmarks(
    path,
    columns,
    engines=("cudf", "fastparquet", "pyarrow", "cudf-dask", "fastparquet-dask", "pyarrow-dask"),
    cases=("open_parquet_file", "default-cache", "none-cache"),
    trials=5,
    storage_options=None,
    dask_kwargs=None,
):
    # Cover both "cudf" and "cudf-dask" engine variants in the availability check
    if cudf is None and any(engine.startswith("cudf") for engine in engines):
        raise ValueError("Cannot benchmark cudf - It is not installed!")
    storage_options = storage_options or {}
    results = {}
    for case in cases:
        results[case] = {}
        for engine in engines:
            if case.startswith("pyarrow") and not (
                engine.startswith("pyarrow") or engine == "cudf"
            ):
                # Skip pyarrow-specific cases for other engines
                continue
            results[case][engine] = run_benchmark(
                path,
                columns,
                case,
                engine,
                trials,
                storage_options,
                (dask_kwargs or {}),
            )
    return results
def refresh_fs(path=None, storage_options=None, return_fs=True):
    # Reload fsspec and clear its filesystem-instance cache so every trial
    # starts without cached filesystem state
    import fsspec
    importlib.reload(fsspec)
    fsspec.spec.AbstractFileSystem.clear_instance_cache()
    if return_fs:
        fs = fsspec.get_fs_token_paths(
            path,
            storage_options=(storage_options or {}),
        )[0]
        return fsspec, fs
    return
def open_file(path, columns, engine, case, storage_options):
    fsspec, fs = refresh_fs(path=path, storage_options=storage_options)
    if case == "open_parquet_file":
        import fsspec.parquet as fsspec_parquet
        importlib.reload(fsspec_parquet)
        # cudf reads through the pyarrow footer-parsing path in fsspec.parquet
        use_engine = engine.split("-")[0] if not engine.startswith("cudf") else "pyarrow"
        return fsspec_parquet.open_parquet_file(
            path, fs=fs, columns=columns, engine=use_engine
        )
    elif case == "none-cache":
        return fs.open(path, mode="rb", cache_type="none")
    elif case == "default-cache":
        return fs.open(path, mode="rb")
    elif case == "pyarrow-s3":
        from pyarrow import fs as pafs
        # str.strip("s3://") removes a character set, not a prefix, so drop
        # the protocol prefix explicitly
        return pafs.S3FileSystem().open_input_file(path.replace("s3://", "", 1))
    else:
        raise ValueError(f"Case {case} not recognized!")
def read_parquet(f, columns, engine, case, use_dask, **dask_kwargs):
    if use_dask:
        refresh_fs(return_fs=False)
        # Supported cases: ("open_parquet_file", "default-cache", "none-cache")
        kwargs = dask_kwargs.copy()
        if case == "open_parquet_file":
            kwargs["open_options"] = {"file_format": "parquet"}
        elif case == "pyarrow-s3":
            from pyarrow import fs as pafs
            kwargs["open_options"] = {"open_cb": pafs.S3FileSystem().open_input_file}
        else:
            kwargs["open_options"] = {
                "file_format": None,
                "cache_type": "none" if case == "none-cache" else "readahead",
            }
        if engine == "cudf":
            return dask_cudf.read_parquet(f, columns=columns, **kwargs).compute()
        else:
            return dd.read_parquet(f, columns=columns, engine=engine, **kwargs).compute()
    else:
        if engine == "cudf":
            return cudf.read_parquet(f, columns=columns, use_python_file_object=True)
        else:
            return pd.read_parquet(f, columns=columns, engine=engine)
def run_benchmark(path, columns, case, engine, trials, storage_options, dask_kwargs):
    split_engine = engine.split("-")
    io_engine = split_engine[0]
    use_dask = len(split_engine) > 1 and split_engine[1] == "dask"
    results = []
    for trial in range(trials):
        t0 = time.time()
        if use_dask:
            df = read_parquet(
                path,
                columns,
                io_engine,
                case,
                use_dask,
                storage_options=storage_options,
                **dask_kwargs,
            )
        else:
            with open_file(path, columns, engine, case, storage_options) as f:
                df = read_parquet(f, columns, io_engine, case, use_dask)
        results.append(time.time() - t0)
    print(f"Case {case} results for {engine}: {results}")
    return results
if __name__ == "__main__":

    # Deploy local Dask cluster
    client = Client(LocalCluster())
    dask_kwargs = {"split_row_groups": 2}

    # Choose engines and cases
    engines = ("cudf", "fastparquet", "pyarrow", "cudf-dask", "fastparquet-dask", "pyarrow-dask")
    cases = ("open_parquet_file", "none-cache", "default-cache")  # , "pyarrow-s3",
    trials = 3

    # Choose path and protocol
    path = "dask-cudf-parquet-testing/large_file.parquet"
    columns = ["int10"]
    protocol = "gs"  # "s3"
    if protocol == "gs":
        storage_options = {
            "project": "NV-AI-Infra",
            "token": "/home/rzamora/workspace/cudf-21.10b/adc.json",
        }
    elif protocol == "s3":
        storage_options = {}  # Use aws config
    path = protocol + "://" + path

    results = run_all_benchmarks(
        path,
        columns,
        engines=engines,
        cases=cases,
        trials=trials,
        storage_options=storage_options,
        dask_kwargs=dask_kwargs,
    )
    print(f"Result: {results}")