Last active
February 6, 2021 04:59
-
-
Save yeraydiazdiaz/b8c059c6dcfaf3255c65806de39175a7 to your computer and use it in GitHub Desktop.
Cancellation on run_in_executor using ThreadPoolExecutor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
A demonstration of how raising KeyboardInterrupt in the context of tasks
spawned via asyncio's loop.run_in_executor does not cancel the threads
using any of the cancellation methods in asyncio Futures.

The only "proper" way to cancel is to:

1. unregister the `atexit` registered `_python_exit` function
2. call `shutdown(wait=False)`

The reason is that the `thread` module registers `_python_exit`, forcing a
join of the remaining worker threads
(https://github.com/python/cpython/blob/master/Lib/concurrent/futures/thread.py#L33-L42).
This alone is not enough, as `shutdown` will also join the threads by default,
so we need to pass `wait=False` to the call as well.
"""
import asyncio
import atexit
import concurrent.futures
import concurrent.futures.thread
import time
def sleepy(n=10):
    """Block the calling thread for *n* seconds and return *n*.

    Prints a message before and after sleeping so the worker thread's
    progress is visible on stdout.

    Args:
        n: Number of seconds passed to ``time.sleep``.

    Returns:
        The same value *n* that was passed in.

    Raises:
        Exception: Anything raised while sleeping is printed and re-raised.
    """
    try:
        print('Sleepy')
        time.sleep(n)
        print('Done sleeping')
        return n
    except Exception as e:
        # not triggered by cancellation
        print('Got an exception ', e)
        raise
def main(executor):
    """Schedule ten ``sleepy`` calls on *executor* through the event loop.

    Relies on the module-level ``loop`` object, which must be bound before
    this function is called.

    Args:
        executor: Executor handed to ``loop.run_in_executor``.

    Returns:
        A list with the ten futures returned by ``loop.run_in_executor``.

    Raises:
        Exception: Anything raised while scheduling is printed and re-raised.
    """
    try:
        print(f'Running sleepy in executor {executor}')
        return [loop.run_in_executor(executor, sleepy) for _ in range(10)]
    except Exception as e:
        # not triggered by cancellation
        print('Got an exception ', e)
        raise
# Create the loop explicitly: calling asyncio.get_event_loop() without a
# running loop is deprecated and is an error on the newest Python releases.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Instantiate the executor first. Since Python 3.7 `concurrent.futures` loads
# its `thread` submodule lazily, so reading `concurrent.futures.thread` before
# ThreadPoolExecutor has been accessed raises AttributeError.
executor = concurrent.futures.ThreadPoolExecutor()

# Remove the exit hook that joins every worker thread at interpreter exit.
# NOTE(review): on Python 3.9+ the hook appears to be installed through
# `threading._register_atexit` rather than `atexit`, in which case this call
# is a silent no-op -- confirm against the targeted Python version.
atexit.unregister(concurrent.futures.thread._python_exit)

try:
    futures = main(executor)
    future = asyncio.gather(*futures)
    result = loop.run_until_complete(future)
    print('Result: ', result)
except KeyboardInterrupt:
    print('Main thread')
    # cancelling the gather future marks them as cancelled but doesn't stop the threads
    future.cancel()
    for fut in futures:
        if not fut.done():
            # this will not be executed as cancelling the parent gather cancels child futures
            print('Cancelling: ', fut)
            fut.cancel()
    # shutdown is designed to stop new work coming in, but not to stop already existing work
    # https://github.com/python/cpython/blob/master/Lib/concurrent/futures/thread.py#L198-L204
    executor.shutdown(wait=False)
finally:
    # when threads return the loop is already closed and a lot of
    # `RuntimeError: Event loop is closed` are raised
    loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
In Python >= 3.7 the following part will return an error:
In order to workaround this issue you can add the following line to the importing part of your program: