|
import vaex |
|
import numpy as np |
|
import dask.dataframe as dd |
|
import dask |
|
import dask.distributed |
|
import json |
|
import os |
|
import time |
|
import argparse |
|
import multiprocessing |
|
|
|
# Default HDF5 file used to store (and later re-read) the benchmark data.
default_filename = 'string_benchmark.hdf5'

# Command line definition.
parser = argparse.ArgumentParser('bench.py')
# `n` is an exponent: the generated data set holds int(10**n) rows.
parser.add_argument('--number', "-n", dest="n", type=float, default=7,
                    help="log number of rows to use")
parser.add_argument('--partitions', type=int, default=multiprocessing.cpu_count() * 2,
                    help="number of partitions to split (default: 2x number cores)")
# NOTE(review): `npandas` is not read anywhere in the visible part of this
# file -- confirm whether it is still needed.
parser.add_argument('--npandas', dest="npandas", type=float, default=7,
                    help="number of rows to use for pandas")
# Substring filter: only benchmarks whose name contains it are run.
parser.add_argument('--filter', dest="filter", default=None,
                    help="filter for benchmark")
parser.add_argument('--filename', default=default_filename,
                    help='filename to use for benchmark export/reading')
# `choices` makes argparse reject an unknown backend immediately, instead of
# failing only after the data file has been generated and opened.
parser.add_argument('--backend', default='vaex',
                    choices=['vaex', 'dask', 'pandas'],
                    help='The backend to test {vaex, dask, pandas}')
# The command line is parsed at module level; `args` is read as a global by
# the timing helper and by the main section.
args = parser.parse_args()

# Benchmark name -> best measured time normalized per row (seconds).
timings = {}
|
|
|
|
|
def mytimeit(expr, N, scope):
    """Evaluate `expr` `N` times and return the duration of every run.

    Parameters:
        expr: Python expression (string) passed to `eval`. `eval` executes
            arbitrary code, so only trusted, hard-coded expressions may be
            passed here -- never external input.
        N: number of repetitions.
        scope: dict used as the globals namespace for the evaluation.

    Returns:
        A list with one elapsed time in seconds per repetition, in the order
        the runs were executed.
    """
    times = []
    for _ in range(N):
        # perf_counter() is monotonic and uses the highest resolution clock
        # available; time.time() follows the (adjustable) system clock.
        t0 = time.perf_counter()
        eval(expr, scope)
        times.append(time.perf_counter() - t0)
        if args.backend == 'dask':
            # Give time for dask's GC to run
            time.sleep(1.0)
    return times
|
|
|
|
|
def vaex_nop(df):
    """Call `df.nop()` and discard its result.

    Used as the `nop` wrapper for the vaex backend.
    NOTE(review): presumably `nop` makes vaex evaluate the expression
    without keeping the result -- confirm against the vaex documentation.
    """
    run = df.nop
    run()
    return None
|
|
|
|
|
def dask_nop(df):
    """Persist `df` on the cluster and block until that has finished.

    Used as the `nop` wrapper for the dask backend.
    """
    # `persist` is used rather than `compute`: calling `compute` on a large
    # dask dataframe is uncommon because it pulls the (large) result back
    # into the client process, which can be expensive. `persist` performs
    # all the operations but leaves the data on the workers, which is
    # believed to be a fairer comparison to vaex's `nop`.
    persisted = df.persist()
    dask.distributed.wait(persisted)
|
|
|
|
|
def pandas_nop(df):
    """Do nothing with `df`.

    Used as the `nop` wrapper for the pandas backend: any work happens while
    the argument expression is evaluated, before this function is entered.
    """
    return None
|
|
|
|
|
if __name__ == '__main__':
    # Generate the benchmark file on first use: the integers 0 .. 10**n - 1
    # rendered as strings, stored twice (columns `x` and `s`).
    if not os.path.exists(args.filename):
        s = np.arange(0, int(10**args.n)).astype(str)
        df_vaex = vaex.from_arrays(x=s, s=s)
        print("Writing file")
        df_vaex.export(args.filename, progress=True, shuffle=True)
        del df_vaex

    df_vaex = vaex.open(args.filename)
    # Timings are normalized by the number of rows actually benchmarked. An
    # existing file is reused as-is, so its size need not equal 10**n.
    # The lower bound of 1 only avoids a division by zero on an empty file.
    n_rows = max(len(df_vaex), 1)

    # Build the dataframe for the selected backend plus the evaluation scope
    # (`df` and the backend specific `nop`) the benchmark expressions run in.
    if args.backend == 'vaex':
        df = df_vaex
        df.executor.buffer_size = len(df) // args.partitions
        scope = {'df': df, 'nop': vaex_nop}
    elif args.backend == 'dask':
        # Start a local cluster with 1 thread per process (nprocesses = ncores
        # by default)
        dask.distributed.Client(threads_per_worker=1)
        df_pandas = df_vaex.to_pandas_df()
        # Load the data on the cluster already, to be fair in comparison to vaex
        df = dd.from_pandas(df_pandas, npartitions=args.partitions).persist()
        del df_pandas
        scope = {'df': df, 'nop': dask_nop}
    elif args.backend == 'pandas':
        df = df_vaex.to_pandas_df()
        scope = {'df': df, 'nop': pandas_nop}
    else:
        raise ValueError("Unknown backend %s" % args.backend)
    del df_vaex

    def test(name, expr):
        """Time `nop(<expr>)` five times and record the best time per row.

        Benchmarks whose name does not contain `--filter` (when given) are
        skipped. The result is stored in `timings[name]`.
        """
        if args.filter and args.filter not in name:
            return
        print(name)
        results = mytimeit('nop(%s)' % expr, 5, scope=scope)
        timings[name] = min(results) / n_rows

    print("Benchmarking %s" % args.backend)
    # (benchmark name, expression evaluated with `df` in scope), run in order.
    benchmarks = [
        ('capitalize', 'df.s.str.capitalize()'),
        ('cat', 'df.s.str.cat(df.s)'),
        ('contains', 'df.s.str.contains("9", regex=False)'),
        ('contains(regex)', 'df.s.str.contains("9", regex=True)'),
        ('count', 'df.s.str.count("9")'),
        ('endswith', 'df.s.str.endswith("9")'),
        ('find', 'df.s.str.find("4")'),
        ('get', 'df.s.str.get(1)'),
        ('split+join', 'df.s.str.split(".").str.join("-")'),
        ('len', 'df.s.str.len()'),
        ('ljust', 'df.s.str.ljust(10)'),
        ('lower', 'df.s.str.lower()'),
        ('lstrip', 'df.s.str.lstrip("9")'),
        ('match', 'df.s.str.match("1.*")'),
        ('pad', 'df.s.str.pad(10)'),
        ('repeat', 'df.s.str.repeat(2)'),
        ('replace(default)', 'df.s.str.replace("123", "321")'),
        ('replace(no regex)', 'df.s.str.replace("123", "321", regex=False)'),
        ('replace(regex)', 'df.s.str.replace("1?[45]4", "1004", regex=True)'),
        ('rfind', 'df.s.str.rfind("4")'),
        ('rjust', 'df.s.str.rjust(10)'),
        ('rstrip', 'df.s.str.rstrip("9")'),
        ('slice', 'df.s.str.slice(1, 3)'),
        ('split', 'df.s.str.split(".")'),
        ('startswith', 'df.s.str.startswith("9")'),
        ('strip', 'df.s.str.strip("0")'),  # issues?
        ('title', 'df.s.str.title()'),
        ('upper', 'df.s.str.upper()'),
        ('zfill', 'df.s.str.zfill(10)'),
    ]
    for name, expr in benchmarks:
        test(name, expr)

    # Store the results as <backend>.json in the current working directory.
    fn = "%s.json" % args.backend
    with open(fn, "w") as f:
        json.dump(timings, f)