
@njsmith
Last active September 6, 2022 07:54
import trio
# Process pool based on concurrent.futures
import concurrent.futures


class TrioProcessExecutor(trio.abc.AsyncResource):
    def __init__(self, max_workers=None):
        self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)

    async def run_sync(self, fn, *args):
        fut = self._executor.submit(fn, *args)
        task = trio.hazmat.current_task()
        token = trio.hazmat.current_trio_token()

        def cb(_):
            # The executor may call this from another thread, so the
            # reschedule is routed back to the Trio thread via the token.
            # If we successfully cancelled from cancel_fn, then
            # this callback still gets called, but we were already
            # rescheduled so we don't need to do it again.
            if not fut.cancelled():
                token.run_sync_soon(trio.hazmat.reschedule, task)

        fut.add_done_callback(cb)

        def cancel_fn(_):
            if fut.cancel():
                return trio.hazmat.Abort.SUCCEEDED
            else:
                return trio.hazmat.Abort.FAILED

        await trio.hazmat.wait_task_rescheduled(cancel_fn)
        assert fut.done()
        return fut.result()

    async def aclose(self):
        # shutdown() has no cancellation support, so we just have to wait it out
        with trio.open_cancel_scope(shield=True):
            # Pass the bound method itself; calling shutdown() here would
            # block the Trio thread and hand None to the worker thread.
            await trio.run_sync_in_worker_thread(self._executor.shutdown)
@miracle2k

miracle2k commented Aug 12, 2018

I had been using some code adapted from this, and it tended to crash about once or twice per day. I think I figured out the culprit. There is a race condition here. What can happen is the following:

  • The task is waiting in line 22 on wait_task_rescheduled.
  • The task is cancelled, and the abort callback is invoked, i.e. cancel_fn() runs in the trio thread.
  • The task was not yet processed by the executor, which is apparently quite rare in my case. Therefore fut.cancel() returns True and we give back to trio Abort.SUCCEEDED, which makes trio reschedule the task.
  • However, the done callback is still invoked for a successfully cancelled future.
  • So now, cb runs and calls trio.hazmat.reschedule (which btw in my version of the code is run through token.run_sync_soon - I don't remember if or why I added this).
  • Now the task was rescheduled twice. What this apparently means is that the next time run_sync is invoked, trio.hazmat.wait_task_rescheduled almost immediately returns (having a reschedule already waiting for it?), at which point assert fut.done() fails.

I changed it to make cb check for future.cancelled().
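
The step the race hinges on is that a future cancelled while still queued runs its done callbacks anyway. Here is a minimal sketch of that behavior using only the standard library. It assumes a single-worker ThreadPoolExecutor as a stand-in for the process pool, so that the second job is guaranteed to stay queued; the names gate, calls, and cb are illustrative and not taken from the gist.

```python
import concurrent.futures
import threading

gate = threading.Event()
calls = []  # records fut.cancelled() each time the done callback fires


def cb(fut):
    calls.append(fut.cancelled())


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    blocker = pool.submit(gate.wait)   # occupies the only worker
    queued = pool.submit(pow, 2, 10)   # cannot start until blocker finishes
    queued.add_done_callback(cb)

    cancel_ok = queued.cancel()        # still pending, so cancellation succeeds
    gate.set()                         # release the worker so the pool can shut down

print(cancel_ok, calls)  # prints: True [True]
```

The callback fires exactly once, with fut.cancelled() already True, which is why checking fut.cancelled() inside cb is enough to skip the second reschedule.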

@njsmith
Author

njsmith commented Oct 1, 2018

@miracle2k Oh, nice diagnosis. I updated the gist to check for fut.cancelled().

"cb runs and calls trio.hazmat.reschedule (which btw in my version of the code is run through token.run_sync_soon - I don't remember if or why I added this)."

Huh! You're right, it looks like ProcessPoolExecutor calls the callback from another thread, and the docs permit this. Okay, I added the token to the gist too :-)
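
To see why the token is needed, here is a small sketch showing a done callback running on a thread other than the one that submitted the job. It swaps in ThreadPoolExecutor for ProcessPoolExecutor to stay self-contained, so the thread that fires the callback here is the worker itself rather than one of the process pool's internal threads; the names gate, callback_threads, and cb are illustrative only.

```python
import concurrent.futures
import threading

gate = threading.Event()
callback_threads = []  # thread idents on which the done callback ran


def cb(fut):
    callback_threads.append(threading.get_ident())


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(gate.wait)
    fut.add_done_callback(cb)  # registered before the job can finish
    gate.set()                 # let the job finish; the worker then fires cb

ran_elsewhere = callback_threads[0] != threading.get_ident()
print(ran_elsewhere)  # prints: True
```

Since trio.hazmat.reschedule must only be called from the Trio thread, a callback that may arrive on a foreign thread has to hand the call over with token.run_sync_soon, as the gist now does.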

@burnpanck

Hey, I have a need for this one too! Did it already make its way into trio or some add-on library?
