Last active
February 6, 2024 00:43
-
-
Save dmyersturnbull/dff552ad719943b1362d005eb86744ec to your computer and use it in GitHub Desktop.
Performs a Pandas groupby operation in parallel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import itertools | |
import time | |
import multiprocessing | |
from typing import Callable, Tuple, Union | |
def groupby_parallel(
    groupby_df: pd.core.groupby.DataFrameGroupBy,
    func: Callable[[object, pd.DataFrame], Union[pd.DataFrame, pd.Series]],
    num_cpus: int = max(1, multiprocessing.cpu_count() - 1),
    log_fn: Callable[[str], object] = print,
    wait_sec: float = 0.4,
) -> pd.DataFrame:
    """
    Performs a Pandas groupby operation in parallel.

    Each ``(name, group)`` pair produced by iterating ``groupby_df`` is sent to a
    worker process, where ``func(name, group)`` is called. The per-group results
    are concatenated with ``pd.concat`` in the same order as the groups.

    Authors: Tamas Nagy and Douglas Myers-Turnbull

    Args:
        groupby_df: The grouped DataFrame to process.
        func: Called as ``func(name, group)`` in a worker process. It must return
            a DataFrame or Series. It is pickled to reach the workers, so it must
            be a module-level function (lambdas and nested functions fail).
        num_cpus: Number of worker processes (default: one fewer than the CPU
            count, but never less than 1).
        log_fn: Receives progress and timing messages. When it is ``print``,
            the progress line is redrawn in place using a carriage return.
        wait_sec: Seconds to sleep between progress updates.

    Returns:
        The concatenated per-group results, or an empty DataFrame when
        ``groupby_df`` has no groups.

    Raises:
        Any exception raised by ``func`` in a worker is re-raised here.

    Example:
        import pandas as pd

        def add_total(name, group):
            return group.assign(B_total=group["B"].sum())

        if __name__ == "__main__":
            df = pd.DataFrame({"A": [0, 1], "B": [100, 200]})
            groupby_parallel(df.groupby("A"), add_total)
    """
    start = time.time()
    tasks = list(groupby_df)  # [(name, group), ...] in group order
    total = len(tasks)
    if total == 0:
        # Nothing to parallelize, and pd.concat([]) would raise ValueError.
        return pd.DataFrame()

    def report(message: str) -> None:
        # print accepts end="\r" to overwrite the progress line; an arbitrary
        # log_fn is only promised to accept a single string.
        if log_fn is print:
            print(message, end="\r")
        else:
            log_fn(message)

    log_fn(f"\nUsing {num_cpus} CPUs in parallel...")
    with multiprocessing.Pool(num_cpus) as pool:
        # One async task per group so completed groups can be counted for the
        # progress display.
        pending = [pool.apply_async(func, task) for task in tasks]
        spinner = itertools.cycle(r"\|/―")
        while True:
            finished = [res for res in pending if res.ready()]
            for res in finished:
                if not res.successful():
                    res.get()  # re-raises the worker's exception; fail fast
            if len(finished) == total:
                break
            report(f"Percent complete: {len(finished) / total:.0%} {next(spinner)}")
            time.sleep(wait_sec)
        # Collect in submission order so the output matches the group order.
        got = [res.get() for res in pending]
    log_fn(f"\nProcessed {len(got)} groups in {time.time() - start:.1f}s")
    return pd.concat(got)
The example does not work this way, right?
Instead of
df.groupby(df.groupby('A'), lambda row: row['B'].sum())
it should be
groupby_parallel(df.groupby('A'), lambda row: row['B'].sum())
The example does not work this way, right? Instead of
df.groupby(df.groupby('A'), lambda row: row['B'].sum())
it should be
groupby_parallel(df.groupby('A'), lambda row: row['B'].sum())
@ReneHamburger1993 Fixed. Thanks!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for the comments. I just added a kwargs dict to the function definition to make it generic.