Last active
October 9, 2015 21:53
-
-
Save Makman2/88ac3f8ce5bfcc5237a5 to your computer and use it in GitHub Desktop.
WIP-Prototype for InterruptableThread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Prototype for an InterruptableProcess. | |
# 8.10.2015 --> Makman2 | |
import ctypes | |
import multiprocessing | |
import threading | |
def _wait_for_single(*events): | |
""" | |
Waits for at least one event being set. | |
Returns immediately if an event was set before invoking this function. | |
:param events: The events to wait for. | |
""" | |
def set_callback(self): | |
control_event.set() | |
self._old_set() | |
if not any(event.is_set() for event in events): | |
control_event = multiprocessing.Event() | |
for event in events: | |
event._old_set = event.set | |
event.set = set_callback | |
control_event.wait() | |
for event in events: | |
event.set = event._old_set | |
delattr(event, "_old_set") | |
class InterruptableProcess(multiprocessing.Process): | |
""" | |
A `multiprocessing.Process` with interrupt capability. | |
""" | |
def __init__(self, | |
group=None, | |
target=None, | |
name=None, | |
args=(), | |
kwargs={}, | |
daemon=None): | |
""" | |
Instantiates a new InterruptableProcess. | |
See also `threading.Thread` or `multiprocessing.Process` class. | |
:param group: The process group. This value exists for sole | |
compatability with the `threading` module and should | |
always be `None`. | |
:param target: The target method to run. | |
:param name: The name of the process. This does not have to be a | |
unique name. Supplying `None` generates one | |
automatically. | |
:param args: Target method invocation arguments. | |
:param kwargs: Target method key-value invocation arguments. | |
:param daemon: Specifies if this is a daemon process. | |
""" | |
# TODO What's the `*` parameter in python docs for mp.Process? | |
multiprocessing.Process.__init__(self, | |
group, | |
target, | |
name, | |
args, | |
kwargs, | |
daemon=daemon) | |
self._interrupt_event = multiprocessing.Event() | |
self._was_started = False | |
self._exception_pipe_receiver, self._exception_pipe_sender = ( | |
multiprocessing.Pipe(duplex=False)) | |
def interrupt(self): | |
""" | |
Safely interrupts the process. | |
This raises a KeyboardInterrupt inside the given target. | |
This call blocks until the interrupt request was successfully delivered. | |
Further calls to this method after a successful interrupt raise a | |
`RuntimeError`. Calling this function again after a fail is obviously | |
possible. | |
:raises RuntimeError: Raised under following conditions: | |
- An interrupt is already requested but not | |
delegated to the target. | |
- Process was not started. | |
- Process is not alive any more. | |
- The interrupt request failed (for unknown | |
reasons). | |
""" | |
# TODO Define more Errors so they can be catched better. | |
# Suggestion: | |
# - RuntimeError: For "Process was not started" and "not alive any more" | |
# - InterruptError: For "Interrupt failed" and "Already delegating" | |
# Further distinction? | |
if self._interrupt_event.is_set(): | |
# Catch an already set Event, otherwise we would wait for the | |
# interrupt exception notification from the Pipe forever. | |
raise RuntimeError("Already delegating an interrupt.") | |
if not self._was_started: | |
raise RuntimeError("Process was not started.") | |
if not self.is_alive(): | |
raise RuntimeError("Process is not alive any more.") | |
self._interrupt_event.set() | |
exception = self._exception_pipe_receiver.recv() | |
if exception is not None: | |
raise RuntimeError("Interrupt failed.") from exception | |
def start(self): | |
""" | |
Starts the process. | |
""" | |
multiprocessing.Process.start(self) | |
self._was_started = True | |
def run(self): | |
""" | |
Runs the process target. | |
""" | |
worker_thread_joined_event = threading.Event() | |
def worker(self): | |
multiprocessing.Process.run(self) | |
worker_thread_joined_event.set() | |
worker_thread = threading.Thread(target=worker, args=(self,)) | |
worker_thread.start() | |
while True: | |
_wait_for_single(self._interrupt_event, worker_thread_joined_event) | |
if worker_thread_joined_event.is_set(): | |
if self._interrupt_event.is_set(): | |
# Received interrupt request but thread already finished. | |
# But don't clear the interrupt event so we can block | |
# further interrupt requests. | |
self._exception_pipe_sender.send(None) | |
break | |
elif self._interrupt_event.is_set(): | |
try: | |
InterruptableThread._raise_exception_async( | |
worker_thread, | |
KeyboardInterrupt) | |
self._exception_pipe_sender.send(None) | |
self._interrupt_event.clear() | |
break | |
except BaseException as ex: | |
self._exception_pipe_sender.send(ex) | |
# It's good practice to join each thread, even if we can be sure it | |
# isn't alive any more. | |
worker_thread.join() | |
@staticmethod | |
def _raise_exception_async(thread, exception): | |
""" | |
Raises an exception asynchronously in another thread. | |
For threads inside the `threading` module. | |
:param thread: The thread where to raise an exception | |
asynchronously. | |
:param exception: The exception to raise in thread. | |
:raises ValueError: Raised when supplying an invalid or not running | |
thread. | |
:raises SystemError: Raised when the async-raise failed on the given | |
thread. | |
""" | |
if not thrad.is_alive(): | |
raise ValueError("Thread is not running.") | |
# It's not possible (as far as I know) to define an atomic section of | |
# code (whether using the Global Interpreter Lock or other mechanisms). | |
# We have to hope that Python does not interrupt after the async | |
# exception request so we can check if everything is alright and revert | |
# in case. If we have bad luck and the async-exception-set fails, the | |
# target thread gets executed while it exceeds the count of statements | |
# for the next exception check. If this happens the exception is not | |
# reverted, we address more than one thread and everything is going to | |
# be unstable... It's even not possible with a native C-module. | |
# TODO: Really even not possible with C? | |
result = ctypes.pythonapi.PyThreadState_SetAsyncExc( | |
thread.ident, | |
ctypes.py_object(exception)) | |
if result == 0: | |
raise ValueError("Invalid thread ID.") | |
elif result > 1: | |
# Something went wrong, we addressed more than one thread with the | |
# async exception request. Revert. | |
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None) | |
raise SystemError("Failed to set an asynchronous exception for " | |
"given thread. More than one thread was " | |
"addressed by the request.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment