Last active June 15, 2023 20:44
-
-
Save govorunov/3d1a214dc067b7f9df54d481f46ffb68 to your computer and use it in GitHub Desktop.
Parallelize Pandas apply over multiple workers using Joblib.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from joblib import Parallel, delayed, effective_n_jobs | |
import pandas as pd | |
def gen_even_slices(n, n_packs, *, n_samples=None):
    """Split the range ``[0, n)`` into ``n_packs`` contiguous, near-equal slices.

    The first ``n % n_packs`` slices get one extra element. Packs that would
    be empty (when ``n < n_packs``) are not yielded.

    Parameters
    ----------
    n : int
        Total number of elements to split.
    n_packs : int
        Number of slices to generate; must be >= 1.
    n_samples : int, default=None
        If given, every slice end is clamped to ``n_samples``. Pass it when
        the slices index a sparse matrix, where slicing off the end raises,
        unlike NumPy arrays.

    Yields
    ------
    slice
        ``slice(start, stop, None)`` objects covering ``[0, n)`` in order.

    Raises
    ------
    ValueError
        If ``n_packs < 1`` (raised when the generator is first advanced).

    Examples
    --------
    >>> list(gen_even_slices(10, 1))
    [slice(0, 10, None)]
    >>> list(gen_even_slices(10, 5))
    [slice(0, 2, None), slice(2, 4, None), slice(4, 6, None), slice(6, 8, None), slice(8, 10, None)]
    >>> list(gen_even_slices(10, 3))
    [slice(0, 4, None), slice(4, 7, None), slice(7, 10, None)]
    """
    # Validate before any division so n_packs == 0 yields ValueError, not
    # ZeroDivisionError.
    if n_packs < 1:
        raise ValueError("gen_even_slices got n_packs=%s, must be >=1" % n_packs)
    base_size, remainder = divmod(n, n_packs)
    cursor = 0
    for index in range(n_packs):
        # The leading `remainder` packs absorb the leftover elements.
        size = base_size + 1 if index < remainder else base_size
        if size <= 0:
            continue
        stop = cursor + size
        if n_samples is not None:
            stop = min(n_samples, stop)
        yield slice(cursor, stop, None)
        cursor = stop
def parallel_apply(df, func, n_jobs=-1, **kwargs):
    """Pandas ``apply`` in parallel using joblib.

    The rows of ``df`` are split into contiguous, near-equal chunks (at most
    one per worker) with ``df.iloc``. ``type(df).apply`` runs on each chunk in
    a joblib worker, and the partial results are joined with ``pd.concat``.

    Caveat: because chunking is by row, the result matches a serial ``apply``
    only when ``func`` is applied row by row, e.g. ``Series.apply`` or
    ``DataFrame.apply`` with ``axis=1``. With the DataFrame default
    ``axis=0``, each chunk's columns are processed separately. The per-chunk
    results are then stacked rather than combined.

    Args:
        df: Pandas DataFrame, Series, or any other object that supports
            ``len``, ``.iloc`` slicing and ``.apply``.
        func: Callable to apply.
        n_jobs: Desired number of workers (joblib convention). Default value
            -1 means use all available cores.
        **kwargs: Any additional parameters are passed unchanged to the apply
            function (e.g. ``axis=1``).

    Returns:
        Same as for normal Pandas ``DataFrame.apply()`` (subject to the
        caveat above).
    """
    n_workers = effective_n_jobs(n_jobs)
    # There is no point starting more workers than there are rows to hand out.
    # With 0 or 1 chunk, run serially. This also handles an empty ``df``: the
    # parallel path would produce no chunks, and pd.concat([]) raises
    # ValueError.
    n_chunks = min(n_workers, len(df))
    if n_chunks <= 1:
        return df.apply(func, **kwargs)
    results = Parallel(n_jobs=n_chunks)(
        delayed(type(df).apply)(df.iloc[chunk], func, **kwargs)
        for chunk in gen_even_slices(len(df), n_chunks)
    )
    return pd.concat(results)
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.