-
-
Save liuw/2407154 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
# liuw | |
# Nasty hack to raise exception for other threads | |
import ctypes # Calm down, this has become standard library since 2.5 | |
import threading | |
import time | |
NULL = 0 | |
def ctype_async_raise(thread_obj, exception): | |
found = False | |
target_tid = 0 | |
for tid, tobj in threading._active.items(): | |
if tobj is thread_obj: | |
found = True | |
target_tid = tid | |
break | |
if not found: | |
raise ValueError("Invalid thread object") | |
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, ctypes.py_object(exception)) | |
# ref: http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc | |
if ret == 0: | |
raise ValueError("Invalid thread ID") | |
elif ret > 1: | |
# Huh? Why would we notify more than one threads? | |
# Because we punch a hole into C level interpreter. | |
# So it is better to clean up the mess. | |
ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, NULL) | |
raise SystemError("PyThreadState_SetAsyncExc failed") | |
print "Successfully set asynchronized exception for", target_tid | |
def f(): | |
try: | |
while True: | |
time.sleep(1) | |
finally: | |
print "Exited" | |
t = threading.Thread(target=f) | |
t.start() | |
print "Thread started" | |
print t.isAlive() | |
time.sleep(5) | |
ctype_async_raise(t, SystemExit) | |
t.join() | |
print t.isAlive() |
@jhcook , @stuaxo , @liuw anyone else having issues with locks and PyThreadState_SetAsyncExc? glenfant/stopit#6
Can't you use thread_obj.ident instead of looping through threading._active? Worried about id reuse?
Forked and ported to Python 3.4. Works pretty well even if remains one of the dirtiest hacks ever seen.
This will not work if the thread to kill is blocked at some I/O or sleep(very_long_time)
This helped me more than you can imagine. Kudos
This doesn't seem to work if the thread in question is doing a time.sleep
... anyone else notice this?
This doesn't seem to work if the thread in question is doing a
time.sleep
... anyone else notice this?
I believe this is because the exception is "async" since the API is _SetAsyncExc
.
In my use case, I call a subprocess in my target thread, and I have to change p.wait()
to a loop like
while time.time() < deadline:
try:
p.wait(0.1)
except subprocess.TimeoutExpired:
pass
to make the thread responsive to the exception.
Works pretty well so far, thank you so much.
note: this won't interrupt sockets/sleeps. for that you still need to send a signal.
This got a lot easier in Python 3.8 or higher, since Thread.native_id
was introduced:
import ctypes
from threading import Thread
from typing import Type
def raise_exception_in_thread(t: Thread, e: Type[BaseException]):
ctypes.pythonapi.PyThreadState_SetAsyncExc(t.native_id, ctypes.py_object(e))
edit: I tested this on Windows, but t.native_id
does not seem to work for me on Ubuntu. Gotta use t.ident
instead! This has the additional advantage, that it works on Python 3.3+
import ctypes
from threading import Thread
from typing import Type
def raise_exception_in_thread(t: Thread, e: Type[BaseException]):
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(t.ident), ctypes.py_object(e))
I couldn't figure out an easy way to use exceptions with custom messages so below is a workaround for non-builtin exceptions.
def raise_exception_in_thread(thread_obj, exception_cls, message=None):
# Monkey-patch exception with a default message since PyThreadState_SetAsyncExc can only take a class
if message:
def __init__(self, *args, **kwargs):
nonlocal message
super(exception_cls, self).__init__(message, **kwargs)
exception_cls.__init__ = __init__
found = False
target_tid = 0
for tid, tobj in threading._active.items():
if tobj is thread_obj:
found = True
target_tid = tid
break
if not found:
raise ValueError("Invalid thread object")
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(target_tid), ctypes.py_object(exception_cls))
# ref: http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc
if ret == 0:
raise ValueError("Invalid thread ID")
elif ret > 1:
# Huh? Why would we notify more than one threads?
# Because we punch a hole into C level interpreter.
# So it is better to clean up the mess.
ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, NULL)
raise SystemError("PyThreadState_SetAsyncExc failed")
print(f"Successfully set asynchronized exception for {target_tid}")
How about the following approach. If you would like to raise ValueError("Error Msg") You create a class with ExceptInstance. This will create a superclass of ValueError where the arg is passed in by init. This should work in most circumstances.
def ExceptInstance(except_class, *args, **kwds):
class Exceptor(except_class):
def __init__(self):
super().__init__(*args, **kwds)
return Exceptor
def program():
try:
thrower()
except ValueError as err:
print('ValueError', str(err))
def thrower():
Exceptor = ExceptInstance(ValueError, 'Threw a value_error')
raise Exceptor
program()
Check [SO]: Python2 killing a thread by PyThreadState_SetAsyncExc (@CristiFati's answer) for time.sleep.
@liuw Do you give permission to use this code as is?
@dstathis Yes. Do whatever you like with the code.
@liuw May I add a small contribution ?
This hack is actually of a great help ! Hereafter a version in a multiprocessing context:
The idea is to have the main process starts an observer thread with pipe to a third independant process and then proceeding with non blocking operations.
The third process can interrupt our main process whenever it needs by the mean of the pipe and the thread observer.
If the main process should be waiting on sockets or sleep, I strongly advise to implement a timeout loop, which is a good practice what so ever.
import ctypes
from time import sleep
from multiprocessing import Process, Pipe
from threading import Thread, get_ident
from typing import Type
def raise_exception_in_thread(tid: int, e: Type[BaseException]):
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(e))
if ret:
raise KeyboardInterrupt
else:
raise ValueError("Invalid thread ID")
def interrupt_process(send_conn, tid):
print("Starting process interrupter")
sleep(2)
send_conn.send(b"kill")
print("Ending process interrupter")
def observer_thread(recv_conn, tid):
print("Starting thread observer")
try:
while True:
if recv_conn.poll(timeout=0.5):
raise_exception_in_thread(tid, KeyboardInterrupt)
except KeyboardInterrupt:
print("Ending thread observer")
def main():
recv_conn, send_conn = Pipe()
tid = get_ident()
p = Process(target=interrupt_process, args=(send_conn, tid))
p.start()
t = Thread(target=observer_thread, args=(recv_conn, tid))
t.start()
try:
while True:
sleep(1)
except KeyboardInterrupt:
print("Main interruption")
finally:
p.join()
t.join()
if __name__ == "__main__":
main()
PS: I did not sync the printing part.
@glenfant: This worked and saved me loads of time on Mavericks! Ta