-
-
Save liuw/2407154 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
# liuw | |
# Nasty hack to raise exception for other threads | |
import ctypes # Calm down, this has become standard library since 2.5 | |
import threading | |
import time | |
NULL = 0 | |
def ctype_async_raise(thread_obj, exception):
    """Asynchronously raise ``exception`` inside the thread ``thread_obj``.

    This is a hack built on the CPython C API call
    ``PyThreadState_SetAsyncExc``; the exception is only scheduled here and
    is raised later, inside the target thread.

    Args:
        thread_obj: The ``threading.Thread`` to interrupt. It must currently
            be registered in ``threading._active``.
        exception: The exception to raise in the target thread (the demo in
            this file passes an exception class such as ``SystemExit``).

    Raises:
        ValueError: If ``thread_obj`` is not a registered thread, or the
            interpreter does not know its thread id.
        SystemError: If more than one thread state was modified; the pending
            exception is reverted before raising.
    """
    # Scan a snapshot of the registry so that a thread starting or exiting
    # while we iterate cannot invalidate the iteration.
    target_tid = None
    for tid, tobj in list(threading._active.items()):
        if tobj is thread_obj:
            target_tid = tid
            break
    if target_tid is None:
        raise ValueError("Invalid thread object")

    # ctypes marshals a bare Python int as a C ``int``, which truncates
    # thread ids on platforms where they need more than 32 bits. Wrap the id
    # explicitly so the full value reaches the C function.
    c_tid = ctypes.c_long(target_tid)
    ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        c_tid, ctypes.py_object(exception))
    # ref: http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc
    if ret == 0:
        raise ValueError("Invalid thread ID")
    elif ret > 1:
        # Huh? Why would we notify more than one thread?
        # Because we punched a hole into the C-level interpreter.
        # So it is better to clean up the mess: passing None hands the C
        # function a NULL exception, which cancels the pending one.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(c_tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
    # Single pre-formatted argument: prints the same text on Python 2 and 3.
    print("Successfully set asynchronized exception for %s" % target_tid)
def f():
    """Demo worker: sleep in one-second steps forever, then report exiting."""
    try:
        while True:
            time.sleep(1)
    finally:
        # Runs however the loop is left, including via an exception that was
        # injected into this thread from outside.
        print("Exited")
# Demo: start the worker, let it run for a few seconds, then stop it from the
# main thread by injecting SystemExit into it.
t = threading.Thread(target=f)
t.start()
print("Thread started")
# is_alive() replaces the isAlive() alias, which newer Python versions dropped.
print(t.is_alive())
time.sleep(5)
ctype_async_raise(t, SystemExit)
# Wait for the worker to unwind before checking its state again.
t.join()
print(t.is_alive())
Check [SO]: Python2 killing a thread by PyThreadState_SetAsyncExc (@CristiFati's answer) for time.sleep.
@liuw Do you give permission to use this code as is?
@dstathis Yes. Do whatever you like with the code.
@liuw May I add a small contribution ?
This hack is actually of great help! Here is a version for a multiprocessing context:
The idea is to have the main process start an observer thread, connected by a pipe to a third, independent process, and then proceed with non-blocking operations.
The third process can interrupt our main process whenever it needs to, by means of the pipe and the observer thread.
If the main process has to wait on sockets or sleep, I strongly advise implementing a timeout loop, which is good practice in any case.
import ctypes
from time import sleep
from multiprocessing import Process, Pipe
from threading import Thread, get_ident
from typing import Type
def raise_exception_in_thread(tid: int, e: Type[BaseException]):
    """Schedule exception ``e`` in the thread whose identifier is ``tid``.

    This function never returns normally. Once the exception has been
    scheduled it raises ``KeyboardInterrupt`` in the *calling* thread, which
    the observer thread in this file relies on to leave its polling loop.

    Raises:
        ValueError: If the interpreter modified no thread state for ``tid``.
        KeyboardInterrupt: In the caller, after the exception was scheduled.
    """
    modified = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(tid), ctypes.py_object(e)
    )
    # Zero modified thread states means the id matched no thread.
    if not modified:
        raise ValueError("Invalid thread ID")
    raise KeyboardInterrupt
def interrupt_process(send_conn, tid):
    """Child-process entry point: wait two seconds, then send ``b"kill"``.

    Args:
        send_conn: Connection object the message is sent on.
        tid: Accepted for the caller's convenience; not used in this body.
    """
    print("Starting process interrupter")
    sleep(2)
    message = b"kill"
    send_conn.send(message)
    print("Ending process interrupter")
def observer_thread(recv_conn, tid):
    """Poll ``recv_conn`` and interrupt thread ``tid`` once data is available.

    Args:
        recv_conn: Connection object polled every half second.
        tid: Identifier of the thread to interrupt.
    """
    print("Starting thread observer")
    try:
        while True:
            # Nothing waiting on the pipe yet: poll again.
            if not recv_conn.poll(timeout=0.5):
                continue
            # The helper raises KeyboardInterrupt in this thread once the
            # target has been signalled, which ends the loop.
            raise_exception_in_thread(tid, KeyboardInterrupt)
    except KeyboardInterrupt:
        print("Ending thread observer")
def main():
    """Run the demo: a child process signals over a pipe, an observer thread
    turns that signal into a ``KeyboardInterrupt`` in this thread."""
    recv_conn, send_conn = Pipe()
    # Identifier of the current thread, i.e. the one to be interrupted.
    tid = get_ident()

    interrupter = Process(target=interrupt_process, args=(send_conn, tid))
    interrupter.start()
    observer = Thread(target=observer_thread, args=(recv_conn, tid))
    observer.start()

    try:
        # Sleep in short steps rather than blocking indefinitely.
        while True:
            sleep(1)
    except KeyboardInterrupt:
        print("Main interruption")
    finally:
        interrupter.join()
        observer.join()


if __name__ == "__main__":
    main()
PS: I did not sync the printing part.
How about the following approach? If you would like to raise ValueError("Error Msg"), you create a class with ExceptInstance. This will create a subclass of ValueError where the arg is passed in by init. This should work in most circumstances.