
@jcrist
Last active January 28, 2024 11:59
Vaex string benchmarks, updated with Dask fixes

Here I've reproduced the benchmarks from this vaex blog post, updated to correct a few issues that were unfair to Dask:

  • The original script uses the multiprocessing scheduler, which we almost never recommend (we should maybe even remove it). Instead, we now recommend either the threaded scheduler or the distributed scheduler (which runs fine locally on a single machine). See https://docs.dask.org/en/latest/scheduling.html for more information.

  • The vaex code uses .nop(), which computes the result but then drops it, avoiding the conversion to pandas. I've updated the Dask code to use .persist() (compute on the workers, but don't bring the data back to the client process), which I think makes for a fair comparison.

  • The original code sent the data to the workers on every call, even though the data was already in memory. In cases like this it's more common to call .persist() once after the initial load, so the data is already distributed throughout the cluster.

I also split the script out to do a separate run per backend, to minimize competing memory usage.

I ran the benchmark on my MacBook (4 physical cores, 8 with hyperthreading; 16 GB RAM) as follows:

```console
$ python bench.py --backend vaex
$ python bench.py --backend dask
$ python bench.py --backend pandas
$ python plot_bench.py
```

With these updated benchmarks, I get the following results:

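results.svg (bar chart of rows/second per string operation for vaex, Dask and pandas; log scale, larger is better)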

From the results we can see that Dask is 2-5x faster than pandas for these operations on my laptop, which is about what I'd expect given 4 physical cores and easily parallelizable operations. Vaex is still an order of magnitude faster than Dask(!!), which is impressive to see. Dask inherits pandas' memory model, so we still have the Python string representation (and the GIL along with it). When pandas string arrays move to Arrow's representation, Dask should speed up significantly here.

```python
# bench.py
import vaex
import numpy as np
import dask.dataframe as dd
import dask
import dask.distributed
import json
import os
import time
import argparse
import multiprocessing

default_filename = 'string_benchmark.hdf5'

parser = argparse.ArgumentParser('bench.py')
parser.add_argument('--number', "-n", dest="n", type=float, default=7,
                    help="log10 of the number of rows to use")
parser.add_argument('--partitions', type=int, default=multiprocessing.cpu_count() * 2,
                    help="number of partitions to split (default: 2x number cores)")
parser.add_argument('--npandas', dest="npandas", type=float, default=7,
                    help="log10 of the number of rows to use for pandas")
parser.add_argument('--filter', dest="filter", default=None,
                    help="filter for benchmark")
parser.add_argument('--filename', default=default_filename,
                    help='filename to use for benchmark export/reading')
parser.add_argument('--backend', default='vaex',
                    help='The backend to test {vaex, dask, pandas}')
args = parser.parse_args()

timings = {}


def mytimeit(expr, N, scope):
    """Run `expr` N times and return the wall-clock time of each run."""
    times = []
    for i in range(N):
        t0 = time.time()
        eval(expr, scope)
        times.append(time.time() - t0)
        if args.backend == 'dask':
            # Give time for dask's GC to run (outside the timed region)
            time.sleep(1.0)
    return times


def vaex_nop(df):
    df.nop()


def dask_nop(df):
    # We use `persist` here instead of `compute`. It is uncommon to call
    # `compute` on large dataframes in dask, since that will pull the large
    # results back to the client process (a potentially expensive process).
    # Rather we call `persist` to do all the operations but leave the data on
    # the workers. I believe this is a fairer comparison to vaex's `nop`.
    dask.distributed.wait(df.persist())


def pandas_nop(df):
    pass


if __name__ == '__main__':
    if not os.path.exists(args.filename):
        s = np.arange(0, int(10**args.n)).astype(str)
        df_vaex = vaex.from_arrays(x=s, s=s)
        print("Writing file")
        df_vaex.export(args.filename, progress=True, shuffle=True)
        del df_vaex

    df_vaex = vaex.open(args.filename)

    if args.backend == 'vaex':
        df = df_vaex
        df.executor.buffer_size = len(df) // args.partitions
        scope = {'df': df, 'nop': vaex_nop}
    elif args.backend == 'dask':
        # Start a local cluster with 1 thread per process (nprocesses = ncores
        # by default). Keep a reference so the client is not garbage collected.
        client = dask.distributed.Client(threads_per_worker=1)
        df_pandas = df_vaex.to_pandas_df()
        # Load the data on the cluster already, to be fair in comparison to vaex
        df = dd.from_pandas(df_pandas, npartitions=args.partitions).persist()
        del df_pandas
        scope = {'df': df, 'nop': dask_nop}
    elif args.backend == 'pandas':
        df = df_vaex.to_pandas_df()
        scope = {'df': df, 'nop': pandas_nop}
    else:
        raise ValueError("Unknown backend %s" % args.backend)

    del df_vaex

    def test(name, expr):
        if args.filter and args.filter not in name:
            return
        print(name)
        results = mytimeit('nop(%s)' % expr, 5, scope=scope)
        # Best of 5 runs, normalized to seconds per row
        t = min(results) / (10 ** args.n)
        timings[name] = t

    print("Benchmarking %s" % args.backend)
    test('capitalize', 'df.s.str.capitalize()')
    test('cat', 'df.s.str.cat(df.s)')
    test('contains', 'df.s.str.contains("9", regex=False)')
    test('contains(regex)', 'df.s.str.contains("9", regex=True)')
    test('count', 'df.s.str.count("9")')
    test('endswith', 'df.s.str.endswith("9")')
    test('find', 'df.s.str.find("4")')
    test('get', 'df.s.str.get(1)')
    test('split+join', 'df.s.str.split(".").str.join("-")')
    test('len', 'df.s.str.len()')
    test('ljust', 'df.s.str.ljust(10)')
    test('lower', 'df.s.str.lower()')
    test('lstrip', 'df.s.str.lstrip("9")')
    test('match', 'df.s.str.match("1.*")')
    test('pad', 'df.s.str.pad(10)')
    test('repeat', 'df.s.str.repeat(2)')
    test('replace(default)', 'df.s.str.replace("123", "321")')
    test('replace(no regex)', 'df.s.str.replace("123", "321", regex=False)')
    test('replace(regex)', 'df.s.str.replace("1?[45]4", "1004", regex=True)')
    test('rfind', 'df.s.str.rfind("4")')
    test('rjust', 'df.s.str.rjust(10)')
    test('rstrip', 'df.s.str.rstrip("9")')
    test('slice', 'df.s.str.slice(1, 3)')
    test('split', 'df.s.str.split(".")')
    test('startswith', 'df.s.str.startswith("9")')
    test('strip', 'df.s.str.strip("0")')  # issues?
    test('title', 'df.s.str.title()')
    test('upper', 'df.s.str.upper()')
    test('zfill', 'df.s.str.zfill(10)')

    fn = "%s.json" % args.backend
    with open(fn, "w") as f:
        json.dump(timings, f)
```
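`mytimeit` and `test` take the fastest of five runs and divide it by the row count, so each entry in the JSON output is seconds per row. The same measurement can be sketched with the standard library's `timeit.repeat` instead of a hand-rolled loop. This is an illustrative alternative, not what `bench.py` does: `best_time_per_row` is a hypothetical helper, it omits the one-second pause `mytimeit` adds for Dask, and `timeit` switches off Python's garbage collector while timing, which `mytimeit` does not.

```python
import timeit


def best_time_per_row(stmt, nrows, scope, repeat=5):
    """Fastest of `repeat` single runs of `stmt`, in seconds per row."""
    # number=1: each sample is one execution, like one iteration of mytimeit.
    times = timeit.repeat(stmt, repeat=repeat, number=1, globals=scope)
    return min(times) / nrows


# Hypothetical example: uppercase 100k short strings with plain Python.
nrows = 100_000
data = [str(i) for i in range(nrows)]
t = best_time_per_row("[x.upper() for x in data]", nrows, scope={"data": data})
print(f"{t:.2e} s/row, i.e. {1 / t:,.0f} rows/s")
```

Taking the minimum rather than the mean follows the usual advice for micro-benchmarks: slower runs mostly reflect interference from other processes, not the code being measured.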
```python
# plot_bench.py
import json

import matplotlib
# The `warn` argument was removed in matplotlib 3.3; `force=True` is enough.
matplotlib.use('cairo', force=True)
import pandas as pd


def load(backend):
    with open(backend + '.json') as f:
        return pd.DataFrame.from_dict(
            json.load(f), orient='index', columns=[backend]
        )


dask_times = load('dask')
pandas_times = load('pandas')
vaex_times = load('vaex')

times = pd.concat([vaex_times, dask_times, pandas_times], axis=1)

# The JSON files store seconds per row, so 1 / times is rows per second.
ax = (1 / times).plot.barh(
    logx=True,
    figsize=(10, 8),
    title="Rows/second (larger is better)",
    xlim=(10**6, 10**9),
)
ax.set_xlabel("rows/second")
ax.legend(loc="upper right")
fig = ax.get_figure()
fig.savefig('results.svg')
```
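The plot shows absolute throughput. To check ratios like the 2-5x Dask-vs-pandas figure quoted in the results discussion directly, you can divide the baseline's seconds-per-row by each backend's. This is a sketch assuming the `{backend}.json` files written by `bench.py`; `speedup_vs` is a hypothetical helper, and this `load` returns a Series rather than the one-column DataFrame used in `plot_bench.py`.

```python
import json

import pandas as pd


def load(backend):
    """Read {operation: seconds_per_row} as written by bench.py."""
    with open(backend + ".json") as f:
        return pd.Series(json.load(f), name=backend)


def speedup_vs(times, baseline="pandas"):
    """Baseline time / backend time per operation (> 1 means faster than baseline)."""
    # rdiv computes other / self; axis=0 aligns the baseline Series on the row index.
    return times.rdiv(times[baseline], axis=0)
```

With the three JSON files present, `speedup_vs(pd.concat([load(b) for b in ("vaex", "dask", "pandas")], axis=1))` gives one row per operation and one column per backend, with the pandas column equal to 1.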