Last active
February 26, 2024 11:22
-
-
Save jelmervdl/62a7f357c40680b26c0e632f263d3864 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing as mp
from functools import wraps
from queue import Empty
from typing import Callable, Iterable, Optional, ParamSpec, TypeVar, Union
class EndOfIter:
    """Sentinel that signals the end of the offloaded iterator.

    The worker process puts exactly one instance on the queue as its last
    message: with ``exc`` left as ``None`` when the generator finished
    normally, or carrying the exception that stopped it.
    """

    # Exception raised by the offloaded generator, or None on a clean finish.
    exc: Optional[Exception]

    def __init__(self, exc: Optional[Exception] = None):
        """Store the exception (if any) that ended the iteration."""
        self.exc = exc
P = ParamSpec("P") | |
R = TypeVar("R") | |
def offload_iter(
    fn: Callable[P, Iterable[R]], *, maxsize: int = 0
) -> Callable[P, Iterable[R]]:
    """Offload a generator function to another process.

    The returned wrapper has the same call signature as ``fn``. Calling it
    starts a daemon worker process that runs ``fn(*args, **kwargs)`` and
    sends every produced item back over a ``multiprocessing`` queue; the
    wrapper yields those items in order.

    Args:
        fn: Generator function (or any callable returning an iterable) to run
            in the worker process. Its items and any exception it raises must
            be picklable, because they travel through the queue.
        maxsize: Passed to the queue constructor; bounds how many items the
            worker may produce ahead of the consumer. ``0`` means unbounded.

    Returns:
        A wrapper around ``fn`` that yields the worker's items.

    Raises:
        Exception: Whatever ``Exception`` subclass ``fn`` raised in the
            worker is re-raised by the wrapper (as the copy received through
            the queue) once all preceding items have been yielded.
        RuntimeError: The worker process exited without reporting the end of
            the iteration (for example it was killed, or the generator raised
            ``SystemExit``).
    """
    # Seconds between liveness checks of the worker while the queue is empty.
    poll_interval = 0.1

    def offload_iter_worker(
        channel: "mp.Queue[Union[R, EndOfIter]]",
        fn: Callable[P, Iterable[R]],
        args: tuple,
        kwargs: dict,
    ) -> None:
        """
        Function executed in the other process that runs the generator
        and puts its items on the queue, followed by an EndOfIter sentinel.
        """
        try:
            for item in fn(*args, **kwargs):
                channel.put(item)
            channel.put(EndOfIter())
        except Exception as exc:
            # Ship the failure to the consumer instead of dying silently.
            channel.put(EndOfIter(exc))

    @wraps(fn)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> Iterable[R]:
        """
        Wrapper around the generator that communicates with the offloaded
        worker. If an exception is thrown in the offloaded generator, it will
        be copied and raised by this wrapper as well.
        """
        # The worker is a local function, which cannot be pickled, so it can
        # only be handed to a child that inherits it via fork. Ask for that
        # start method explicitly where the platform offers it rather than
        # relying on the process-wide default.
        if "fork" in mp.get_all_start_methods():
            ctx = mp.get_context("fork")
        else:
            ctx = mp

        channel: "mp.Queue[Union[R, EndOfIter]]" = ctx.Queue(maxsize)
        proc = ctx.Process(
            target=offload_iter_worker,
            args=(channel, fn, args, kwargs),
            daemon=True,
        )
        proc.start()

        val: Union[R, EndOfIter]
        try:
            worker_exited = False
            while True:
                try:
                    val = channel.get(timeout=poll_interval)
                except Empty:
                    if worker_exited:
                        # The worker was already dead before this read and
                        # the queue is still empty: the sentinel will never
                        # arrive, so fail instead of blocking forever.
                        raise RuntimeError(
                            "offloaded worker exited without signalling "
                            f"the end of iteration (exit code {proc.exitcode})"
                        ) from None
                    # Remember the state and read once more, so anything the
                    # worker managed to send before exiting is still consumed.
                    worker_exited = not proc.is_alive()
                    continue
                if isinstance(val, EndOfIter):
                    break
                yield val
            proc.join()
        finally:
            # Reached on normal completion, on error, and when the consumer
            # abandons the generator early: never leave the worker running.
            if proc.is_alive():
                proc.terminate()
                proc.join()
            channel.close()

        if val.exc is not None:
            raise val.exc

    return wrapper
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment