Skip to content

Instantly share code, notes, and snippets.

@njsmith
Created March 16, 2018 10:04
Show Gist options
  • Select an option

  • Save njsmith/14502e2c41bcbdc409c3d5bc9f49a5b5 to your computer and use it in GitHub Desktop.

Select an option

Save njsmith/14502e2c41bcbdc409c3d5bc9f49a5b5 to your computer and use it in GitHub Desktop.
import trio
import asyncio
class AsyncioTrioPortal:
def __init__(self, *, asyncio_loop, trio_token):
self._asyncio_loop = asyncio_loop
self._trio_token = trio_token
async def run_sync_asyncio_from_trio(self, func, *args):
"""Runs the given *synchronous* function in asyncio, from trio"""
trio_task = trio.hazmat.current_task()
def asyncio_side_helper():
result = trio.hazmat.Result.capture(func, *args)
self.trio_token.run_sync_soon(
trio.hazmat.reschedule, trio_task, result
)
self._asyncio_loop.call_soon_threadsafe(asyncio_side_helper)
# This isn't cancellable, because synchronous calls aren't (in either
# trio or asyncio)
def abort(_):
return trio.hazmat.Abort.FAILED
await trio.hazmat.wait_task_rescheduled(abort)
async def run_asyncio_from_trio(self, asyncfunc, *args):
"""Runs the given *asynchronous* function in asyncio, from trio"""
# First, create the task, and get back the reference to it so we can
# cancel it if we need to
asyncio_task = await self.run_sync_asyncio_from_trio(
asyncio.ensure_future, asyncfunc, *args
)
trio_task = trio.hazmat.current_task()
# First, arrange for this trio task to be woken when the asyncio task
# completes
def asyncio_side_done_callback(_):
self._trio_token.run_sync_soon(trio.hazmat.reschedule, trio_task)
self._asyncio_loop.call_soon_threadsafe(
asyncio_task.add_done_callback, asyncio_side_done_callback
)
# Second, here's the code to transfer trio-side cancellations to
# asyncio
trio_cancel_raiser = None
def abort(raiser):
# Save the magic function that knows how to raise an appropriate
# cancellation exception on the trio side
nonlocal trio_cancel_raiser
trio_cancel_raiser = raiser
# Ask asyncio to cancel the task
self._asyncio_loop.call_soon_threadsafe(asyncio_task.cancel)
# Tell trio that we're still waiting
return trio.hazmat.Abort.FAILED
# And finally we go to sleep, until asyncio_side_done_callback wakes
# us up again
await trio.hazmat.wait_task_rescheduled(abort)
# At this point the asyncio_task object isn't running anymore, so it
# should be safe to manipulate directly from this thread
if asyncio_task.cancelled() and trio_cancel_raiser is not None:
# We successfully propagated a cancellation; convert it back into
# a trio cancellation. Ideally we'd keep the original
# asyncio.CancelledError exception and attach it to this as a
# __cause__, to keep the full traceback. Unfortunately, asyncio
# throws that exception away. So... sorry.
trio_cancel_raiser()
else:
return asyncio_task.result()
def _result_to_future(self, result, future):
if type(result) is trio.hazmat.Value:
future.set_result(result.value)
else:
future.set_exception(result.error)
async def run_sync_trio_from_asyncio(self, func, *args):
future = asyncio.Future()
def trio_side_helper():
@trio.hazmat.ki_protection_disabled
def doit():
return func(*args)
result = trio.hazmat.Result.capture(doit)
self._asyncio_loop.call_soon_threadsafe(
self._result_to_future, result, future
)
self._trio_token.run_sync_soon(trio_side_helper)
# We have to use asyncio.shield() here to prevent asyncio-side
# cancellation from messing things up. But asyncio.shield has weird
# semantics: what it does is make sure that if we're cancelled, then
# 'future' is not cancelled. This is important, because if future were
# to be cancelled then it would be finished, and we wouldn't be able
# to keep waiting for it. But if we're cancelled, 'shield' does still
# abort and raise asyncio.CancelledError. It also raises
# CancelledError if that's actually how the future resolved. So we
# have to catch CancelledError, and then figure out whether it was
# caused by us being cancelled or by the future resolving
while True:
try:
return await asyncio.shield(future)
except asyncio.CancelledError:
if future.cancelled():
# This exception is real; carry on.
raise
else:
# False alarm; try again.
continue
async def run_trio_from_asyncio(self, asyncfunc, *args):
future = asyncio.Future()
queue = asyncio.Queue()
async def trio_side_async_helper():
@trio.hazmat.ki_protection_disabled
async def doit():
with trio.open_cancel_scope() as cancel_scope:
await self.run_sync_asyncio_from_trio(
queue.put_nowait, cancel_scope
)
return await func(*args)
if cancel_scope.cancelled_caught:
raise asyncio.CancelledError
result = await trio.hazmat.Result.acapture(doit)
self._asyncio_loop.call_soon_threadsafe(
self._result_to_future, result, future
)
def trio_side_sync_helper():
trio.hazmat.spawn_system_task(trio_side_async_helper)
self._trio_token.run_sync_soon(trio_side_sync_helper)
# Wait for the trio side to finish setting up, and get the cancel
# scope reference back. This part can't be cancelled.
while True:
try:
trio_cancel_scope = await asyncio.shield(queue.get())
except asyncio.CancelledError:
continue
else:
break
while True:
try:
return await asyncio.shield(future)
except asyncio.CancelledError:
if future.cancelled():
# This is actually the final exception from the future, so
# re-raise.
raise
else:
# We were cancelled, so we need to pass that on to the
# trio side.
self._trio_token.run_sync_soon(trio_cancel_scope.cancel)
# Okay, trio knows about it; loop around and keep waiting
# for it to finish unwinding.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment