Created December 14, 2018 12:07
-
-
Save snakers4/24cf04786224e22325f109ed15a6ac59 to your computer and use it in GitHub Desktop.
Pandas multiprocessing wrappers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from tqdm import tqdm | |
import numpy as np | |
import pandas as pd | |
from multiprocessing import Pool | |
def _apply_df(args): | |
df, func, num, kwargs = args | |
return num, df.apply(func, **kwargs) | |
def apply_by_multiprocessing(df, func, **kwargs):
    """Apply ``func`` to ``df`` in parallel by splitting it into contiguous chunks.

    ``df`` (a DataFrame or Series) is cut positionally into up to ``chunks``
    pieces. Each piece is processed in a worker process with
    ``piece.apply(func, **kwargs)``, and the partial results are concatenated
    back in their original order with ``pd.concat(..., sort=False)``.

    Keyword arguments (popped, not forwarded to ``apply``):
        workers: number of worker processes. ``None`` (the default) lets
            ``multiprocessing.Pool`` use ``os.cpu_count()``.
        chunks: number of pieces to split ``df`` into. Defaults to
            ``workers``, or the CPU count when ``workers`` is not given.

    All remaining keyword arguments are forwarded to ``apply`` (e.g. ``axis=1``).
    ``func`` is sent to worker processes, so it must be picklable
    (for example a module-level function rather than a lambda).
    """
    import os

    workers = kwargs.pop('workers', None)
    chunks = kwargs.pop('chunks', None)
    if chunks is None:
        chunks = workers or os.cpu_count() or 1

    if len(df) == 0:
        # Nothing to parallelise; a single apply avoids concatenating empty pieces.
        return df.apply(func, **kwargs)

    # Split row *positions* rather than the frame itself: calling
    # np.array_split directly on a DataFrame goes through the deprecated
    # DataFrame.swapaxes in recent pandas. Empty pieces (chunks > len(df))
    # are dropped so they cannot leak into the concatenation.
    piece_positions = [
        pos for pos in np.array_split(np.arange(len(df)), chunks) if len(pos)
    ]
    apply_lst = [(df.iloc[pos], func, i, kwargs)
                 for i, pos in enumerate(piece_positions)]

    with Pool(workers) as pool:
        # Pool.imap yields results in submission order, so no re-sorting is needed.
        result = list(tqdm(pool.imap(_apply_df, apply_lst), total=len(apply_lst)))
    return pd.concat([part for _, part in result], sort=False)
def _apply_df_groupby(args): | |
group, func, name, kwargs = args | |
return name, func(group, **kwargs) | |
def multiprocessing_groupby(groupby,
                            func,
                            **kwargs):
    """Apply ``func`` to every group of a pandas groupby in parallel.

    Each ``(name, group)`` pair yielded by ``groupby`` is sent to a worker
    process, which evaluates ``func(group, **kwargs)``. The per-group results
    are concatenated with ``pd.concat(..., sort=False)`` in the order the
    groups were yielded by ``groupby``. ``func`` should therefore return a
    pandas object, and it must be picklable to reach the workers.

    Keyword arguments:
        workers: number of worker processes. ``None`` (the default) lets
            ``multiprocessing.Pool`` use ``os.cpu_count()``.

    All other keyword arguments are forwarded to ``func``.

    Raises:
        ValueError: from ``pd.concat`` if ``groupby`` yields no groups.
    """
    workers = kwargs.pop('workers', None)
    apply_lst = [(group, func, name, kwargs) for name, group in groupby]
    with Pool(workers) as pool:
        # Pool.imap returns results in submission order, so the output already
        # follows the groupby's own ordering. Results are not re-sorted by group
        # name: that would be redundant, would raise TypeError for keys that are
        # not mutually comparable, and would reorder groupby(sort=False) output.
        result = list(tqdm(pool.imap(_apply_df_groupby, apply_lst),
                           total=len(apply_lst)))
    return pd.concat([frame for _, frame in result], sort=False)
def list_multiprocessing(param_lst,
                         func,
                         **kwargs):
    """Call ``func(item, **kwargs)`` for every item of ``param_lst`` in parallel.

    Progress is shown with a tqdm bar. Each item is passed to ``func`` as its
    single positional argument. ``func`` must be picklable.

    Keyword arguments:
        workers: number of worker processes. ``None`` (the default) lets
            ``multiprocessing.Pool`` use ``os.cpu_count()``.

    All other keyword arguments are forwarded to ``func``.

    Returns:
        A list of results, in the same order as ``param_lst``.
    """
    workers = kwargs.pop('workers', None)
    apply_lst = [([params], func, i, kwargs) for i, params in enumerate(param_lst)]
    with Pool(workers) as pool:
        # Pool.imap preserves submission order, so results already line up
        # with param_lst and need no re-sorting.
        result = list(tqdm(pool.imap(_apply_lst, apply_lst), total=len(apply_lst)))
    return [value for _, value in result]
def list_multiprocessing_nopbar(param_lst,
                                func,
                                **kwargs):
    """Call ``func(item, **kwargs)`` for every item of ``param_lst`` in parallel.

    This is the same as ``list_multiprocessing`` but without a progress bar.
    Each item is passed to ``func`` as its single positional argument.
    ``func`` must be picklable.

    Keyword arguments:
        workers: number of worker processes. ``None`` (the default) lets
            ``multiprocessing.Pool`` use ``os.cpu_count()``.

    All other keyword arguments are forwarded to ``func``.

    Returns:
        A list of results, in the same order as ``param_lst``.
    """
    workers = kwargs.pop('workers', None)
    apply_lst = [([params], func, i, kwargs) for i, params in enumerate(param_lst)]
    with Pool(workers) as pool:
        # Pool.imap preserves submission order, so no re-sorting is needed.
        result = list(pool.imap(_apply_lst, apply_lst))
    return [value for _, value in result]
def _apply_lst(args): | |
params, func, num, kwargs = args | |
return num, func(*params,**kwargs) | |
def dict_multiprocessing(param_dict,
                         func,
                         **kwargs):
    """Call ``func(value, **kwargs)`` for every value of ``param_dict`` in parallel.

    Progress is shown with a tqdm bar. Each value is passed to ``func`` as its
    single positional argument. ``func`` must be picklable.

    Keyword arguments:
        workers: number of worker processes. ``None`` (the default) lets
            ``multiprocessing.Pool`` use ``os.cpu_count()``.

    All other keyword arguments are forwarded to ``func``.

    Returns:
        A new dict mapping each key of ``param_dict`` to ``func`` applied to
        its value, in ``param_dict``'s key order.
    """
    workers = kwargs.pop('workers', None)
    # Iterate the argument itself (``param_dict``); ``dct`` is not defined here.
    apply_lst = [([params], func, i, key, kwargs)
                 for i, (key, params) in enumerate(param_dict.items())]
    with Pool(workers) as pool:
        # Pool.imap preserves submission order, so no re-sorting is needed.
        result = list(tqdm(pool.imap(_apply_dct, apply_lst), total=len(apply_lst)))
    # Each result item from _apply_dct is (index, value, key).
    return {key: value for _, value, key in result}
def _apply_dct(args): | |
params, func, num, key, kwargs = args | |
return num, func(*params,**kwargs), key |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.