Created October 15, 2020 22:22
Save rjzamora/bfc76bb95babc7bbb70842ec7ea3c106 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import shutil | |
import time | |
import numpy as np | |
import dask.dataframe as dd | |
from dask.dataframe.io.demo import names as name_list | |
import fastavro as fa | |
import cudf | |
class CuDFAvroEngine(dd.io.avro.FastAvroEngine):
    """Dask avro engine that materializes partitions as cuDF DataFrames."""

    @classmethod
    def read_partition(cls, fs, piece, index, columns):
        """Read a single avro ``piece`` with cudf.

        If the piece carries a ``"rows"`` entry, only that
        ``(skiprows, num_rows)`` window of the file is read; otherwise the
        whole file is read. The optional ``index`` column is promoted to the
        DataFrame index, and the result is restricted to ``columns`` (all
        columns when ``columns`` is None).
        """
        avro_path = piece["path"]
        if "rows" in piece:
            first_row, row_count = piece["rows"]
            frame = cudf.io.read_avro(
                avro_path, skiprows=first_row, num_rows=row_count
            )
        else:
            frame = cudf.io.read_avro(avro_path)
        # Apply index promotion and column selection after the raw read.
        if index:
            frame.set_index(index, inplace=True)
        selected = list(frame.columns) if columns is None else columns
        return frame[selected]
def read_avro_cudf(size, split_blocks, use_cudf,
                   tmpdir="/raid/dask_space/rzamora/scratch"):
    """Benchmark avro write/read round-trip through dask (optionally cudf).

    Writes a two-file avro dataset of ``size`` records each, then reads it
    back with ``dd.io.avro.read_avro``, timing each stage.

    Parameters
    ----------
    size : int
        Number of records written to each of the two avro files.
    split_blocks : bool
        Forwarded to ``dd.io.avro.read_avro``.
    use_cudf : bool
        If True, read metadata and partitions with cudf via ``CuDFAvroEngine``;
        otherwise use the default engine.
    tmpdir : str, optional
        Scratch directory for the dataset. It is deleted and recreated on
        every call, so do not point it at a directory with data you need.

    Returns
    -------
    tuple of float
        ``(write, metadata, graph-construction, read)`` wall-clock seconds.
    """
    # Start from an empty scratch directory.
    if os.path.isdir(tmpdir):
        shutil.rmtree(tmpdir)
    os.makedirs(tmpdir)  # makedirs creates missing parents, unlike mkdir

    # Define avro schema
    schema = fa.parse_schema(
        {
            "name": "avro.example.User",
            "type": "record",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "age", "type": "int"},
            ],
        }
    )

    # Write avro dataset with two files, timing the full write.
    paths = [
        os.path.join(tmpdir, "test.0.avro"),
        os.path.join(tmpdir, "test.1.avro"),
    ]
    tstart = time.time()
    for path in paths:
        names = np.random.choice(name_list, size)
        ages = np.random.randint(18, 100, size)
        data = [{"name": name, "age": age} for name, age in zip(names, ages)]
        with open(path, "wb") as f:
            fa.writer(f, schema, data)
    twrite = time.time() - tstart

    # Time metadata construction (a small cudf sample read, when enabled).
    tstart = time.time()
    if use_cudf:
        meta = cudf.io.read_avro(paths[0], num_rows=5)
    else:
        meta = None
    tmeta = time.time() - tstart

    # Time task-graph construction with dask.dataframe.
    tstart = time.time()
    df = dd.io.avro.read_avro(
        paths,
        split_blocks=split_blocks,
        meta=meta,
        engine=CuDFAvroEngine if use_cudf else None,
    )
    tgraph = time.time() - tstart

    # Time the full materialization; synchronous scheduler keeps the
    # measurement free of thread/process scheduling noise.
    tstart = time.time()
    df.compute(scheduler="synchronous")
    tread = time.time() - tstart

    return (twrite, tmeta, tgraph, tread)
if __name__ == '__main__':
    # Benchmark configuration.
    split_blocks = True
    size = 1_000_000
    trials = 10
    use_cudf = True

    # Run all trials, then transpose the per-trial 4-tuples into
    # per-stage timing lists.
    results = [
        read_avro_cudf(size, split_blocks, use_cudf) for _ in range(trials)
    ]
    write_times, meta_times, graph_times, read_times = (
        list(stage) for stage in zip(*results)
    )

    print("Write [s]:", np.mean(write_times), "+-", np.std(write_times))
    print("Read Metadata [s]:", np.mean(meta_times), "+-", np.std(meta_times))
    print("Graph Construct [s]:", np.mean(graph_times), "+-", np.std(graph_times))
    print("Reading Data [s]:", np.mean(read_times), "+-", np.std(read_times))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
This benchmark is designed to test cuDF compatibility with dask#6740.