Last active
July 9, 2019 22:14
-
-
Save benfasoli/974392f21e7641ce3bcc951359b69f2a to your computer and use it in GitHub Desktop.
Split-apply-combine strategy for data mining in parallel with Pandas DataFrames
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from multiprocessing import cpu_count, Pool | |
import pandas as pd | |
def parallel_apply(df, fun, n_chunks: int = None): | |
"""Apply function to batches of dataframe rows | |
Parameters | |
---------- | |
df : pd.DataFrame | |
Source DataFrame which is split into n_chunks and passed as the first | |
argument to fun | |
fun : function | |
Function that returns a pandas DataFrame or Series | |
n_chunks : int | |
Number of parallel processes to use. If None, defaults to the number of | |
cpus available | |
""" | |
n_chunks = n_chunks or cpu_count() | |
n_rows = len(df) | |
chunk_size = n_rows / n_chunks | |
n_rows_per_chunk = int(chunk_size) + 1 if n_chunks % chunk_size else int(chunk_size) | |
breaks = [x * n_rows_per_chunk for x in range(n_chunks)] + [n_rows] | |
df_chunks = [df.iloc[breaks[i]:breaks[i+1], ] for i in range(n_chunks)] | |
pool = Pool(n_chunks) | |
res = pool.map(fun, df_chunks) | |
pool.close() | |
pool.join() | |
return pd.concat(res) | |
def fun(chunk):
    """Example worker for ``parallel_apply``.

    Increments every value in the ``x`` column by one, modifying the
    chunk in place, and returns the same chunk object.
    """
    chunk['x'] = chunk['x'] + 1
    return chunk
# Demo: run only when executed as a script. The main guard is required
# because multiprocessing's 'spawn' start method (default on Windows and
# macOS) re-imports this module in each worker; without the guard the
# pool-creating code would re-execute recursively in every worker.
if __name__ == '__main__':
    df = pd.DataFrame(
        [
            {'x': 1, 'y': 1},
            {'x': 3, 'y': 3},
            {'x': 4, 'y': 4},
        ],
        index=['a', 'b', 'c'],
    )
    print(df)
    #    x  y
    # a  1  1
    # b  3  3
    # c  4  4

    result = parallel_apply(df, fun)
    print(result)
    #    x  y
    # a  2  1
    # b  4  3
    # c  5  4
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment