Forked from yong27/apply_df_by_multiprocessing.py
Last active: January 30, 2023
pandas DataFrame apply multiprocessing
import multiprocessing

import numpy as np
import pandas as pd


def _apply_df(args):
    """Unpack the (df, func, kwargs) tuple and run the apply on one chunk."""
    df, func, kwargs = args
    return df.apply(func, **kwargs)


def apply_by_multiprocessing(df, func, **kwargs):
    """Split df into `workers` chunks, apply func to each chunk in its own
    process, and concatenate the per-chunk results."""
    workers = kwargs.pop('workers')
    with multiprocessing.Pool(processes=workers) as pool:
        result = pool.map(_apply_df, [(d, func, kwargs)
                                      for d in np.array_split(df, workers)])
    return pd.concat(result)


def square(x):
    return x ** 2


if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    print(apply_by_multiprocessing(df, square, axis=1, workers=4))
    # runs across 4 worker processes
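The same split/apply/concat pattern can also be written with the standard-library concurrent.futures API, which shuts the pool down automatically. A minimal sketch equivalent to apply_by_multiprocessing above (note the applied function must be a module-level, picklable function, not a lambda):

import concurrent.futures

import numpy as np
import pandas as pd


def _run_chunk(args):
    chunk, func, kwargs = args
    return chunk.apply(func, **kwargs)


def apply_parallel(df, func, workers=4, **kwargs):
    # executor.map preserves input order, so the concatenated result
    # keeps the original row order
    chunks = np.array_split(df, workers)
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as pool:
        results = pool.map(_run_chunk, [(c, func, kwargs) for c in chunks])
    return pd.concat(list(results))


def square(row):
    return row ** 2


if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    print(apply_parallel(df, square, axis=1))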
%%time
import re
from multiprocessing import Pool

import numpy as np
import pandas as pd

num_cores = 2

# `brand_name_re` (a compiled regex of known brand names) and `df` are
# assumed to be defined in earlier cells.
def tgty(x):
    """Fill a missing brand_name by matching known brands in the item name."""
    if isinstance(x['brand_name'], str):
        return x['brand_name']
    matches = re.findall(brand_name_re, x['name'])
    # fall back to the original value on no match or a degenerate 'A' match
    if not matches or 'A' in matches:
        return x['brand_name']
    return matches[0]


def _apply_df(args):
    _df, func, kwargs = args
    return _df.apply(func, axis=1, **kwargs)


def apply_by_multiprocessing(_df, func, **kwargs):
    workers = kwargs.pop('workers')
    with Pool(processes=workers) as pool:
        result = pool.map(_apply_df,
                          [(d, func, kwargs) for d in np.array_split(_df, workers)])
    return pd.concat(result)


df['brand_name'] = apply_by_multiprocessing(df, tgty, workers=num_cores)
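brand_name_re is not defined in this cell; presumably it is a pattern built from the corpus of known brand names. A sketch of how such a regex might be constructed (the brand list here is hypothetical; in practice it would come from something like df['brand_name'].dropna().unique()):

import re

known_brands = ['Nike', 'Adidas', 'Apple', "Victoria's Secret"]

# escape each brand and try longer names first, so a multi-word brand
# wins over a shorter prefix
brand_name_re = re.compile(
    '|'.join(re.escape(b) for b in sorted(known_brands, key=len, reverse=True)))

print(re.findall(brand_name_re, 'Brand new Nike running shoes'))  # ['Nike']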
from multiprocessing import Pool

import numpy as np
import pandas as pd

# KaggleWord2VecUtility comes from the Kaggle word2vec tutorial code;
# num_cores is assumed to be set earlier.
def applydf2(c, remove_stopwords=True):
    """Turn a Series of reviews into one flat list of words."""
    data = []
    for x in c.tolist():
        # paragraph to sentences, sentences to words
        data += KaggleWord2VecUtility.review_to_wordlist(x, remove_stopwords)
    return data


def parallelize_dataframe2(func, c):
    df_split = np.array_split(c, num_cores)
    with Pool(num_cores) as pool:
        result = pool.map(func, df_split)
    return result


sentences1 = parallelize_dataframe2(applydf2, train["review"])
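Note that parallelize_dataframe2 returns one word list per chunk. If a single flat list is wanted, itertools.chain flattens it without the quadratic cost of summing lists together; a small sketch with made-up data:

import itertools

per_chunk = [['great', 'movie'], ['bad', 'plot'], ['fine']]
flat = list(itertools.chain.from_iterable(per_chunk))
print(flat)  # ['great', 'movie', 'bad', 'plot', 'fine']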
import gc
from multiprocessing import Pool

from sklearn.feature_extraction.text import TfidfVectorizer

# `df`, `y`, `Myfunc1` (the count-vectorizer counterpart of Myfunc2) and
# `hstackEmbedding` are assumed to be defined elsewhere in the notebook.
def Myfunc2(s):
    # use_idf=False makes this plain (L2-normalized) term frequency
    params = {'min_df': 100, 'stop_words': 'english', 'binary': False,
              'ngram_range': (1, 2), 'max_features': None, 'use_idf': False}
    print("TfidfVectorizer: {}".format(s))
    return {s + '_tf': TfidfVectorizer(**params).fit_transform(df[s])}


if __name__ == '__main__':
    embedding = {}
    proc = {}
    p = Pool(4)
    proc['name_count'] = p.apply_async(Myfunc1, args=('name',))
    proc['name_description_count'] = p.apply_async(Myfunc1, args=('name_description',))
    proc['item_description_count'] = p.apply_async(Myfunc1, args=('item_description',))
    proc['name_tf'] = p.apply_async(Myfunc2, args=('name',))
    proc['name_description_tf'] = p.apply_async(Myfunc2, args=('name_description',))
    proc['item_description_tf'] = p.apply_async(Myfunc2, args=('item_description',))
    for pp in proc:
        embedding.update(proc[pp].get())  # blocks until each result is ready
    p.close()
    p.join()

    X_train, X_valid, y_train, y_valid = hstackEmbedding(embedding, y)
    del embedding; gc.collect()
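hstackEmbedding is not shown in the gist; presumably it stacks the sparse feature blocks side by side and splits rows into train/validation sets. A hypothetical sketch of what it might look like:

from scipy.sparse import hstack
from sklearn.model_selection import train_test_split


def hstackEmbedding(embedding, y):
    # stack all sparse feature matrices column-wise, then split rows
    X = hstack(list(embedding.values())).tocsr()
    return train_test_split(X, y, test_size=0.2, random_state=42)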
# http://gouthamanbalaraman.com/blog/distributed-processing-pandas.html
import multiprocessing as mp

import pandas as pd

LARGE_FILE = "D:\\my_large_file.txt"
CHUNKSIZE = 100000  # process 100,000 rows at a time


def process_frame(df):
    # process a single chunk; here we just count its rows
    return len(df)


if __name__ == '__main__':
    reader = pd.read_table(LARGE_FILE, chunksize=CHUNKSIZE)
    pool = mp.Pool(4)  # use 4 worker processes

    # dispatch each chunk to the pool as it is read
    funclist = []
    for df in reader:
        f = pool.apply_async(process_frame, [df])
        funclist.append(f)

    # collect the per-chunk results
    result = 0
    for f in funclist:
        result += f.get(timeout=10)  # wait at most 10 seconds per chunk

    print("There are %d rows of data" % result)