Created
March 21, 2024 12:51
-
-
Save freol35241/177f79e90e17763385a7dac390b90a3b to your computer and use it in GitHub Desktop.
Multiprocessing with multiprocess library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass, field
from typing import Tuple, Callable, Dict, Any, Hashable, List
from uuid import uuid4

from multiprocess.pool import Pool
from tqdm import tqdm
@dataclass
class Task:
    """A callable bundled with the arguments it should be invoked with."""

    # The callable to execute.
    func: Callable
    # Positional arguments, unpacked into the call as ``*args``.
    args: Tuple = ()
    # Keyword arguments, unpacked into the call as ``**kwargs``; every
    # instance gets its own fresh dict.
    kwargs: Dict = field(default_factory=dict)
def run_in_process_pool(
    tasks: Dict[Hashable, Task],
    no_of_processes: int | None = None,
    context: Dict[str, Any] | None = None,
) -> Dict[Hashable, Any]:
    """Run every task in a ``multiprocess`` pool and collect results by key.

    Each task is submitted with ``Pool.apply_async`` and is invoked as
    ``task.func(*task.args, **task.kwargs, context=context)``, so every task
    callable must accept a ``context`` keyword argument. A ``tqdm`` progress
    bar advances once for each task that completes successfully; when a task
    fails, its traceback is printed from the pool's error callback.

    Args:
        tasks: Mapping from an arbitrary hashable key to the task to run.
        no_of_processes: Number of worker processes, forwarded unchanged to
            ``Pool`` (``None`` defers to the pool's default).
        context: Optional shared data handed to every task as the ``context``
            keyword argument. It is stored by the pool initializer under a
            key unique to this call rather than being attached to each task.
            ``None`` is forwarded to the tasks as-is.

    Returns:
        A dict mapping each key of ``tasks`` to its task's return value.

    Raises:
        Exception: Whatever a task raised is re-raised by ``AsyncResult.get``
            after all tasks have finished.
    """
    ctx_id = uuid4()

    def _initializer():
        # Store the context unconditionally (even when it is None) so that the
        # lookup in _wrapped_task never fails with a KeyError.
        globals()[ctx_id] = context
        print("Process initialized successfully!")

    def _wrap_task(task: Task):
        def _wrapped_task():
            # Fetch the context published by _initializer.
            return task.func(
                *task.args, **task.kwargs, context=globals()[ctx_id]
            )

        return _wrapped_task

    with tqdm(total=len(tasks)) as progress_bar:

        def _progress_callback(*args, **kwargs):
            progress_bar.update(1)

        def _error_callback(the_error: Exception):
            import traceback

            traceback.print_exception(the_error)
            print()

        print("Starting sub-processes...")
        with Pool(no_of_processes, initializer=_initializer) as pool:
            async_results = {
                key: pool.apply_async(
                    _wrap_task(task),
                    callback=_progress_callback,
                    error_callback=_error_callback,
                )
                for key, task in tasks.items()
            }

            # Block until every task has finished before collecting anything,
            # so that a failing task does not cut the remaining ones short.
            for async_result in async_results.values():
                async_result.wait()

            # get() returns the task's value or re-raises its exception.
            results = {
                key: async_result.get()
                for key, async_result in async_results.items()
            }

    return results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment