Skip to content

Instantly share code, notes, and snippets.

@imnotdev25
Created February 23, 2025 05:48
Show Gist options
  • Save imnotdev25/9ee3ae61117ca0704cc640a31c30515a to your computer and use it in GitHub Desktop.
Save imnotdev25/9ee3ae61117ca0704cc640a31c30515a to your computer and use it in GitHub Desktop.
This decorator provides a robust way to execute async functions across multiple threads while maintaining compatibility with both async and synchronous Python code. Use it when you need to: - Parallelize I/O-bound async operations - Integrate async code into legacy synchronous systems - Scale beyond single-threaded asyncio while maintaining asyn…
import asyncio
import threading
from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Callable, Coroutine
class ThreadedAsync:
_executor: Optional[ThreadPoolExecutor] = None
def __init__(self, max_threads: Optional[int] = None):
self.max_threads = max_threads
def __call__(self, func: Callable[..., Coroutine]) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> asyncio.Future:
# Create a new event loop in thread if needed
if not ThreadedAsync._executor:
ThreadedAsync._executor = ThreadPoolExecutor(
max_workers=self.max_threads,
thread_name_prefix='AsyncThread'
)
# Create future compatible with asyncio
asyncio_future = asyncio.get_event_loop().create_future()
def _run_in_thread():
thread_loop = asyncio.new_event_loop()
try:
thread_loop.set_debug(True)
result = thread_loop.run_until_complete(func(*args, **kwargs))
asyncio_future.get_loop().call_soon_threadsafe(
asyncio_future.set_result, result
)
except Exception as e:
asyncio_future.get_loop().call_soon_threadsafe(
asyncio_future.set_exception, e
)
finally:
thread_loop.close()
# Submit task to thread pool
ThreadedAsync._executor.submit(_run_in_thread)
return asyncio_future
return wrapper
@classmethod
def shutdown(cls):
if cls._executor:
cls._executor.shutdown(wait=True)
cls._executor = None
# Use cases
# Async awaitable
@ThreadedAsync(max_threads=4)
async def fetch_data(url: str) -> str:
print(f"Fetching {url} (Thread {threading.get_ident()})")
await asyncio.sleep(1) # Simulate async I/O
return f"Data from {url}"
async def main():
futures = [
fetch_data("https://api.com/1"),
fetch_data("https://api.com/2")
]
# Process results as they complete
for coro in asyncio.as_completed(futures):
result = await coro
print(f"Received: {result}")
ThreadedAsync.shutdown()
asyncio.run(main())
# sync blocking
@ThreadedAsync()
async def process_file(filename: str) -> int:
print(f"Processing {filename} (Thread {threading.get_ident()})")
await asyncio.sleep(0.5)
return hash(filename)
def sync_main():
tasks = [process_file(f"file_{i}.txt") for i in range(3)]
# Wait for all results (blocking)
results = [future.result() for future in asyncio.wait(tasks).result()[0]]
print(f"All results: {results}")
ThreadedAsync.shutdown()
sync_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment