Created
April 25, 2022 12:05
-
-
Save nat-n/401fe38d08989a520639caced0f52816 to your computer and use it in GitHub Desktop.
A utility for running non-async code concurrently using threads and asyncio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A utility for running non-async code concurrently using threads and asyncio. | |
# Call a function as async in a background thread | |
```python | |
import asyncio | |
asyncify = AsyncThread() | |
def some_io_intensive_operation(): | |
... | |
async def main(): | |
await asyncify.exec(some_io_intensive_operation) | |
asyncio.run(main()) | |
``` | |
# Generate async version of specific methods. | |
```python | |
@AsyncThread.asyncify() | |
class ThingClient: | |
@AsyncThread.asyncify() | |
def get_things(self, name: str): | |
... | |
``` | |
Will result in the creation of a class like: | |
```python | |
class ThingClient: | |
def get_things(self, name: str): | |
... | |
async def get_things_async(self, name: str): | |
... | |
``` | |
""" | |
import asyncio | |
from concurrent.futures import ThreadPoolExecutor | |
from types import FunctionType | |
from typing import Any, Callable, Optional | |
class AsyncThread: | |
_executor: ThreadPoolExecutor | |
_instance: Optional["AsyncThread"] = None | |
def __init__(self): | |
self._executor = ThreadPoolExecutor(16) | |
def __del__(self): | |
self._executor.shutdown() | |
@classmethod | |
def asyncify(cls, *args): | |
if cls._instance is None: | |
cls._instance = AsyncThread() | |
if args: | |
# pylint: disable=not-callable | |
return cls._instance(*args) | |
return cls._instance | |
def __call__(self, target): | |
if isinstance(target, type): | |
# Replace the target class with a version modified to add async versions | |
# of all annotated methods | |
new_methods = {} | |
for member_name in dir(target): | |
member = getattr(target, member_name) | |
if isinstance(member, FunctionType) and getattr( | |
member, "__asyncify", False | |
): | |
new_methods[f"{member_name}_async"] = self._get_async_version( | |
member | |
) | |
return type( | |
target.__name__, | |
(target,), | |
new_methods, | |
) | |
elif isinstance(target, FunctionType): | |
# Annotate the target method to indicate that it should have an async | |
# version added to the class | |
setattr(target, "__asyncify", True) | |
return target | |
def _get_async_version(self, member: Callable[[Any], Any]): | |
async def async_version(*args, **kwargs): | |
def exec_member(): | |
return member(*args, **kwargs) | |
return await self.exec(exec_member) | |
return async_version | |
async def exec(self, exec_member: Callable[[Any], Any]): | |
loop = asyncio.get_event_loop() | |
return await loop.run_in_executor(self._executor, exec_member) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment