Created
March 16, 2018 10:04
-
-
Save njsmith/14502e2c41bcbdc409c3d5bc9f49a5b5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import trio | |
| import asyncio | |
class AsyncioTrioPortal:
    """Bridge for calling between a trio run and an asyncio event loop.

    Work is handed to the asyncio side with
    ``asyncio_loop.call_soon_threadsafe`` and to the trio side with
    ``trio_token.run_sync_soon``, so the two loops are presumably running
    in different threads.

    The ``*_from_trio`` methods must be awaited from a trio task; the
    ``*_from_asyncio`` methods must be awaited from a coroutine running on
    ``asyncio_loop``.
    """

    def __init__(self, *, asyncio_loop, trio_token):
        self._asyncio_loop = asyncio_loop
        self._trio_token = trio_token

    async def run_sync_asyncio_from_trio(self, func, *args):
        """Runs the given *synchronous* function in asyncio, from trio

        Returns whatever ``func(*args)`` returns; an exception raised by
        ``func`` is captured on the asyncio side and re-raised here.
        """
        trio_task = trio.hazmat.current_task()

        def asyncio_side_helper():
            result = trio.hazmat.Result.capture(func, *args)
            self._trio_token.run_sync_soon(
                trio.hazmat.reschedule, trio_task, result
            )

        self._asyncio_loop.call_soon_threadsafe(asyncio_side_helper)

        # This isn't cancellable, because synchronous calls aren't (in either
        # trio or asyncio)
        def abort(_):
            return trio.hazmat.Abort.FAILED

        # wait_task_rescheduled unwraps the Result handed to reschedule(), so
        # returning it passes func's return value (or exception) to the caller
        return await trio.hazmat.wait_task_rescheduled(abort)

    async def run_asyncio_from_trio(self, asyncfunc, *args):
        """Runs the given *asynchronous* function in asyncio, from trio

        A trio-side cancellation is forwarded to the asyncio task; this call
        keeps waiting until that task has actually finished.
        """
        # First, create the task, and get back the reference to it so we can
        # cancel it if we need to. The coroutine object is created on the
        # asyncio side, since ensure_future wants an awaitable rather than a
        # function plus arguments.
        def start_asyncio_task():
            return asyncio.ensure_future(
                asyncfunc(*args), loop=self._asyncio_loop
            )

        asyncio_task = await self.run_sync_asyncio_from_trio(
            start_asyncio_task
        )
        trio_task = trio.hazmat.current_task()

        # Next, arrange for this trio task to be woken when the asyncio task
        # completes
        def asyncio_side_done_callback(_):
            self._trio_token.run_sync_soon(trio.hazmat.reschedule, trio_task)

        self._asyncio_loop.call_soon_threadsafe(
            asyncio_task.add_done_callback, asyncio_side_done_callback
        )

        # Second, here's the code to transfer trio-side cancellations to
        # asyncio
        trio_cancel_raiser = None

        def abort(raiser):
            # Save the magic function that knows how to raise an appropriate
            # cancellation exception on the trio side
            nonlocal trio_cancel_raiser
            trio_cancel_raiser = raiser
            # Ask asyncio to cancel the task
            self._asyncio_loop.call_soon_threadsafe(asyncio_task.cancel)
            # Tell trio that we're still waiting
            return trio.hazmat.Abort.FAILED

        # And finally we go to sleep, until asyncio_side_done_callback wakes
        # us up again
        await trio.hazmat.wait_task_rescheduled(abort)

        # At this point the asyncio_task object isn't running anymore, so it
        # should be safe to manipulate directly from this thread
        if asyncio_task.cancelled() and trio_cancel_raiser is not None:
            # We successfully propagated a cancellation; convert it back into
            # a trio cancellation. Ideally we'd keep the original
            # asyncio.CancelledError exception and attach it to this as a
            # __cause__, to keep the full traceback. Unfortunately, asyncio
            # throws that exception away. So... sorry.
            trio_cancel_raiser()
        else:
            return asyncio_task.result()

    def _result_to_future(self, result, future):
        """Resolve the asyncio ``future`` from a captured trio ``result``."""
        if isinstance(result, trio.hazmat.Value):
            future.set_result(result.value)
        elif isinstance(result.error, asyncio.CancelledError):
            # Mark the future as genuinely cancelled instead of storing the
            # exception: the waiting loops use future.cancelled() to tell
            # "the operation ended in cancellation" apart from "the waiter
            # was cancelled while the operation is still running".
            future.cancel()
        else:
            future.set_exception(result.error)

    async def run_sync_trio_from_asyncio(self, func, *args):
        """Runs the given *synchronous* function in trio, from asyncio

        Cancelling the awaiting asyncio task has no effect; the call keeps
        waiting until ``func`` has run and its outcome has been delivered.
        """
        # Bind the future to the portal's loop explicitly, because that is
        # the loop on which it will be resolved.
        future = self._asyncio_loop.create_future()

        def trio_side_helper():
            @trio.hazmat.disable_ki_protection
            def doit():
                return func(*args)

            result = trio.hazmat.Result.capture(doit)
            self._asyncio_loop.call_soon_threadsafe(
                self._result_to_future, result, future
            )

        self._trio_token.run_sync_soon(trio_side_helper)

        # We have to use asyncio.shield() here to prevent asyncio-side
        # cancellation from messing things up. But asyncio.shield has weird
        # semantics: what it does is make sure that if we're cancelled, then
        # 'future' is not cancelled. This is important, because if future were
        # to be cancelled then it would be finished, and we wouldn't be able
        # to keep waiting for it. But if we're cancelled, 'shield' does still
        # abort and raise asyncio.CancelledError. It also raises
        # CancelledError if that's actually how the future resolved. So we
        # have to catch CancelledError, and then figure out whether it was
        # caused by us being cancelled or by the future resolving
        while True:
            try:
                return await asyncio.shield(future)
            except asyncio.CancelledError:
                if future.cancelled():
                    # This exception is real; carry on.
                    raise
                # False alarm; try again.

    async def run_trio_from_asyncio(self, asyncfunc, *args):
        """Runs the given *asynchronous* function in trio, from asyncio

        Cancelling the awaiting asyncio task cancels the trio-side cancel
        scope wrapped around ``asyncfunc``; this call then keeps waiting
        until the trio side has finished unwinding. If the cancel scope
        caught the cancellation, asyncio.CancelledError is raised here.
        """
        future = self._asyncio_loop.create_future()
        # Resolved with the trio cancel scope once the trio side is set up.
        # A single future (rather than a fresh queue.get() per retry) means a
        # retry after cancellation can never lose the value to a stale getter.
        scope_future = self._asyncio_loop.create_future()

        async def trio_side_async_helper():
            @trio.hazmat.disable_ki_protection
            async def doit():
                with trio.open_cancel_scope() as cancel_scope:
                    await self.run_sync_asyncio_from_trio(
                        scope_future.set_result, cancel_scope
                    )
                    return await asyncfunc(*args)
                # Only reached when the scope swallowed a cancellation, so
                # this check has to live outside the 'with' block.
                if cancel_scope.cancelled_caught:
                    raise asyncio.CancelledError

            result = await trio.hazmat.Result.acapture(doit)
            self._asyncio_loop.call_soon_threadsafe(
                self._result_to_future, result, future
            )

        def trio_side_sync_helper():
            trio.hazmat.spawn_system_task(trio_side_async_helper)

        self._trio_token.run_sync_soon(trio_side_sync_helper)

        # Wait for the trio side to finish setting up, and get the cancel
        # scope reference back. This part can't be cancelled, but we remember
        # the request so it can be forwarded once we have the scope.
        cancel_requested = False
        while True:
            try:
                trio_cancel_scope = await asyncio.shield(scope_future)
            except asyncio.CancelledError:
                cancel_requested = True
            else:
                break

        if cancel_requested:
            self._trio_token.run_sync_soon(trio_cancel_scope.cancel)

        while True:
            try:
                return await asyncio.shield(future)
            except asyncio.CancelledError:
                if future.cancelled():
                    # This is actually the final outcome of the future, so
                    # re-raise.
                    raise
                # We were cancelled, so we need to pass that on to the
                # trio side.
                self._trio_token.run_sync_soon(trio_cancel_scope.cancel)
                # Okay, trio knows about it; loop around and keep waiting
                # for it to finish unwinding.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment