Skip to content

Instantly share code, notes, and snippets.

@adivekar-utexas
Created February 12, 2024 05:24
Show Gist options
  • Save adivekar-utexas/95758673d5014a9556a027a1712a80ca to your computer and use it in GitHub Desktop.
Save adivekar-utexas/95758673d5014a9556a027a1712a80ca to your computer and use it in GitHub Desktop.
How to ensure you kill a ThreadPoolExecutor or ProcessPoolExecutor in Python
from typing import *
import ctypes
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def stop_executor(
        executor: Optional[Union[ThreadPoolExecutor, ProcessPoolExecutor]],
        force: bool = True,  ## Forcefully terminate, might lead to work being lost.
):
    """
    Shut down a ThreadPoolExecutor or ProcessPoolExecutor.

    Args:
        executor: the executor to stop. ``None`` is accepted and is a no-op.
        force: only affects ThreadPoolExecutor.
            - True: do not wait. Work that has not started yet is cancelled and SystemExit is
              raised asynchronously in every worker thread that is still alive, so work that is
              currently running is interrupted. See the risks listed on `kill_thread`.
            - False: block until all submitted work has finished.

    A ProcessPoolExecutor is always shut down with ``wait=True``; `force` is ignored for it.
    Executors of any other type are silently ignored.
    """
    if executor is None:
        return

    if isinstance(executor, ThreadPoolExecutor):
        if not force:
            executor.shutdown(wait=True)
            return
        ## `shutdown(wait=False)` alone only stops new submissions; work that is already queued
        ## would still be executed. `cancel_futures=True` is what drops the queued work.
        try:
            executor.shutdown(wait=False, cancel_futures=True)
        except TypeError:
            ## Python < 3.9 has no `cancel_futures` parameter.
            executor.shutdown(wait=False)
        for tid in worker_ids(executor):
            if tid is None:
                continue
            try:
                kill_thread(tid, SystemExit)
            except ValueError:
                ## The worker already exited on its own (idle workers stop right after
                ## shutdown), so there is nothing left to kill. Keep going with the others.
                continue
        return

    if isinstance(executor, ProcessPoolExecutor):
        executor.shutdown(wait=True)
def kill_thread(tid: int, exctype: Type[BaseException]):
    """
    Dirty hack to *actually* stop a thread: raises an exception in the thread with this thread id.

    How it works:
    - Uses ctypes.pythonapi.PyThreadState_SetAsyncExc to schedule `exctype` in the target thread.
    - By passing SystemExit, it attempts to terminate the thread.
    - The exception is asynchronous: it is only raised once the target thread executes Python
      bytecode again, so a thread blocked inside a long C call is not interrupted immediately.

    Args:
        tid: identifier of the target thread (e.g. `threading.Thread.ident`).
        exctype: exception *class* (not an instance) to raise in the thread.

    Raises:
        TypeError: if `exctype` is not a class derived from BaseException.
        ValueError: if no running thread has the id `tid`.
        SystemError: if more than one thread state was modified (the change is reverted first).

    Risks and Considerations
    - Resource Leaks: If the thread holds a lock or other resources, these may not be properly
      released.
    - Data Corruption: If the thread is manipulating shared data, partial updates may lead to
      data corruption.
    - Deadlocks: If the thread is killed while holding a lock that other threads are waiting on,
      it can cause a deadlock.
    - Undefined Behavior: The Python runtime does not expect threads to be killed in this manner,
      which may cause undefined behavior.
    """
    ## `isinstance(..., type)` first, so that passing an instance gives the message below
    ## instead of the unrelated "issubclass() arg 1 must be a class".
    if not (isinstance(exctype, type) and issubclass(exctype, BaseException)):
        raise TypeError("Only types derived from BaseException are allowed")

    ## The C function takes the thread id as `unsigned long`.
    thread_id = ctypes.c_ulong(tid)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError(f"Invalid thread ID: {tid}")
    if res != 1:
        ## If it returns a number greater than one, you're in trouble,
        ## and you should call it again with exc=NULL to revert the effect
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
    ## Only report success once the call is known to have hit exactly one thread.
    print(f'...killed thread ID: {tid}')
def worker_ids(executor: Optional[Union[ThreadPoolExecutor, ProcessPoolExecutor]]) -> Set[int]:
    """
    Return the identifiers of the workers owned by an executor.

    - ThreadPoolExecutor: the thread idents of its worker threads.
    - ProcessPoolExecutor: the PIDs of its worker processes.

    An executor that has no workers (none spawned yet, or a process pool that was already shut
    down) yields an empty set.

    Note: this reads the private `_threads` / `_processes` attributes of the executor.

    Raises:
        NotImplementedError: if `executor` is neither of the two supported types (including None).
    """
    if isinstance(executor, ThreadPoolExecutor):
        ## Iterate over a snapshot: the executor adds to `_threads` whenever work is submitted.
        threads = tuple(executor._threads)
        return {th.ident for th in threads if th.ident is not None}
    if isinstance(executor, ProcessPoolExecutor):
        ## CPython resets `_processes` to None when the pool is shut down.
        processes = tuple((executor._processes or {}).values())
        return {p.pid for p in processes if p.pid is not None}
    raise NotImplementedError(f'Cannot get worker ids for executor of type: {executor}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment