### !pip install pathos
import multiprocessing
import concurrent.futures

import pandas as pd
from pathos.multiprocessing import ProcessingPool

workers = multiprocessing.cpu_count()  ## default: one worker per CPU core
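
## `errors_pusher` is used as a decorator below but is not defined in this file;
## it is assumed to be an error-handling decorator available elsewhere.
## A minimal pass-through sketch (hypothetical, only so this module stays
## importable on its own) could look like this:
import functools

def errors_pusher(func):
    """Hypothetical stand-in: log the exception and return None instead of raising."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            print(f"[errors_pusher] {func.__name__} failed: {exc}")
            return None
    return wrapper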

@errors_pusher
def parallel_threading(main_func,
                       lst_args: list,
                       workers: int = workers) -> list:
    """
    Return a list of results produced by running main_func in parallel
    with concurrent.futures.ThreadPoolExecutor.

    Each element of lst_args must be a tuple of positional arguments
    for main_func.

    Example
    --------
    >>> def plus_one(i): return i + 1
    >>> lst_args = [(1,), (2,), (3,)]
    >>> parallel_threading(plus_one, lst_args)
    [2, 3, 4]
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:  ### threading
        result = executor.map(lambda x: main_func(*x), lst_args)
        return list(result)

@errors_pusher
def parallel_processing(main_func,
                        lst_args: list,
                        workers: int = workers) -> list:
    """
    Return a list of results produced by running main_func in parallel
    with pathos.multiprocessing.ProcessingPool.

    Each element of lst_args must be a tuple of positional arguments
    for main_func.

    Example
    --------
    >>> def plus_one(i): return i + 1
    >>> lst_args = [(1,), (2,), (3,)]
    >>> parallel_processing(plus_one, lst_args)
    [2, 3, 4]
    """
    with ProcessingPool(nodes=workers) as pool:  ### multiprocessing
        result = pool.map(lambda x: main_func(*x), lst_args)
        return list(result)
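
## Note (general guidance, not from the original gist): because of the GIL,
## parallel_threading mainly helps with I/O-bound work (HTTP calls, disk reads),
## while parallel_processing pays off for CPU-bound work, at the cost of
## serializing arguments and results across process boundaries.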

@errors_pusher
def parallel_reinforcing(parallel_func,
                         main_func,
                         list_of_args_tuple: list,
                         default_vals=None,
                         type_result: type = pd.DataFrame,
                         attempt: int = 5
                         ) -> list:
    """
    Return a list of results produced by main_func.

    Parallelized functions sometimes fail to produce the expected results,
    so main_func is rerun until every result matches type_result
    or the attempt limit is reached.

    Parameters
    --------
    parallel_func: parallel_threading or parallel_processing; pick one
    main_func: the function to be parallelized
    list_of_args_tuple: list of tuples of arguments required by main_func;
        the arguments in each tuple must be lined up in order
    default_vals: the value that marks a result as missing
    type_result: the Python type expected for each result
    attempt: maximum number of reruns before giving up

    Example
    --------
    >>> import requests
    >>> sess = requests.Session()
    >>> # main_func here is any function that takes (session, page) and returns a DataFrame
    >>> parallel_reinforcing(parallel_threading, main_func, [(sess, 1), (sess, 2)], type_result=pd.DataFrame)
    """
    ## dict keys must be hashable, so we enforce type str
    lst_key_tracker = [str(tup) for tup in list_of_args_tuple]
    dct_tracker = dict(zip(lst_key_tracker, len(list_of_args_tuple) * [default_vals]))
    lst_type_checker = list(dct_tracker.keys())
    #### rerun main_func while type_result is still not met
    while len(lst_type_checker) != 0 and attempt > 0:
        ## only rerun the argument tuples whose results are still missing or of the wrong type
        retry_args = [tup for tup in list_of_args_tuple if str(tup) in lst_type_checker]
        lst_value_tracker = parallel_func(main_func, retry_args)
        ## update the tracker with the fresh results
        dct_tracker.update(dict(zip([str(tup) for tup in retry_args], lst_value_tracker)))
        lst_type_checker = [k for k, v in dct_tracker.items() if type(v) != type_result]
        ## prevent looping forever
        attempt -= 1
    return list(dct_tracker.values())
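
## Usage sketch. `fetch_page` below is hypothetical (not part of the original gist):
## it downloads one page of a paginated API with a shared requests.Session and
## returns a pandas DataFrame. parallel_reinforcing then retries any page whose
## result is not a DataFrame (e.g. a failed call that came back as None),
## up to five attempts.
if __name__ == "__main__":
    import requests

    def fetch_page(sess, page):
        ## hypothetical worker: one HTTP call per page, parsed into a DataFrame
        resp = sess.get("https://example.com/api/items", params={"page": page}, timeout=10)
        return pd.DataFrame(resp.json())

    sess = requests.Session()
    args = [(sess, page) for page in range(1, 4)]

    ## the work is I/O-bound, so the threading backend is usually the cheaper choice
    frames = parallel_reinforcing(parallel_threading, fetch_page, args,
                                  type_result=pd.DataFrame)
    print([type(f) for f in frames])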