Last active
September 6, 2022 07:54
-
-
Save njsmith/be24d376faea61cb3999f08318b780f3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
# Process pool based on concurrent.futures | |
import concurrent.futures | |
class TrioProcessExecutor(trio.abc.AsyncResource): | |
def __init__(self, max_workers=None): | |
self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) | |
async def run_sync(self, fn, *args): | |
fut = self._executor.submit(fn, *args) | |
task = trio.hazmat.current_task() | |
token = trio.hazmat.current_trio_token() | |
def cb(_): | |
# If we successfully cancelled from cancel_fn, then | |
# this callback still gets called, but we were already | |
# rescheduled so we don't need to do it again. | |
if not fut.cancelled(): | |
token.run_sync_soon(trio.hazmat.reschedule, task) | |
fut.add_done_callback(cb) | |
def cancel_fn(_): | |
if fut.cancel(): | |
return trio.hazmat.Abort.SUCCEEDED | |
else: | |
return trio.hazmat.Abort.FAILED | |
await trio.hazmat.wait_task_rescheduled(cancel_fn) | |
assert fut.done() | |
return fut.result() | |
async def aclose(self): | |
# shutdown() has no cancellation support, so we just have to wait it out | |
with trio.open_cancel_scope(shield=True): | |
await trio.run_sync_in_worker_thread(self._executor.shutdown()) |
@miracle2k Oh, nice diagnosis. I updated the gist to check for fut.cancelled()
.
cb
runs and callstrio.hazmat.reschedule
(which btw in my version of the code is run throughtoken.run_sync_soon
- I don't remember if or why I added this.
Huh! You're right, it looks like ProcessPoolExecutor
calls the callback from another thread, and the docs permit this. Okay, I added the token to the gist too :-)
Hey, I have a need for this one too! Did it already make it's way into trio or some add-on library?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I had been using some code adapted from this, and it tended to crash about once or twice per day. I think I figured out the culprit. There is a race condition here, what can happen is the following:
wait_task_rescheduled
.cancel_fn()
runs in the trio thread.fut.cancel()
returnsTrue
and we give back to trioAbort.SUCCEEDED
, which makes trio reschedule the task.cb
runs and callstrio.hazmat.reschedule
(which btw in my version of the code is run throughtoken.run_sync_soon
- I don't remember if or why I added this.run_sync
is invoked,rio.hazmat.wait_task_rescheduled
almost immediately returns (having a reschedule already waiting for it?), at which pointassert fut.done()
fails.I changed it to make
cb
check forfuture.cancelled()
.