Skip to content

Instantly share code, notes, and snippets.

@alexrudy
Last active June 10, 2020 22:43
Show Gist options
  • Save alexrudy/5ad8af689f405596d80c4871e91a58a4 to your computer and use it in GitHub Desktop.
Save alexrudy/5ad8af689f405596d80c4871e91a58a4 to your computer and use it in GitHub Desktop.
"""
PPA - Progress Parallel Apply
A quick tool for running pandas `apply` in parallel using multiprocessing, with a tqdm progress bar.
"""
import functools
import multiprocessing as mp
from typing import Dict, Iterable, NamedTuple, Optional, Tuple, Union
import numpy as np
import pandas as pd
from pandas.io.parquet import to_parquet
from tqdm import tqdm
SeriesLike = Union[pd.DataFrame, pd.Series]


def papply_worker(series: SeriesLike, func, *args, **kwargs):
    """
    Apply `func` to one chunk; this is the task executed for each chunk.

    Parameters
    ----------
    series: pd.Series or pd.DataFrame
        The chunk to process.
    func: callable
        Function handed to `series.apply`.
    *args
        Extra positional arguments for `func`. They are forwarded through
        the `args=` keyword of `.apply`; passed positionally they would bind
        to `.apply`'s own parameters (e.g. `convert_dtype` or `axis`) and
        never reach `func`.
    **kwargs
        Keyword arguments forwarded to `series.apply`. An explicit `args=`
        tuple is still honoured and is appended after `*args`.

    Returns
    -------
    Whatever `series.apply` returns for this chunk.
    """
    # Keep supporting callers that already spell the extras as `args=(...)`
    # instead of failing with a duplicate-keyword error.
    explicit = tuple(kwargs.pop("args", ()))
    return series.apply(func, args=args + explicit, **kwargs)
def _pcore(parts, func, args, kwargs, progress=True, ncores=None, process=pd.concat, total=None):
    """
    Core implementation of parallel apply using a multiprocessing pool.

    Parameters
    ----------
    parts: iterable of pd.Series or pd.DataFrame
        Chunks to process; one pool task is submitted per chunk.
    func: callable
        Function handed to `papply_worker` for every chunk.
    args: tuple
        Extra positional arguments forwarded to `papply_worker`.
    kwargs: dict
        Keyword arguments forwarded to `papply_worker`.
    progress: bool
        Whether to show a tqdm progress bar.
    ncores: int, optional
        Number of worker processes, defaults to `mp.cpu_count()`.
    process: callable
        Combines the per-chunk results (yielded in submission order) into
        the return value.
    total: int, optional
        Progress bar total, defaults to the summed length of `parts`.

    Returns
    -------
    The value returned by `process`.
    """
    # Materialise first: `parts` is walked twice (sizing the progress bar,
    # then submitting tasks), and a one-shot iterator would be exhausted by
    # the first pass, leaving nothing to submit.
    parts = list(parts)
    if total is None:
        total = sum(len(part) for part in parts)
    if ncores is None:
        ncores = mp.cpu_count()

    with tqdm(total=total, disable=not progress) as pbar, mp.Pool(ncores) as pool:

        def advance(chunk_result):
            # Pool callback: receives a task's result once that task is done.
            pbar.update(n=len(chunk_result))

        futures = [
            pool.apply_async(papply_worker, (part, func, *args), kwargs, callback=advance)
            for part in parts
        ]
        # Collect inside the `with` so the pool is still alive while waiting.
        result = process(future.get() for future in futures)
    return result
class ApplyParameters(NamedTuple):
    """
    Options for one parallel apply call, separated from the arguments that
    are left over for the apply call itself.
    """

    ncores: Optional[int]  # number of worker processes
    nchunks: Optional[int]  # number of pieces the input is split into
    parts: Iterable["SeriesLike"]  # the pieces themselves, in original order
    progress: bool  # whether to show a progress bar
    args: Tuple  # positional arguments left for the apply call
    kwargs: Dict  # keyword arguments left once the control options are removed

    @classmethod
    def build(cls, series, args, kwargs):
        """
        Split `series` into chunks and pull the control options out of `kwargs`.

        The `ncores`, `nchunks` and `progress` keys are popped from `kwargs`
        in place; the same dict object, now holding only the leftovers, is
        stored on the result.

        Parameters
        ----------
        series: pd.Series or pd.DataFrame
            Data to split; chunks are taken positionally with `.iloc`.
        args: tuple
            Positional arguments, stored unchanged.
        kwargs: dict
            Keyword arguments, possibly containing the control options.

        Raises
        ------
        ValueError
            If `nchunks` is smaller than one.
        """
        ncores = kwargs.pop("ncores", None)
        if ncores is None:
            ncores = mp.cpu_count()
        nchunks = kwargs.pop("nchunks", None)
        if nchunks is None:
            nchunks = ncores
        progress = kwargs.pop("progress", True)

        sections = int(nchunks)
        if sections < 1:
            raise ValueError(f"nchunks must be at least 1, got {nchunks!r}")

        # Slice with .iloc rather than np.array_split: on pandas objects the
        # latter goes through the deprecated `swapaxes`. Chunk sizes follow
        # array_split's rule: the first `extra` chunks get one more row.
        size, extra = divmod(len(series), sections)
        parts = []
        start = 0
        for index in range(sections):
            stop = start + size + (1 if index < extra else 0)
            parts.append(series.iloc[start:stop])
            start = stop
        return cls(ncores, nchunks, parts, progress, args, kwargs)
def papply(series, func, *args, **kwargs):
    """
    Run `series.apply` in parallel across several processes.

    The input is cut into chunks, every chunk is processed as a task in a
    multiprocessing pool, and the per-chunk results are joined back
    together with `pd.concat`.

    Parameters
    ----------
    series: pd.Series
        Data to run the apply over.
    func: callable
        Function used by the apply call for every chunk.
    ncores: int
        How many cores to spread the work over.
    nchunks: int
        How many chunks to cut the input into, defaults to `ncores`.
    progress: bool
        Show a tqdm progress bar while chunks complete.

    All other positional and keyword arguments are handed on to the apply
    call made for each chunk.
    """
    settings = ApplyParameters.build(series, args, kwargs)
    # Field order of the named tuple; the chunk count itself is not needed here.
    ncores, _nchunks, chunks, show_progress, extra_args, extra_kwargs = settings
    return _pcore(
        chunks,
        func,
        extra_args,
        extra_kwargs,
        ncores=ncores,
        progress=show_progress,
        total=None,
        process=pd.concat,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment