Skip to content

Instantly share code, notes, and snippets.

@bachkukkik
Last active December 14, 2020 10:40
Show Gist options
  • Save bachkukkik/110b3b0f46eacbd70de123dae4bae47d to your computer and use it in GitHub Desktop.
Save bachkukkik/110b3b0f46eacbd70de123dae4bae47d to your computer and use it in GitHub Desktop.
### !pip install pathos
import multiprocessing
import concurrent.futures
from pathos.multiprocessing import ProcessingPool
# Default worker count used as the `workers` default of the parallel helpers:
# one worker per CPU reported by multiprocessing (cpu_count() already
# returns an int).
workers = multiprocessing.cpu_count()
@errors_pusher
def parallel_threading(main_func,
                       lst_args: list,
                       workers: int=workers) -> list:
    """
    Return the list of results of calling main_func once per entry of
    lst_args, run concurrently with concurrent.futures.ThreadPoolExecutor.

    Parameters
    --------
    main_func: callable to run for each entry
    lst_args: list of argument tuples; every entry is unpacked into
        positional arguments, i.e. main_func(*entry). A bare scalar such as
        1 is not a valid entry, wrap it as (1,)
    workers: maximum number of threads; defaults to the module-level
        ``workers`` value

    Returns
    --------
    list of results, in the same order as lst_args

    Example
    --------
    >>> def plus_one(i):
    ...     return i + 1
    >>> parallel_threading(plus_one, [(1,), (2,), (3,)])
    [2, 3, 4]

    NOTE(review): the function is wrapped by errors_pusher, which is defined
    elsewhere; how it reports errors is not visible here.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:  # threading
        # Each entry is a tuple of positional arguments, hence the unpacking.
        results = executor.map(lambda x: main_func(*x), lst_args)
    # Leaving the with-block waits for all submitted tasks, so every result
    # is already available when the iterator is drained here.
    return list(results)
@errors_pusher
def parallel_processing(main_func,
                        lst_args: list,
                        workers: int=workers) -> list:
    """
    Return the list of results of calling main_func once per entry of
    lst_args, run in parallel with pathos.multiprocessing.ProcessingPool.

    Parameters
    --------
    main_func: callable to run for each entry
    lst_args: list of argument tuples; every entry is unpacked into
        positional arguments, i.e. main_func(*entry). A bare scalar such as
        1 is not a valid entry, wrap it as (1,)
    workers: number of worker processes (pool nodes); defaults to the
        module-level ``workers`` value

    Returns
    --------
    list of results, in the same order as lst_args

    Example
    --------
    >>> def plus_one(i):
    ...     return i + 1
    >>> parallel_processing(plus_one, [(1,), (2,), (3,)])
    [2, 3, 4]

    NOTE(review): the function is wrapped by errors_pusher, which is defined
    elsewhere; how it reports errors is not visible here.
    """
    # Pass `workers` through as the pool size; previously the parameter was
    # accepted but ignored and the pool always used its own default.
    with ProcessingPool(nodes=workers) as pool:
        # Each entry is a tuple of positional arguments, hence the unpacking.
        results = pool.map(lambda x: main_func(*x), lst_args)
    return list(results)
@errors_pusher
def parallel_reinforcing(parallel_func,
                         main_func,
                         list_of_args_tuple: list,
                         default_vals=None,
                         type_result: type=pd.core.frame.DataFrame,
                         attempt: int=5
                         ) -> list:
    """
    Return list of results produced by main_func, one per argument tuple.

    Sometimes parallel runs do not produce the expected results. Hence,
    main_func is rerun for the entries whose result is not yet an instance of
    type_result, until every entry is satisfied or the attempts run out.

    Parameters
    --------
    parallel_func: [parallel_threading, parallel_processing], pick one;
        it is called as parallel_func(main_func, list_of_argument_tuples)
    main_func: a function name; function to be parallelized
    list_of_args_tuple: list of tuples of arguments required by main_func;
        arguments in tuple must be lined-up in order
    default_vals: a value marked as missing; returned for every entry when
        no attempt is made (empty retry budget)
    type_result: an expected Python type for each result (subclasses are
        accepted as well)
    attempt: maximum number of rounds to run

    Returns
    --------
    list with one result per entry of list_of_args_tuple, in the same order.
    An entry that never produced a type_result instance holds the value from
    its last run.

    Example
    --------
    >>> import requests
    >>> sess = requests.Session()
    >>> parallel_reinforcing(parallel_threading, main_func, [(sess, 1), (sess, 2)], type_result=pd.core.frame.DataFrame)
    """
    # Track results by position instead of by str(args): string keys collapse
    # duplicate (or identically printing) argument tuples into one entry,
    # which would shorten and misalign the returned list.
    results = [default_vals] * len(list_of_args_tuple)
    pending = list(range(len(list_of_args_tuple)))

    # Rerun main_func only for the entries whose type_result is still not met,
    # so a result that already succeeded is never overwritten by a later failure.
    while pending and attempt > 0:
        retry_args = [list_of_args_tuple[idx] for idx in pending]
        for idx, value in zip(pending, parallel_func(main_func, retry_args)):
            results[idx] = value
        pending = [idx for idx in pending
                   if not isinstance(results[idx], type_result)]
        # Prevent looping forever.
        attempt -= 1
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment