Created
February 12, 2024 05:24
-
-
Save adivekar-utexas/95758673d5014a9556a027a1712a80ca to your computer and use it in GitHub Desktop.
How to ensure you kill a ThreadPoolExecutor or ProcessPoolExecutor in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import * | |
import ctypes | |
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor | |
def _shutdown_executor(
    executor: Union[ThreadPoolExecutor, ProcessPoolExecutor],
    wait: bool,
    cancel_futures: bool,
) -> None:
    """Call ``executor.shutdown``, forwarding ``cancel_futures`` only on Python 3.9+ where it exists."""
    import sys

    if sys.version_info >= (3, 9):
        executor.shutdown(wait=wait, cancel_futures=cancel_futures)
    else:
        executor.shutdown(wait=wait)


def stop_executor(
    executor: Optional[Union[ThreadPoolExecutor, ProcessPoolExecutor]],
    force: bool = True,  ## Forcefully terminate, might lead to work being lost.
):
    """
    Shut down a ThreadPoolExecutor or ProcessPoolExecutor.

    :param executor: the executor to stop. ``None`` is accepted and does nothing.
    :param force: only affects a ThreadPoolExecutor.
        If True:
        - the executor stops accepting new work;
        - pending (not yet started) work items are cancelled (Python 3.9+);
        - SystemExit is raised inside every worker thread via `kill_thread`;
        - the call does not wait for running work.
        If False, the executor is shut down and this call blocks until all submitted work finishes.
        A ProcessPoolExecutor is always shut down with ``wait=True``, regardless of `force`.
    """
    import threading

    if executor is None:
        return
    if isinstance(executor, ThreadPoolExecutor):
        if not force:
            executor.shutdown(wait=True)
            return
        ## Refuse new submissions AND cancel queued-but-not-started items. A plain
        ## shutdown(wait=False) leaves them queued, so the workers would keep running them.
        _shutdown_executor(executor, wait=False, cancel_futures=True)
        caller = threading.get_ident()
        for tid in worker_ids(executor):
            if tid is None or tid == caller:
                ## Never inject SystemExit into the thread that is running this function.
                continue
            try:
                kill_thread(tid, SystemExit)
            except ValueError:
                ## kill_thread raises ValueError when no thread has this id. For example, an
                ## idle worker may already have exited after the shutdown above: nothing to stop.
                pass
    elif isinstance(executor, ProcessPoolExecutor):
        ## NOTE(review): `force` is not honored for process pools; this waits for all submitted work.
        executor.shutdown(wait=True)
def kill_thread(tid: int, exctype: Type[BaseException]):
    """
    Dirty hack to *actually* stop a thread: raises an exception in threads with this thread id.

    How it works:
    - kill_thread uses ctypes.pythonapi.PyThreadState_SetAsyncExc to schedule `exctype` to be
      raised in the thread whose identifier is `tid`.
    - By passing SystemExit, it attempts to terminate the thread.

    Risks and Considerations
    - Resource Leaks: If the thread holds a lock or other resources, these may not be properly released.
    - Data Corruption: If the thread is manipulating shared data, partial updates may lead to data corruption.
    - Deadlocks: If the thread is killed while holding a lock that other threads are waiting on, it can cause a
      deadlock.
    - Undefined Behavior: The Python runtime does not expect threads to be killed in this manner, which may cause
      undefined behavior.

    :param tid: identifier of the target thread (as given by ``threading.Thread.ident``).
    :param exctype: exception *class* (not an instance) to raise in the target thread.
    :raises TypeError: if `exctype` is not a class derived from BaseException.
    :raises ValueError: if no thread with identifier `tid` was found.
    :raises SystemError: if more than one thread state was affected. The request is reverted first.
    """
    ## Check isinstance first so that a non-class argument gets this message, not issubclass()'s own error.
    if not (isinstance(exctype, type) and issubclass(exctype, BaseException)):
        raise TypeError("Only types derived from BaseException are allowed")
    ## The C API takes the thread id as `unsigned long` (since Python 3.7).
    thread_id = ctypes.c_ulong(tid)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError(f"Invalid thread ID: {tid}")
    elif res != 1:
        # If it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
    ## Report only after the exception was actually scheduled in exactly one thread.
    print(f'...killed thread ID: {tid}')
def worker_ids(executor: Optional[Union[ThreadPoolExecutor, ProcessPoolExecutor]]) -> Set[int]:
    """
    Return the identifiers of an executor's worker threads or processes.

    This reads private executor attributes:
    - ThreadPoolExecutor: the ``ident`` of every thread in ``executor._threads``.
    - ProcessPoolExecutor: the ``pid`` of every process in ``executor._processes``. CPython sets
      that attribute to ``None`` once the pool has been shut down; an empty set is returned then.

    :param executor: the executor to inspect. ``None`` yields an empty set.
    :raises NotImplementedError: for any other executor type.
    """
    if executor is None:
        return set()
    if isinstance(executor, ThreadPoolExecutor):
        return {th.ident for th in executor._threads}
    if isinstance(executor, ProcessPoolExecutor):
        ## Guard against `_processes` being None after shutdown.
        processes = getattr(executor, '_processes', None) or {}
        return {p.pid for p in processes.values()}
    raise NotImplementedError(f'Cannot get worker ids for executor of type: {type(executor)}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment