Skip to content

Instantly share code, notes, and snippets.

@internetimagery
Created June 13, 2022 10:22
Show Gist options
  • Save internetimagery/69d8e96682c75d0fb312513d73cf6e81 to your computer and use it in GitHub Desktop.
Save internetimagery/69d8e96682c75d0fb312513d73cf6e81 to your computer and use it in GitHub Desktop.
Simple decorator to run a generator in another process
from multiprocessing import Queue, Event, Process, ProcessError
from inspect import isgeneratorfunction
from functools import wraps, partial
from queue import Empty
def yieldFromProcess(initializer=None):
    """
    Decorator to run a generator function in another process.

    All data in and out needs to be pickleable.
    Usual caution with processes should be adhered to. Such as not relying on shared
    state across process boundaries.

    >>> @yieldFromProcess()
    ... def walk(meters):
    ...     for i in range(meters):
    ...         yield "I have walked {} meters".format(i)

    >>> for message in walk(10):
    ...     print(message)

    Args:
        initializer (Optional[Callable[[], Any]]): Optional callable to trigger a setup at the start of the process

    Returns:
        Callable[[Callable[..., Iterator[T]]], Callable[..., Iterator[T]]]: Returns decorator
    """
    def decorator(func):
        # Only generator functions make sense here: the wrapper yields values
        # streamed back from the child process.
        if not isgeneratorfunction(func):
            raise TypeError("Function needs to yield values. {}".format(func))

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Set up a queue to return our data.
            # Set up an event to stop the iteration early if the loop here is exited early.
            # Set up process to run the logic.
            queue = Queue()
            stop = Event()
            proc = Process(
                target=_iterateInProcess,
                args=(initializer, stop, queue, partial(func, *args, **kwargs)),
            )
            proc.start()
            try:
                # Keep reading data back from the process as it comes in, yielding it to the caller.
                while True:
                    try:
                        # Read data back from the queue. But allow for a deadlocked pipe in case
                        # the process has died unexpectedly. Such as a segfault.
                        result = queue.get(timeout=30)
                    except Empty:
                        if proc.is_alive():
                            # Child is still running, just slow — keep waiting.
                            continue
                        raise ProcessError("Process Died unexpectedly with exit code: {}".format(proc.exitcode))
                    # Class-identity check (not isinstance) so only the exact sentinel
                    # type ends iteration; getattr guards odd objects with no __class__.
                    if getattr(result, "__class__", None) is _StopProcessing:
                        # We have signaled that the process is finishing iterating.
                        if result.error:
                            # Re-raise the child's exception in the calling process.
                            raise result.error
                        break
                    yield result
            finally:
                # Ensure process loop stops first, then clear queue.
                # This is inverse order to the process (puts in queue then checks before next loop).
                stop.set()
                # Drain any leftover items so the child's queue feeder thread is not
                # blocked holding data, which could otherwise stall process exit.
                try:
                    while True:
                        queue.get_nowait()
                except Empty:
                    pass
                # Give the process a chance to finish gracefully. Else leave it.
                proc.join(timeout=10)
        return wrapper
    return decorator
class _StopProcessing(StopIteration):
"""
Special Error raised from the process when it is done.
Will optionally contain an error that can be raised in the main process.
Args:
error (Optional[Exception]): Error if one was thrown. Else nothing.
"""
def __init__(self, error=None):
super(_StopProcessing, self).__init__(error)
@property
def error(self):
return self.args[0]
def _iterateInProcess(initializer, stop, queue, func):
    """ Run in separate process. Loop through iterator. Return results through the queue. """
    try:
        if initializer:
            initializer()
        for item in func():
            # Put in our calculated result first, then check if we should be stopping.
            queue.put(item)
            if stop.is_set():
                break
        # Finished cleanly (exhausted or told to stop) — signal with no error.
        sentinel = _StopProcessing()
    except Exception as err:
        # Ship the failure back so the parent can re-raise it.
        sentinel = _StopProcessing(err)
    queue.put(sentinel)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment