Last active
August 27, 2024 14:41
-
-
Save Lonami/3f79ed774d2e0100ded5b171a47f2caf to your computer and use it in GitHub Desktop.
Wrapper to launch async tasks from threaded code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import asyncio | |
from queue import Queue as BlockingQueue | |
class TwoSidedQueue: | |
""" | |
Behaves like an `asyncio.Queue`, but `get` and `put` act on different ends. | |
""" | |
def __init__(self, queue_in, queue_out): | |
self._queue_in = queue_in | |
self._queue_out = queue_out | |
self._sides = { | |
'empty': queue_out, | |
'full': queue_out, | |
'get': queue_in, | |
'get_nowait': queue_in, | |
'join': queue_out, | |
'put': queue_out, | |
'put_nowait': queue_out, | |
'qsize': queue_out, | |
'task_done': queue_in, | |
} | |
def __getattr__(self, name): | |
return getattr(self._sides.get(name, self._queue_in), name) | |
class LaunchAsync: | |
""" | |
Provides a way to safely `put` data into `coro` (an `async def`) and `get` data back. | |
It will launch the `coro` in a new thread, so be careful it doesn't crash. | |
Make sure this `coro` does not interact with a `coro` launched from another instance | |
of `LaunchAsync`, since the `asyncio` event loops won't match and could lead to errors. | |
When the context manager exits, the task will be cancelled. You can catch this error in | |
`coro`, but if the `coro` doesn't eventually exit, the context manager exit will hang. | |
""" | |
def __init__(self, coro, *args, **kwargs): | |
self._coro = coro | |
self._args = args | |
self._kwargs = kwargs | |
self._thread = None | |
self._loop = None | |
self._task = None | |
self._queue_in = None | |
self._queue_out = None | |
self._size = 0 | |
def size(self, size): | |
""" | |
Change the size of the inbound and outbound queues. The default is unbounded (`None`). Example: | |
>>> with LaunchAsync(async_def).size(10) as queue: | |
>>> ... # ^^^^^^^^^ | |
""" | |
self._size = size or 0 | |
return self | |
def put(self, data, *, timeout=None): | |
""" | |
`put` data in for the `coro` to `get` out. Will block if the maximum `size` was reached. | |
Does nothing if the `coro` is dead. | |
""" | |
try: | |
return asyncio.run_coroutine_threadsafe(self._queue_out.put(data), self._loop).result(timeout) | |
except RuntimeError: | |
if self._loop.is_running(): | |
raise | |
else: | |
return None | |
def get(self, *, timeout=None): | |
""" | |
`get` data out of the `coro` it `put` in. Will block if the queue is empty. | |
Returns `None` if the `coro` is dead. | |
""" | |
try: | |
return asyncio.run_coroutine_threadsafe(self._queue_in.get(), self._loop).result(timeout) | |
except RuntimeError: | |
if self._loop.is_running(): | |
raise | |
else: | |
return None | |
def dead(self): | |
""" | |
Return `true` if the other side is dead (the `coro` has exited, with or without error). | |
""" | |
return not self._loop.is_running() | |
def __enter__(self): | |
# asyncio.run is used as it's a battle-tested way to safely set up a new loop and tear | |
# it down. However it does mean it's necessary to wait for the task to run before it's | |
# possible to get said loop and task back. For this, the usual blocking queue is used. | |
oneshot = BlockingQueue(1) | |
self._thread = threading.Thread(target=asyncio.run, args=( | |
self._run(self._coro, self._size, oneshot, self._args, self._kwargs),)) | |
self._thread.start() | |
self._loop, self._task, self._queue_in, self._queue_out = oneshot.get() | |
return self | |
def __exit__(self, exc_type, exc_value, exc_traceback): | |
try: | |
self._loop.call_soon_threadsafe(self._task.cancel) | |
except RuntimeError: | |
if self._loop.is_running(): | |
raise | |
finally: | |
self._thread.join() | |
@staticmethod | |
async def _run(coro, size, oneshot, args, kwargs): | |
# asyncio.Queue's are created here so that they pick up the right loop. | |
queue_in, queue_out = asyncio.Queue(size), asyncio.Queue(size) | |
oneshot.put((asyncio.get_event_loop(), asyncio.current_task(), queue_in, queue_out)) | |
try: | |
# `queue_in` and `queue_out` are intentionally swapped here. | |
await coro(TwoSidedQueue(queue_out, queue_in), *args, **kwargs) | |
except asyncio.CancelledError: | |
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is an example of what your other code can look like. | |
from launchasync import LaunchAsync | |
# You don't need a class to send commands. Your commands can be anything. | |
# You can send strings, numbers, and any other object you want directly. | |
# This is just a nice way to send commands with ID and optional data. | |
class Command: | |
def __init__(self, id, data=None): | |
self.id = id | |
self.data = data | |
# This is the "async main" function where you will do everything that needs async. | |
# If you need to call other async functions or create more tasks, do it inside here. | |
# `queue` acts like an `asyncio.Queue` but `put` and `get` won't interfere. | |
async def async_main(queue): | |
while True: | |
command = await queue.get() | |
if command.id == 'print': | |
print('Hello from async!') | |
elif command.id == 'double': | |
await queue.put(command.data * 2) | |
# The first argument must be a coroutine function (do NOT call it with ()). | |
# In this case, it's `async_main` (without `()`). | |
# | |
# The function passed here will always receive the shared queue as the first parameter. | |
# You can pass more parameters in `LaunchAsync` to forward them to your `async def`. | |
# | |
# LaunchAsync is a context manager which gives you access to a shared queue with the | |
# async code. | |
with LaunchAsync(async_main) as queue: | |
# Here's your threaded code. You can `put` commands to execute in async-land. | |
# You can even launch a new thread and `put` things from that new thread. | |
queue.put(Command('print')) | |
# And you can also `get` commands back (note that this can block, but you can set | |
# `timeout` in both `put` and `get`. It only makes sense if `LaunchAsync.size` was used). | |
queue.put(Command('double', 7)) | |
response = queue.get(timeout=1) | |
print('The result of doubling 7 is', response) | |
# Here the `async_main` task will be cancelled automatically and the thread joined. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment