@rjzamora
Created October 15, 2020 22:22
import os
import shutil
import time
import numpy as np
import dask.dataframe as dd
from dask.dataframe.io.demo import names as name_list
import fastavro as fa
import cudf


class CuDFAvroEngine(dd.io.avro.FastAvroEngine):
    @classmethod
    def read_partition(cls, fs, piece, index, columns):
        path = piece["path"]
        if "rows" in piece:
            skiprows, num_rows = piece["rows"]
            df = cudf.io.read_avro(path, skiprows=skiprows, num_rows=num_rows)
        else:
            df = cudf.io.read_avro(path)
        # Deal with index and column selection
        if index:
            df.set_index(index, inplace=True)
        if columns is None:
            columns = list(df.columns)
        return df[columns]
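

# NOTE: `piece` is the per-partition mapping produced by the proposed
# dask#6740 planner. Inferred from the branches above (not from the PR
# source): it always carries the file "path", plus an optional "rows"
# entry of the form (skiprows, num_rows) when split_blocks=True divides
# a file into multiple partitions.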


def read_avro_cudf(size, split_blocks, use_cudf):
    # Clean up the scratch directory
    tmpdir = "/raid/dask_space/rzamora/scratch"
    if os.path.isdir(tmpdir):
        shutil.rmtree(tmpdir)
    os.mkdir(tmpdir)

    # Define avro schema
    schema = fa.parse_schema(
        {
            "name": "avro.example.User",
            "type": "record",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "age", "type": "int"},
            ],
        }
    )

    # Write an avro dataset with two files, timing the full write
    paths = [
        os.path.join(str(tmpdir), "test.0.avro"),
        os.path.join(str(tmpdir), "test.1.avro"),
    ]
    tstart = time.time()
    for path in paths:
        names = np.random.choice(name_list, size)
        ages = np.random.randint(18, 100, size)
        data = [{"name": names[i], "age": ages[i]} for i in range(size)]
        with open(path, "wb") as f:
            fa.writer(f, schema, data)
    twrite = time.time() - tstart

    # Collect cudf-backed metadata (a small head read) for the reader
    tstart = time.time()
    if use_cudf:
        meta = cudf.io.read_avro(paths[0], num_rows=5)
    else:
        meta = None
    tmeta = time.time() - tstart

    # Read back with dask.dataframe (graph construction only)
    tstart = time.time()
    df = dd.io.avro.read_avro(
        paths,
        split_blocks=split_blocks,
        meta=meta,
        engine=CuDFAvroEngine if use_cudf else None,
    )
    tgraph = time.time() - tstart

    # Full read of the dataset
    tstart = time.time()
    df.compute(scheduler="synchronous")
    tread = time.time() - tstart
    return (twrite, tmeta, tgraph, tread)


if __name__ == '__main__':
    split_blocks = True
    size = 1_000_000
    trials = 10
    use_cudf = True

    write_times = []
    meta_times = []
    graph_times = []
    read_times = []
    for trial in range(trials):
        result = read_avro_cudf(size, split_blocks, use_cudf)
        write_times.append(result[0])
        meta_times.append(result[1])
        graph_times.append(result[2])
        read_times.append(result[3])

    print("Write [s]:", np.mean(write_times), "+-", np.std(write_times))
    print("Read Metadata [s]:", np.mean(meta_times), "+-", np.std(meta_times))
    print("Graph Construct [s]:", np.mean(graph_times), "+-", np.std(graph_times))
    print("Reading Data [s]:", np.mean(read_times), "+-", np.std(read_times))
@rjzamora (Author)

This benchmark is designed to test cudf compatibility with dask#6740.

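For reference, a minimal sketch of the read path this script exercises, assuming a dask build that includes the dask#6740 read_avro API (it is not part of released dask) and the CuDFAvroEngine defined above; the file names here are hypothetical:

    import cudf
    import dask.dataframe as dd

    meta = cudf.io.read_avro("test.0.avro", num_rows=5)   # cudf-backed meta
    df = dd.io.avro.read_avro(
        ["test.0.avro", "test.1.avro"],   # hypothetical paths
        split_blocks=True,                # split files into block-range partitions
        meta=meta,
        engine=CuDFAvroEngine,
    )
    print(df.compute())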