Created
November 8, 2020 18:18
-
-
Save M-Anwar/502ef6f629b7abfbdc4cb1e328bb122c to your computer and use it in GitHub Desktop.
A quick way to distribute a Python task, called with differing parameters, across a thread pool.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import concurrent.futures | |
from tqdm import tqdm | |
def threadPoolCall(func, task_args, max_workers=8, raise_exception=True, log_errors=True, show_progress=True, description=None):
    """ Distributes a task between a pool of worker threads and collects
    the results. The task is a python function, and the arguments to the function
    are specified in an iterable of tuple arguments, each element representing a
    task. An element that is not a plain tuple is passed to the function as a
    single positional argument.

    Args:
        func -- [Python.function] The function reference for the task. Any callable
                works, including functools.partial objects.
        task_args -- [Iterable[Tuple]] The arguments to pass to the function, one
                     element per task.
        max_workers -- [int] The number of worker pool threads to use. Default 8.
        raise_exception -- [bool] Whether to re-raise the first task exception that is
                           observed. Tasks that have not started yet are cancelled
                           before the exception propagates. Default true.
        log_errors -- [bool] Whether to print task errors, works in tandem with
                      raise_exception to control how errors are handled. Default true.
        show_progress -- [bool] Whether to show a tqdm progress bar for the tasks.
                         Default true.
        description -- [string] The description for the progress bar. Default function name.

    Returns:
        [list] The return values of the tasks that succeeded, in the order the tasks
        completed (not the order of task_args). Failed tasks contribute no entry.

    Raises:
        Exception -- whatever a task raised, when raise_exception is true.
    """
    # Materialise once so generators are accepted and the total is known up front.
    tasks = list(task_args)

    # functools.partial objects and callable instances have no __name__;
    # fall back to their repr instead of crashing while labelling or logging.
    func_name = getattr(func, "__name__", None) or repr(func)

    pbar = None
    if show_progress:
        pbar = tqdm(total=len(tasks))
        pbar.set_description(description if description else func_name)

    results = []
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Map each future back to its input so errors can report it.
            futures = {}
            for args in tasks:
                # Exact type check on purpose: tuple subclasses (e.g. namedtuples)
                # keep being passed as one argument, as they always have been.
                call_args = args if type(args) is tuple else (args,)
                futures[executor.submit(func, *call_args)] = args

            for future in concurrent.futures.as_completed(futures):
                if pbar is not None:
                    pbar.update(1)
                try:
                    results.append(future.result())
                except Exception as exc:
                    if log_errors:
                        print("[ERROR] Input to {} -> {} : {}".format(func_name, futures[future], exc))
                    if raise_exception:
                        # Leaving the `with` block waits for running tasks; cancel
                        # the queued ones so the failure surfaces promptly.
                        for pending in futures:
                            pending.cancel()
                        raise
    finally:
        # Always release the progress bar, including on KeyboardInterrupt.
        if pbar is not None:
            pbar.close()
    return results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment