Skip to content

Instantly share code, notes, and snippets.

@nat-n
Created April 25, 2022 12:05
Show Gist options
  • Save nat-n/401fe38d08989a520639caced0f52816 to your computer and use it in GitHub Desktop.
Save nat-n/401fe38d08989a520639caced0f52816 to your computer and use it in GitHub Desktop.
A utility for running non-async code concurrently using threads and asyncio
"""
A utility for running non-async code concurrently using threads and asyncio.
# Call a function as async in a background thread
```python
import asyncio
asyncify = AsyncThread()
def some_io_intensive_operation():
...
async def main():
await asyncify.exec(some_io_intensive_operation)
asyncio.run(main())
```
# Generate async version of specific methods.
```python
@AsyncThread.asyncify()
class ThingClient:
@AsyncThread.asyncify()
def get_things(self, name: str):
...
```
Will result in the creation of a class like:
```python
class ThingClient:
def get_things(self, name: str):
...
async def get_things_async(self, name: str):
...
```
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
from types import FunctionType
from typing import Any, Callable, Optional
class AsyncThread:
_executor: ThreadPoolExecutor
_instance: Optional["AsyncThread"] = None
def __init__(self):
self._executor = ThreadPoolExecutor(16)
def __del__(self):
self._executor.shutdown()
@classmethod
def asyncify(cls, *args):
if cls._instance is None:
cls._instance = AsyncThread()
if args:
# pylint: disable=not-callable
return cls._instance(*args)
return cls._instance
def __call__(self, target):
if isinstance(target, type):
# Replace the target class with a version modified to add async versions
# of all annotated methods
new_methods = {}
for member_name in dir(target):
member = getattr(target, member_name)
if isinstance(member, FunctionType) and getattr(
member, "__asyncify", False
):
new_methods[f"{member_name}_async"] = self._get_async_version(
member
)
return type(
target.__name__,
(target,),
new_methods,
)
elif isinstance(target, FunctionType):
# Annotate the target method to indicate that it should have an async
# version added to the class
setattr(target, "__asyncify", True)
return target
def _get_async_version(self, member: Callable[[Any], Any]):
async def async_version(*args, **kwargs):
def exec_member():
return member(*args, **kwargs)
return await self.exec(exec_member)
return async_version
async def exec(self, exec_member: Callable[[Any], Any]):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._executor, exec_member)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment