Last active
June 10, 2020 22:43
-
-
Save alexrudy/5ad8af689f405596d80c4871e91a58a4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
PPA - Progress Parallel Apply

A quick tool for parallel apply in Python using multiprocessing.
""" | |
import functools | |
import multiprocessing as mp | |
from typing import Dict, Iterable, NamedTuple, Optional, Tuple, Union | |
import numpy as np | |
import pandas as pd | |
from pandas.io.parquet import to_parquet | |
from tqdm import tqdm | |
# Either pandas container type accepted by the parallel-apply helpers.
SeriesLike = Union[pd.DataFrame, pd.Series]


def papply_worker(series: SeriesLike, func, *args, **kwargs):
    """
    Apply ``func`` to a single chunk of data via the chunk's ``.apply`` method.

    Parameters
    ----------
    series: pd.Series or pd.DataFrame
        The chunk to process.
    func: callable
        Function handed to ``series.apply``.
    *args
        For a Series, extra positional arguments for ``func``. For a
        DataFrame they are forwarded positionally to ``DataFrame.apply``
        unchanged (so the first one is interpreted as ``axis``).
    **kwargs
        Keyword arguments forwarded to ``series.apply``.

    Returns
    -------
    Whatever ``series.apply`` returns for this chunk.
    """
    if args and isinstance(series, pd.Series):
        # Series.apply's positional slots after `func` are its own options
        # (not arguments of `func`), so positionals meant for `func` must be
        # passed through the `args=` keyword.
        return series.apply(func, args=args, **kwargs)
    return series.apply(func, *args, **kwargs)
def _pcore(parts, func, args, kwargs, progress=True, ncores=None, process=pd.concat, total=None):
    """
    Core implementation of parallel apply using a multiprocessing pool.

    Parameters
    ----------
    parts: iterable
        Chunks of data; each one is submitted to the pool as a separate
        ``papply_worker`` task.
    func: callable
        Function applied to every chunk by ``papply_worker``.
    args: tuple
        Extra positional arguments passed to ``papply_worker`` after ``func``.
    kwargs: dict
        Keyword arguments passed to ``papply_worker``.
    progress: bool
        Whether to show a tqdm progress bar.
    ncores: int, optional
        Size of the process pool; defaults to ``mp.cpu_count()``.
    process: callable
        Combines the per-chunk results into the return value.
    total: int, optional
        Progress bar total; defaults to the summed length of all chunks.

    Returns
    -------
    The value of ``process`` called on the per-chunk results, in the same
    order as ``parts``.
    """
    # `parts` is walked twice (sizing, then task submission). Materialize it
    # so a one-shot iterator is not drained by the sizing pass.
    parts = list(parts)
    if total is None:
        total = sum(map(len, parts))
    if ncores is None:
        ncores = mp.cpu_count()

    with tqdm(total=total, disable=not progress) as pbar, mp.Pool(ncores) as pool:

        def pbarupdate(chunk):
            # Called with a task's result; advance the bar by the size of
            # that result.
            pbar.update(n=len(chunk))

        futures = [
            pool.apply_async(papply_worker, (part, func, *args), kwargs, callback=pbarupdate)
            for part in parts
        ]
        # Results are gathered while the pool is still open; leaving the
        # `with` block shuts the pool down.
        result = process(future.get() for future in futures)
    return result
class ApplyParameters(NamedTuple):
    """
    Settings for one parallel-apply call, separated from the keyword
    arguments that are forwarded on with the user's function.
    """

    # Number of worker processes.
    ncores: Optional[int]
    # Number of chunks the input was split into.
    nchunks: Optional[int]
    # The chunks themselves.
    parts: Iterable["SeriesLike"]
    # Whether to display a progress bar.
    progress: bool
    # Positional arguments to forward, as received.
    args: Tuple
    # Keyword arguments left over after the control options are removed.
    kwargs: Dict

    @classmethod
    def build(cls, series, args, kwargs):
        """
        Build parameters from the raw ``*args`` / ``**kwargs`` of a call.

        The control options ``ncores``, ``nchunks`` and ``progress`` are
        taken out of ``kwargs``; everything else is kept for forwarding.
        ``ncores`` defaults to ``mp.cpu_count()`` and ``nchunks`` defaults to
        ``ncores``; an explicit ``None`` for either means "use the default".

        Parameters
        ----------
        series: pd.Series or pd.DataFrame
            Data to split into chunks along its first axis.
        args: tuple
            Positional arguments to forward.
        kwargs: dict
            Keyword arguments, possibly including the control options.
            The dictionary passed in is not modified.

        Raises
        ------
        ValueError
            If ``nchunks`` is an integer smaller than one.
        """
        # Work on a copy so the caller's dictionary keeps its control keys.
        kwargs = dict(kwargs)

        ncores = kwargs.pop("ncores", None)
        if ncores is None:
            ncores = mp.cpu_count()
        nchunks = kwargs.pop("nchunks", None)
        if nchunks is None:
            nchunks = ncores
        progress = kwargs.pop("progress", True)

        if hasattr(series, "iloc") and isinstance(nchunks, (int, np.integer)):
            # Positional row slicing with the same chunk sizes as
            # np.array_split (the first `extra` chunks get one more row),
            # without routing the pandas object through numpy's swapaxes.
            if nchunks <= 0:
                raise ValueError("number sections must be larger than 0.")
            base, extra = divmod(len(series), nchunks)
            parts = []
            start = 0
            for index in range(nchunks):
                stop = start + base + (1 if index < extra else 0)
                parts.append(series.iloc[start:stop])
                start = stop
        else:
            # Non-pandas input or explicit split points: defer to numpy.
            parts = np.array_split(series, nchunks)

        return cls(ncores, nchunks, parts, progress, args, kwargs)
def papply(series, func, *args, **kwargs):
    """
    Run ``series.apply(func)`` in parallel, chunk by chunk.

    The input is split into chunks and the chunks are processed by a
    multiprocessing pool; the per-chunk results are joined back together
    with ``pd.concat``.

    Parameters
    ----------
    series: pd.Series
        Target pandas series for apply method.
    func: callable
        Function for use in the apply loop.
    ncores: int
        Number of cores across which to distribute work.
    nchunks: int
        Number of chunks to split array into, defaults to `ncores`.
    progress: bool
        Whether to show a progress bar using tqdm.

    Any remaining arguments and keyword arguments are passed to `func`
    """
    # Separate the control options from what is forwarded to the apply call.
    ncores, _nchunks, chunks, show_progress, fwd_args, fwd_kwargs = ApplyParameters.build(
        series, args, kwargs
    )
    return _pcore(
        chunks,
        func,
        fwd_args,
        fwd_kwargs,
        progress=show_progress,
        ncores=ncores,
        process=pd.concat,
        total=None,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment