Created
February 23, 2025 05:48
-
-
Save imnotdev25/9ee3ae61117ca0704cc640a31c30515a to your computer and use it in GitHub Desktop.
This decorator provides a robust way to execute async functions across multiple threads while maintaining compatibility with both async and synchronous Python code. Use it when you need to: - Parallelize I/O-bound async operations - Integrate async code into legacy synchronous systems - Scale beyond single-threaded asyncio while maintaining asyn…
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import threading | |
from functools import wraps | |
from concurrent.futures import ThreadPoolExecutor | |
from typing import Optional, Callable, Coroutine | |
class ThreadedAsync: | |
_executor: Optional[ThreadPoolExecutor] = None | |
def __init__(self, max_threads: Optional[int] = None): | |
self.max_threads = max_threads | |
def __call__(self, func: Callable[..., Coroutine]) -> Callable: | |
@wraps(func) | |
def wrapper(*args, **kwargs) -> asyncio.Future: | |
# Create a new event loop in thread if needed | |
if not ThreadedAsync._executor: | |
ThreadedAsync._executor = ThreadPoolExecutor( | |
max_workers=self.max_threads, | |
thread_name_prefix='AsyncThread' | |
) | |
# Create future compatible with asyncio | |
asyncio_future = asyncio.get_event_loop().create_future() | |
def _run_in_thread(): | |
thread_loop = asyncio.new_event_loop() | |
try: | |
thread_loop.set_debug(True) | |
result = thread_loop.run_until_complete(func(*args, **kwargs)) | |
asyncio_future.get_loop().call_soon_threadsafe( | |
asyncio_future.set_result, result | |
) | |
except Exception as e: | |
asyncio_future.get_loop().call_soon_threadsafe( | |
asyncio_future.set_exception, e | |
) | |
finally: | |
thread_loop.close() | |
# Submit task to thread pool | |
ThreadedAsync._executor.submit(_run_in_thread) | |
return asyncio_future | |
return wrapper | |
@classmethod | |
def shutdown(cls): | |
if cls._executor: | |
cls._executor.shutdown(wait=True) | |
cls._executor = None | |
# Use cases | |
# Async awaitable | |
@ThreadedAsync(max_threads=4) | |
async def fetch_data(url: str) -> str: | |
print(f"Fetching {url} (Thread {threading.get_ident()})") | |
await asyncio.sleep(1) # Simulate async I/O | |
return f"Data from {url}" | |
async def main(): | |
futures = [ | |
fetch_data("https://api.com/1"), | |
fetch_data("https://api.com/2") | |
] | |
# Process results as they complete | |
for coro in asyncio.as_completed(futures): | |
result = await coro | |
print(f"Received: {result}") | |
ThreadedAsync.shutdown() | |
asyncio.run(main()) | |
# sync blocking | |
@ThreadedAsync() | |
async def process_file(filename: str) -> int: | |
print(f"Processing {filename} (Thread {threading.get_ident()})") | |
await asyncio.sleep(0.5) | |
return hash(filename) | |
def sync_main(): | |
tasks = [process_file(f"file_{i}.txt") for i in range(3)] | |
# Wait for all results (blocking) | |
results = [future.result() for future in asyncio.wait(tasks).result()[0]] | |
print(f"All results: {results}") | |
ThreadedAsync.shutdown() | |
sync_main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment