Skip to content

Instantly share code, notes, and snippets.

@njsmith
Last active September 6, 2022 07:54
Show Gist options
  • Save njsmith/be24d376faea61cb3999f08318b780f3 to your computer and use it in GitHub Desktop.
Save njsmith/be24d376faea61cb3999f08318b780f3 to your computer and use it in GitHub Desktop.
import trio
# Process pool based on concurrent.futures
import concurrent.futures
class TrioProcessExecutor(trio.abc.AsyncResource):
def __init__(self, max_workers=None):
self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
async def run_sync(self, fn, *args):
fut = self._executor.submit(fn, *args)
task = trio.hazmat.current_task()
token = trio.hazmat.current_trio_token()
def cb(_):
# If we successfully cancelled from cancel_fn, then
# this callback still gets called, but we were already
# rescheduled so we don't need to do it again.
if not fut.cancelled():
token.run_sync_soon(trio.hazmat.reschedule, task)
fut.add_done_callback(cb)
def cancel_fn(_):
if fut.cancel():
return trio.hazmat.Abort.SUCCEEDED
else:
return trio.hazmat.Abort.FAILED
await trio.hazmat.wait_task_rescheduled(cancel_fn)
assert fut.done()
return fut.result()
async def aclose(self):
# shutdown() has no cancellation support, so we just have to wait it out
with trio.open_cancel_scope(shield=True):
await trio.run_sync_in_worker_thread(self._executor.shutdown())
@burnpanck
Copy link

Hey, I have a need for this one too! Did it already make it's way into trio or some add-on library?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment