Last active
February 6, 2024 00:43
-
-
Save dmyersturnbull/dff552ad719943b1362d005eb86744ec to your computer and use it in GitHub Desktop.
Performs a Pandas groupby operation in parallel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools
import multiprocessing
import time
from typing import Any, Callable, Tuple, Union

import pandas as pd
def _run_and_report(func, queue, name, group):
    """Run ``func(name, group)`` in a worker and record one completion on ``queue``.

    Defined at module level so ``multiprocessing`` can pickle it by reference.
    """
    result = func(name, group)
    # One token per finished group; the parent only reads queue.qsize() for progress.
    queue.put(None)
    return result


def groupby_parallel(
    groupby_df: pd.core.groupby.DataFrameGroupBy,
    func: Callable[[Any, pd.DataFrame], Union[pd.DataFrame, pd.Series]],
    num_cpus: int = multiprocessing.cpu_count() - 1,
    log_fn: Callable[[str], Any] = print,
    wait_sec: float = 0.4,
) -> pd.DataFrame:
    """
    Performs a Pandas groupby operation in parallel.

    Each group is sent to a worker process, where ``func(name, group)`` is
    called. The per-group results are concatenated in group order with
    ``pd.concat``. Progress is reported through ``log_fn`` roughly every
    ``wait_sec`` seconds.

    Authors: Tamas Nagy and Douglas Myers-Turnbull

    Args:
        groupby_df: The grouped frame, e.g. ``df.groupby("A")``.
        func: Called as ``func(name, group)`` in a worker process. It must be
            picklable (a module-level function or builtin, not a lambda or
            closure) and should return a DataFrame or Series.
        num_cpus: Number of worker processes. Values below 1 are raised to 1;
            the default is 0 on a single-core machine.
        log_fn: Receives progress messages. When it is ``print``, the progress
            line is redrawn in place using a carriage-return line ending;
            any other callable receives only the message string.
        wait_sec: Seconds to sleep between progress updates.

    Returns:
        The concatenation of all per-group results, or an empty DataFrame
        when ``groupby_df`` has no groups.

    Raises:
        Any exception raised by ``func`` in a worker is re-raised here.

    Example:
        import pandas as pd

        def total_b(name, group):
            return group[["B"]].sum().to_frame(name).T

        df = pd.DataFrame({"A": [0, 0, 1], "B": [100, 200, 300]})
        groupby_parallel(df.groupby("A"), total_b)  # one row per key, column "B"
    """
    total = len(groupby_df)
    if total == 0:
        # pd.concat([]) raises, and progress would divide by zero.
        return pd.DataFrame()
    num_cpus = max(1, num_cpus)
    # Only print() understands ``end=``; other log callables get just the message.
    progress_kwargs = {"end": "\r"} if log_fn is print else {}
    start = time.monotonic()
    log_fn(f"\nUsing {num_cpus} CPUs in parallel...")
    # The Manager is entered first, so it is shut down after the pool on exit.
    with multiprocessing.Manager() as manager, multiprocessing.Pool(num_cpus) as pool:
        queue = manager.Queue()
        tasks = [(func, queue, name, group) for name, group in groupby_df]
        result = pool.starmap_async(_run_and_report, tasks)
        cycler = itertools.cycle("\\|/―")
        while not result.ready():
            log_fn(
                f"Percent complete: {queue.qsize() / total:.0%} {next(cycler)}",
                **progress_kwargs,
            )
            time.sleep(wait_sec)
        got = result.get()
    log_fn(f"\nProcessed {len(got)} groups in {time.monotonic() - start:.1f}s")
    return pd.concat(got)
Can you share how to pass additional arguments through to func? Thanks
@Jianpeng-Xu You can wrap it in another function.
def x(dfgroup, order: int):
return dfgroup.mean() + order
groupby_parallel(df, lambda group: x(group, 3))
Or, use functools.partial.
@Jianpeng-Xu You can wrap it in another function.
def x(dfgroup, order: int): return dfgroup.mean() + order
groupby_parallel(df, lambda group: x(group, 3))
Or, use functools.partial.
Thanks for the comments. I just added a kwargs dict into the function definition to make it generic.
The example does not work this way, right?
Instead of
df.groupby(df.groupby('A'), lambda row: row['B'].sum())
it should be
groupby_parallel(df.groupby('A'), lambda row: row['B'].sum())
The example does not work this way, right? Instead of
df.groupby(df.groupby('A'), lambda row: row['B'].sum()) it should be
groupby_parallel(df.groupby('A'), lambda row: row['B'].sum())
@ReneHamburger1993 Fixed. Thanks!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Unless I'm misunderstanding something, the example in the docstring doesn't use groupby_parallel.