Created
June 13, 2022 10:22
-
-
Save internetimagery/69d8e96682c75d0fb312513d73cf6e81 to your computer and use it in GitHub Desktop.
Simple decorator to run a generator in another process
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Queue, Event, Process, ProcessError | |
from inspect import isgeneratorfunction | |
from functools import wraps, partial | |
from queue import Empty | |
def yieldFromProcess(initializer=None):
    """
    Decorator to run a generator function in another process.

    All data in and out needs to be pickleable.
    Usual caution with processes should be adhered to. Such as not relying on shared
    state across process boundaries.

    Example::

        @yieldFromProcess()
        def walk(meters):
            for i in range(meters):
                yield "I have walked {} meters".format(i)

        for message in walk(10):
            print(message)

    NOTE(review): the undecorated function is handed to the child through the
    ``Process`` arguments, so under the "spawn"/"forkserver" start methods it has
    to be picklable by reference. A function decorated with ``@`` syntax is rebound
    to the wrapper and presumably will not pickle there -- confirm on non-fork
    platforms.

    Args:
        initializer (Optional[Callable[[], Any]]): Optional callable to trigger a setup at the start of the process

    Returns:
        Callable[Callable[..., Iterator[T]], Callable[..., Iterator[T]]]: Returns decorator

    Raises:
        TypeError: (from the decorator) if the decorated function is not a generator function.
        ProcessError: (from the iterator) if the process exits without signalling completion.
    """
    def decorator(func):
        if not isgeneratorfunction(func):
            raise TypeError("Function needs to yield values. {}".format(func))

        @wraps(func)
        def wrapper(*args, **kwargs):
            import time

            # Set up a queue to return our data.
            # Set up an event to stop the iteration early if the loop here is exited early.
            # Set up process to run the logic.
            queue = Queue()
            stop = Event()
            proc = Process(
                target=_iterateInProcess,
                args=(initializer, stop, queue, partial(func, *args, **kwargs)),
            )
            proc.start()
            try:
                # Keep reading data back from the process as it comes in, yielding it to the caller.
                while True:
                    try:
                        # Read data back from the queue. But allow for a deadlocked pipe in case
                        # the process has died unexpectedly. Such as a segfault.
                        result = queue.get(timeout=30)
                    except Empty:
                        if proc.is_alive():
                            continue
                        # The process may have delivered its last item and exited in the
                        # gap between the timeout expiring and the liveness check. Look
                        # once more before declaring it dead.
                        try:
                            result = queue.get_nowait()
                        except Empty:
                            raise ProcessError("Process Died unexpectedly with exit code: {}".format(proc.exitcode))
                    if getattr(result, "__class__", None) is _StopProcessing:
                        # We have signaled that the process is finishing iterating.
                        # Test for presence, not truthiness, so no error is ever dropped.
                        if result.error is not None:
                            raise result.error
                        break
                    yield result
            finally:
                # Ensure process loop stops first, then clear queue.
                # This is inverse order to the process (puts in queue then checks before next loop).
                stop.set()
                # Keep draining while waiting for the process to exit. The process can still
                # put items after a single drain, and it cannot exit until everything it has
                # queued is flushed into the pipe; an unread pipe would stall it for the whole
                # grace period.
                deadline = time.monotonic() + 10
                while True:
                    try:
                        while True:
                            queue.get_nowait()
                    except Empty:
                        pass
                    # Give the process a chance to finish gracefully. Else leave it.
                    proc.join(timeout=0.1)
                    if not proc.is_alive() or time.monotonic() >= deadline:
                        break
        return wrapper
    return decorator
class _StopProcessing(StopIteration):
    """
    End-of-stream marker sent back from the process once it is done iterating.

    The process puts an instance on the result queue as its final item; the
    reading side recognises it by its exact class. It may carry an error for
    the main process to raise.

    The payload is kept as the only entry of ``args`` so that it survives the
    pickling round trip between processes.

    Args:
        error (Optional[Exception]): Error if one was thrown. Else nothing.
    """

    def __init__(self, error=None):
        super().__init__(error)

    @property
    def error(self):
        """Optional[Exception]: The carried error, or None when iteration ended normally."""
        payload = self.args
        return payload[0]
def _iterateInProcess(initializer, stop, queue, func):
    """
    Run in separate process. Loop through iterator. Return results through the queue.

    The final item put on the queue is always a ``_StopProcessing`` marker, carrying
    the error if the initializer or the iteration raised one.

    Args:
        initializer (Optional[Callable[[], Any]]): Called once before iterating, if given.
        stop (Event): Set by the reading side to request an early stop.
        queue (Queue): Receives every yielded result, followed by the end marker.
        func (Callable[[], Iterator[Any]]): Zero-argument callable returning the iterator to run.
    """
    import pickle
    import traceback

    try:
        if initializer:
            initializer()
        for result in func():
            # Put in our calculated result first, then check if we should be stopping.
            # NOTE(review): the queue pickles items in a background thread, so a result
            # that cannot be pickled is presumably dropped without an error here -- confirm.
            queue.put(result)
            if stop.is_set():
                break
    except Exception as err:
        # The error has to survive a pickle round trip to reach the main process. Check
        # that here, where the original traceback is still available, and fall back to a
        # plain RuntimeError describing it so the failure is never lost in transit.
        try:
            pickle.loads(pickle.dumps(err))
            payload = err
        except Exception:
            details = "".join(traceback.format_exception(type(err), err, err.__traceback__))
            payload = RuntimeError(
                "Process raised {} which could not be pickled:\n{}".format(type(err).__name__, details)
            )
        queue.put(_StopProcessing(payload))
    else:
        queue.put(_StopProcessing())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment