import vaex |
import numpy as np |
import dask.dataframe as dd |
import dask |
import dask.distributed |
import json |
import os |
import time |
import argparse |
import multiprocessing |
default_filename = 'string_benchmark.hdf5'
parser = argparse.ArgumentParser('bench.py')
parser.add_argument('--number', "-n", dest="n", type=float, default=7,
                    help="log number of rows to use")
parser.add_argument('--partitions', type=int, default=multiprocessing.cpu_count() * 2,
                    help="number of partitions to split (default: 2x number cores)")
# NOTE(review): --npandas is not read anywhere else in this script; confirm
# whether it is still needed. Its float default of 7 suggests a log10 row count
# (like --number), despite the help text saying "number of rows".
parser.add_argument('--npandas', dest="npandas", type=float, default=7,
                    help="number of rows to use for pandas")
parser.add_argument('--filter', dest="filter", default=None,
                    help="filter for benchmark")
parser.add_argument('--filename', default=default_filename,
                    help='filename to use for benchmark export/reading')
# `choices` makes argparse reject an unknown backend up front, instead of the
# script first generating the (possibly very large) data file and only then
# failing on the backend dispatch.
parser.add_argument('--backend', default='vaex', choices=['vaex', 'dask', 'pandas'],
                    help='The backend to test {vaex, dask, pandas}')
args = parser.parse_args()
# Benchmark name -> best observed seconds per row; dumped to "<backend>.json".
timings = {}
def mytimeit(expr, N, scope, pause=None):
    """Evaluate ``expr`` ``N`` times and return the duration of each run.

    Parameters
    ----------
    expr : str
        Python expression passed to ``eval``. Only benchmark-internal
        expressions are built for this; never pass untrusted input.
    N : int
        Number of repetitions.
    scope : dict
        Globals mapping handed to ``eval`` (names used by ``expr``).
    pause : float, optional
        Seconds to sleep after each run, outside the timed region. ``None``
        (the default) keeps the historical behaviour: sleep 1 s when the
        dask backend is selected (to give dask's GC time to run), otherwise
        don't sleep.

    Returns
    -------
    list of float
        Elapsed seconds for each run, in execution order.
    """
    if pause is None:
        pause = 1.0 if args.backend == 'dask' else 0.0
    times = []
    for _ in range(N):
        # perf_counter is monotonic and high-resolution, unlike time.time(),
        # which can jump with system clock adjustments.
        t0 = time.perf_counter()
        eval(expr, scope)
        times.append(time.perf_counter() - t0)
        if pause:
            # Not part of the measurement: the timestamp was already taken.
            time.sleep(pause)
    return times
def vaex_nop(df):
    """Force evaluation of a vaex benchmark result by calling its ``nop``.

    Whatever ``df.nop()`` returns is discarded; this function returns None.
    """
    evaluate = df.nop
    evaluate()
def dask_nop(df):
    """Run every pending operation on the dask collection ``df`` and wait.

    ``persist`` is used deliberately instead of ``compute``. On a large
    dataframe, ``compute`` pulls the full result back into the client process,
    which can be expensive and is uncommon in practice. ``persist`` executes the
    operations but leaves the data on the workers. That makes it the fairer
    counterpart to vaex's ``nop``. Returns None.
    """
    persisted = df.persist()
    dask.distributed.wait(persisted)
def pandas_nop(df):
    """Do nothing and return None.

    The benchmark expression passed as ``df`` has already been evaluated as a
    call argument by the time this runs, so there is no deferred work to
    trigger for pandas.
    """
    return None
if __name__ == '__main__':
    # Validate the backend first, so a typo fails immediately rather than
    # after the (potentially expensive) data-file generation below.
    if args.backend not in ('vaex', 'dask', 'pandas'):
        raise ValueError("Unknown backend %s" % args.backend)

    # Generate the data once: 10**n integers rendered as strings, stored in
    # both columns `x` and `s`, exported in shuffled order.
    if not os.path.exists(args.filename):
        s = np.arange(0, int(10**args.n)).astype(str)
        df_vaex = vaex.from_arrays(x=s, s=s)
        print("Writing file")
        df_vaex.export(args.filename, progress=True, shuffle=True)
        del df_vaex
    df_vaex = vaex.open(args.filename)

    # Normalise timings by the number of rows actually in the file. A file
    # left over from an earlier run may have been written with a different
    # --number than the one passed now.
    nrows = len(df_vaex)
    if nrows != int(10**args.n):
        print("Note: %s contains %d rows; timings are normalised by that count"
              % (args.filename, nrows))

    client = None  # dask.distributed client; only created for the dask backend
    if args.backend == 'vaex':
        df = df_vaex
        # Split the work into roughly `args.partitions` chunks. Never let the
        # buffer size drop to 0 when there are fewer rows than partitions.
        df.executor.buffer_size = max(1, nrows // args.partitions)
        scope = {'df': df, 'nop': vaex_nop}
    elif args.backend == 'dask':
        # Start a local cluster with 1 thread per process (nprocesses = ncores
        # by default). Keep a reference so the client stays alive for the
        # whole run and can be closed explicitly at the end.
        client = dask.distributed.Client(threads_per_worker=1)
        df_pandas = df_vaex.to_pandas_df()
        # Load the data on the cluster already, to be fair in comparison to vaex
        df = dd.from_pandas(df_pandas, npartitions=args.partitions).persist()
        del df_pandas
        scope = {'df': df, 'nop': dask_nop}
    else:  # 'pandas' (validated above)
        df = df_vaex.to_pandas_df()
        scope = {'df': df, 'nop': pandas_nop}
    del df_vaex

    def test(name, expr):
        """Time ``nop(<expr>)`` 5 times and record the best per-row time.

        Skipped when --filter is given and is not a substring of ``name``.
        """
        if args.filter and args.filter not in name:
            return
        print(name)
        results = mytimeit('nop(%s)' % expr, 5, scope=scope)
        timings[name] = min(results) / nrows

    print("Benchmarking %s" % args.backend)
    test('capitalize', 'df.s.str.capitalize()')
    test('cat', 'df.s.str.cat(df.s)')
    test('contains', 'df.s.str.contains("9", regex=False)')
    test('contains(regex)', 'df.s.str.contains("9", regex=True)')
    test('count', 'df.s.str.count("9")')
    test('endswith', 'df.s.str.endswith("9")')
    test('find', 'df.s.str.find("4")')
    test('get', 'df.s.str.get(1)')
    test('split+join', 'df.s.str.split(".").str.join("-")')
    test('len', 'df.s.str.len()')
    test('ljust', 'df.s.str.ljust(10)')
    test('lower', 'df.s.str.lower()')
    test('lstrip', 'df.s.str.lstrip("9")')
    test('match', 'df.s.str.match("1.*")')
    test('pad', 'df.s.str.pad(10)')
    test('repeat', 'df.s.str.repeat(2)')
    test('replace(default)', 'df.s.str.replace("123", "321")')
    test('replace(no regex)', 'df.s.str.replace("123", "321", regex=False)')
    test('replace(regex)', 'df.s.str.replace("1?[45]4", "1004", regex=True)')
    test('rfind', 'df.s.str.rfind("4")')
    test('rjust', 'df.s.str.rjust(10)')
    test('rstrip', 'df.s.str.rstrip("9")')
    test('slice', 'df.s.str.slice(1, 3)')
    test('split', 'df.s.str.split(".")')
    test('startswith', 'df.s.str.startswith("9")')
    # NOTE(review): the original author flagged this benchmark with "issues?"
    # without details. Stripping "0" turns the row "0" into an empty string;
    # confirm all backends agree on the result.
    test('strip', 'df.s.str.strip("0")')
    test('title', 'df.s.str.title()')
    test('upper', 'df.s.str.upper()')
    test('zfill', 'df.s.str.zfill(10)')

    fn = "%s.json" % args.backend
    with open(fn, "w") as f:
        json.dump(timings, f)
    if client is not None:
        client.close()