-
-
Save liuw/2407154 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
# liuw | |
# Nasty hack to raise exception for other threads | |
import ctypes # Calm down, this has become standard library since 2.5 | |
import threading | |
import time | |
NULL = 0 | |
def ctype_async_raise(thread_obj, exception): | |
found = False | |
target_tid = 0 | |
for tid, tobj in threading._active.items(): | |
if tobj is thread_obj: | |
found = True | |
target_tid = tid | |
break | |
if not found: | |
raise ValueError("Invalid thread object") | |
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, ctypes.py_object(exception)) | |
# ref: http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc | |
if ret == 0: | |
raise ValueError("Invalid thread ID") | |
elif ret > 1: | |
# Huh? Why would we notify more than one threads? | |
# Because we punch a hole into C level interpreter. | |
# So it is better to clean up the mess. | |
ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, NULL) | |
raise SystemError("PyThreadState_SetAsyncExc failed") | |
print "Successfully set asynchronized exception for", target_tid | |
def f(): | |
try: | |
while True: | |
time.sleep(1) | |
finally: | |
print "Exited" | |
t = threading.Thread(target=f) | |
t.start() | |
print "Thread started" | |
print t.isAlive() | |
time.sleep(5) | |
ctype_async_raise(t, SystemExit) | |
t.join() | |
print t.isAlive() |
I couldn't figure out an easy way to use exceptions with custom messages so below is a workaround for non-builtin exceptions.
def raise_exception_in_thread(thread_obj, exception_cls, message=None):
# Monkey-patch exception with a default message since PyThreadState_SetAsyncExc can only take a class
if message:
def __init__(self, *args, **kwargs):
nonlocal message
super(exception_cls, self).__init__(message, **kwargs)
exception_cls.__init__ = __init__
found = False
target_tid = 0
for tid, tobj in threading._active.items():
if tobj is thread_obj:
found = True
target_tid = tid
break
if not found:
raise ValueError("Invalid thread object")
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(target_tid), ctypes.py_object(exception_cls))
# ref: http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc
if ret == 0:
raise ValueError("Invalid thread ID")
elif ret > 1:
# Huh? Why would we notify more than one threads?
# Because we punch a hole into C level interpreter.
# So it is better to clean up the mess.
ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, NULL)
raise SystemError("PyThreadState_SetAsyncExc failed")
print(f"Successfully set asynchronized exception for {target_tid}")
How about the following approach. If you would like to raise ValueError("Error Msg") You create a class with ExceptInstance. This will create a superclass of ValueError where the arg is passed in by init. This should work in most circumstances.
def ExceptInstance(except_class, *args, **kwds):
class Exceptor(except_class):
def __init__(self):
super().__init__(*args, **kwds)
return Exceptor
def program():
try:
thrower()
except ValueError as err:
print('ValueError', str(err))
def thrower():
Exceptor = ExceptInstance(ValueError, 'Threw a value_error')
raise Exceptor
program()
Check [SO]: Python2 killing a thread by PyThreadState_SetAsyncExc (@CristiFati's answer) for time.sleep.
@liuw Do you give permission to use this code as is?
@dstathis Yes. Do whatever you like with the code.
@liuw May I add a small contribution ?
This hack is actually of a great help ! Hereafter a version in a multiprocessing context:
The idea is to have the main process starts an observer thread with pipe to a third independant process and then proceeding with non blocking operations.
The third process can interrupt our main process whenever it needs by the mean of the pipe and the thread observer.
If the main process should be waiting on sockets or sleep, I strongly advise to implement a timeout loop, which is a good practice what so ever.
import ctypes
from time import sleep
from multiprocessing import Process, Pipe
from threading import Thread, get_ident
from typing import Type
def raise_exception_in_thread(tid: int, e: Type[BaseException]):
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(e))
if ret:
raise KeyboardInterrupt
else:
raise ValueError("Invalid thread ID")
def interrupt_process(send_conn, tid):
print("Starting process interrupter")
sleep(2)
send_conn.send(b"kill")
print("Ending process interrupter")
def observer_thread(recv_conn, tid):
print("Starting thread observer")
try:
while True:
if recv_conn.poll(timeout=0.5):
raise_exception_in_thread(tid, KeyboardInterrupt)
except KeyboardInterrupt:
print("Ending thread observer")
def main():
recv_conn, send_conn = Pipe()
tid = get_ident()
p = Process(target=interrupt_process, args=(send_conn, tid))
p.start()
t = Thread(target=observer_thread, args=(recv_conn, tid))
t.start()
try:
while True:
sleep(1)
except KeyboardInterrupt:
print("Main interruption")
finally:
p.join()
t.join()
if __name__ == "__main__":
main()
PS: I did not sync the printing part.
This got a lot easier in Python 3.8 or higher, since
Thread.native_id
was introduced:edit: I tested this on Windows, but
t.native_id
does not seem to work for me on Ubuntu. Gotta uset.ident
instead! This has the additional advantage, that it works on Python 3.3+