Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save karenyyng/6cf6adde9e5638f329cbc5722fb628e5 to your computer and use it in GitHub Desktop.
Save karenyyng/6cf6adde9e5638f329cbc5722fb628e5 to your computer and use it in GitHub Desktop.
Parquet multithreaded benchmarks
import gc
import os
import time
import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import snappy
def generate_floats(n, pct_null, repeats=1):
    """Generate ``n`` random floats in which a fraction of values is NaN.

    Distinct values are drawn with ``np.random.randn``, a ``pct_null``
    fraction of them is overwritten with NaN, and every value is then
    repeated ``repeats`` times in a row.

    Parameters
    ----------
    n : int or float
        Number of values to return; truncated to an integer.
    pct_null : float
        Fraction of the distinct values that is replaced by NaN.
    repeats : int
        Run length of each distinct value.

    Returns
    -------
    numpy.ndarray
        One-dimensional array holding exactly ``int(n)`` values.
    """
    n = int(n)
    # Ceiling division: draw enough distinct values to cover ``n`` after
    # repeating.  Plain floor division silently dropped the trailing
    # ``n % repeats`` values, so the result was shorter than requested.
    nunique = -(-n // repeats)
    unique_values = np.random.randn(nunique)
    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan
    # Trim the last (possibly partial) run so the length is exactly ``n``.
    return unique_values.repeat(repeats)[:n]
# Registry mapping a dtype name to the function that generates one column
# of that dtype.
DATA_GENERATORS = {'float64': generate_floats}
def generate_data(nrows, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    """Build a DataFrame of ``ncols`` independently generated columns.

    Columns are named ``c0``, ``c1``, ... and each one is produced by the
    generator registered under ``dtype`` in ``DATA_GENERATORS``, called as
    ``generator(nrows, pct_null, repeats)``.

    Parameters
    ----------
    nrows : int
        Number of rows requested from the generator for every column.
    ncols : int
        Number of columns to create.
    pct_null : float
        Passed through to the generator.
    repeats : int
        Passed through to the generator.
    dtype : str
        Key into ``DATA_GENERATORS`` selecting the generator.

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    KeyError
        If ``dtype`` has no entry in ``DATA_GENERATORS``.
    """
    # The generator is looked up once; the former unused ``type_`` local
    # (hard-coded to float64 regardless of ``dtype``) has been removed.
    datagen_func = DATA_GENERATORS[dtype]
    return pd.DataFrame({
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    })
def write_to_parquet(df, out_path, use_dictionary=True,
                     compression='SNAPPY'):
    """Convert ``df`` to an Arrow table and write it to ``out_path``.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to write.
    out_path : str
        Destination handed to ``pq.write_table``.
    use_dictionary : bool
        Forwarded to ``pq.write_table``.
    compression : str or None
        Codec name forwarded to ``pq.write_table``.  ``'uncompressed'``
        (any letter case) is translated to ``None`` before the call, and
        ``None`` itself is accepted and forwarded unchanged.
    """
    arrow_table = pa.Table.from_pandas(df)
    # Guard against ``None`` so callers may pass it directly instead of the
    # 'UNCOMPRESSED' spelling; previously ``None.lower()`` raised here.
    if compression is not None and compression.lower() == 'uncompressed':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=use_dictionary,
                   compression=compression)
def read_pyarrow(path, nthreads=1):
    """Read the Parquet file at ``path`` and convert it with ``to_pandas()``.

    ``nthreads`` is forwarded unchanged to ``pq.read_table``.
    """
    # NOTE(review): newer pyarrow releases appear to have replaced the
    # ``nthreads`` keyword with ``use_threads`` -- confirm against the
    # installed pyarrow version before upgrading.
    table = pq.read_table(path, nthreads=nthreads)
    return table.to_pandas()
def get_timing(f, path, niter):
    """Return the total seconds spent on ``niter`` calls of ``f(path)``.

    Parameters
    ----------
    f : callable
        Function under test; called with ``path`` as its only argument.
    path : object
        Argument handed to ``f`` on every call.
    niter : int
        Number of consecutive calls to time.

    Returns
    -------
    float
        Elapsed time in seconds for all ``niter`` calls together (not the
        per-call mean).
    """
    # ``time.perf_counter`` is a monotonic, high-resolution clock that is
    # available on every platform; ``time.clock_gettime`` exists on Unix
    # only and made the benchmark fail elsewhere.
    start = time.perf_counter()
    for _ in range(niter):
        f(path)
    return time.perf_counter() - start
# Size of each generated dataset: DATA_SIZE bytes of float64 values split
# evenly over NCOLS columns.
MEGABYTE = 1 << 20
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
# Floor division keeps the row count an integer; true division produced a
# float under Python 3.
NROWS = DATA_SIZE // NCOLS // np.dtype('float64').itemsize
# Benchmark scenarios, keyed by name.  The "low_entropy" scenarios repeat
# every value 1000 times, the "high_entropy" ones do not repeat at all, and
# the "_dict" variants enable dictionary encoding.  pct_null is 0.1 in all
# of them.
cases = {
    name: {'pct_null': 0.1, 'repeats': repeats, 'use_dictionary': use_dict}
    for name, repeats, use_dict in (
        ('low_entropy_dict', 1000, True),
        ('low_entropy', 1000, False),
        ('high_entropy_dict', 1, True),
        ('high_entropy', 1, False),
    )
}
# Number of timed calls per (file, reader) pair; the timing loop divides
# the total by this to report a mean.
NITER = 5

# Codecs to benchmark ('GZIP' is currently left out).
COMPRESSIONS = ['UNCOMPRESSED', 'SNAPPY']

# (label, callable) pairs; each callable takes the path of the file to read.
readers = [
    ('pyarrow', lambda path: read_pyarrow(path)),
    ('pyarrow 2 threads', lambda path: read_pyarrow(path, nthreads=2)),
    ('pyarrow 4 threads', lambda path: read_pyarrow(path, nthreads=4)),
]

# Accumulates one (case, compression, reader_name, seconds) tuple per run.
results = []

# Maps (case, compression) to the path of the generated Parquet file.
case_files = {}
# Generate the dataset for every case and write it once per codec,
# remembering where each file went.
for case, params in cases.items():
    # Build the data once per case: it does not depend on the codec, so
    # regenerating it inside the inner loop doubled the generation work and
    # gave each codec different random data.  The configured pct_null is
    # now passed through instead of relying on the function default.
    df = generate_data(NROWS, NCOLS, pct_null=params['pct_null'],
                       repeats=params['repeats'])
    for compression in COMPRESSIONS:
        path = '{0}_{1}.parquet'.format(case, compression or 'UNCOMPRESSED')
        write_to_parquet(df, path, compression=compression,
                         use_dictionary=params['use_dictionary'])
        case_files[case, compression] = path
    # Drop the reference so the frame can be reclaimed before the next case.
    df = None
# Time every reader against every generated file and collect the results.
for case in cases:
    for codec in COMPRESSIONS:
        path = case_files[case, codec]
        # Uncompressed files are reported with None in the codec slot.
        reported_codec = None if codec == 'UNCOMPRESSED' else codec
        # prime the file cache
        for _ in range(2):
            read_pyarrow(path)
        for reader_name, f in readers:
            # Mean seconds per read over NITER consecutive calls.
            mean_elapsed = get_timing(f, path, NITER) / NITER
            result = (case, reported_codec, reader_name, mean_elapsed)
            print(result)
            results.append(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment