Last active
July 17, 2024 20:15
-
-
Save matanper/ee5143e3bb85d57bbadcd4a3334af534 to your computer and use it in GitHub Desktop.
Python asyncio utilities
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import logging
import threading
from asyncio import AbstractEventLoop
from concurrent.futures import Future
from threading import Thread
from typing import Coroutine

from app.common.logger import logger
def create_event_loop_thread(el_name: str) -> AbstractEventLoop:
    """Start a fresh asyncio event loop on a daemon thread and return it.

    The loop is created in the calling thread and handed to a new daemon
    thread. That thread installs the loop as its current event loop, attaches
    an ``EventLoopMonitor`` labelled ``el_name`` to it, and then runs the loop
    forever.

    Adapted from https://gist.github.com/dmfigol/3e7d5b84a16d076df02baa9f53271058

    :param el_name: label passed to the ``EventLoopMonitor`` for this loop.
    :return: the newly created event loop (already being started in the
        background thread).
    """
    background_loop = asyncio.new_event_loop()

    def _serve() -> None:
        # Runs inside the background thread: bind the loop to this thread,
        # schedule the monitor task, then block here serving the loop.
        asyncio.set_event_loop(background_loop)
        monitor = EventLoopMonitor(el_name)
        monitor.start(background_loop)
        background_loop.run_forever()

    Thread(target=_serve, daemon=True).start()
    return background_loop
def run_coroutine_in_thread(coro: Coroutine, el_name: str) -> Future:
    """Submit ``coro`` to a brand-new background event loop thread.

    Each call creates its own event loop thread through
    ``create_event_loop_thread(el_name)`` and schedules the coroutine on it
    with ``asyncio.run_coroutine_threadsafe``.

    Adapted from https://gist.github.com/dmfigol/3e7d5b84a16d076df02baa9f53271058

    :param coro: the coroutine object to execute.
    :param el_name: label for the event loop created for this coroutine.
    :return: a ``concurrent.futures.Future`` for the coroutine's result.
    """
    dedicated_loop = create_event_loop_thread(el_name)
    pending = asyncio.run_coroutine_threadsafe(coro, dedicated_loop)
    return pending
class EventLoopMonitor:
    """Periodically sample an event loop's scheduling lag and task count.

    Approach taken from
    https://blog.meadsteve.dev/programming/2020/02/23/monitoring-async-python/
    """

    # Most recent samples; these class-level defaults apply until the first
    # measurement is stored on the instance.
    lag: float = 0
    active_tasks: int = 0

    def __init__(self, el_name: str, interval: float = 0.25):
        """Remember the loop label and the sampling period (in seconds)."""
        self._el_name = el_name
        self._interval = interval

    def start(self, loop: AbstractEventLoop):
        """Schedule the sampling coroutine as a task on ``loop``."""
        sampler = self._monitor_loop(loop)
        loop.create_task(sampler)

    async def _monitor_loop(self, loop: AbstractEventLoop):
        """Sample lag and unfinished-task count while ``loop`` is running."""
        while loop.is_running():
            # Lag is how much longer the sleep took than was requested.
            began = loop.time()
            await asyncio.sleep(self._interval)
            elapsed = loop.time() - began
            self.lag = elapsed - self._interval

            # Number of tasks on this loop that have not finished yet.
            self.active_tasks = sum(
                1 for task in asyncio.all_tasks(loop) if not task.done()
            )

            # TODO: export to monitoring system
            # Warn when lag exceeds one second; this may be too noisy, so
            # adjust the threshold as appropriate.
            if self.lag > 1:
                level = logging.WARN
            else:
                level = logging.DEBUG
            logger.log(
                level,
                'event loop (%s) monitor lag=%s, tasks=%s',
                self._el_name,
                self.lag,
                self.active_tasks,
            )
class ThreadLocalSingleton:
    """Lazily create and cache one instance of ``cls`` per thread.

    The wrapped callable is invoked with the stored ``*args`` / ``**kwargs``
    the first time ``get_instance()`` is called from a given thread; later
    calls from that same thread return the cached object.
    """

    # One lock shared by every ThreadLocalSingleton, so constructions are
    # serialized across all of them. It is re-entrant because the lock is held
    # while ``cls(...)`` runs: a constructor that itself calls
    # ``get_instance()`` on another ThreadLocalSingleton would otherwise
    # deadlock on a plain Lock.
    _class_lock = threading.RLock()

    def __init__(self, cls, *args, **kwargs):
        """Store the factory and the arguments used to build each instance.

        :param cls: class (or any callable) that produces the instance.
        :param args: positional arguments forwarded to ``cls``.
        :param kwargs: keyword arguments forwarded to ``cls``.
        """
        self._cls = cls
        self._args = args
        self._kwargs = kwargs
        # Per-thread storage: each thread sees its own ``instance`` attribute.
        self._thread_local = threading.local()

    def get_instance(self):
        """Return the calling thread's instance, creating it on first use."""
        if not hasattr(self._thread_local, 'instance'):
            with self._class_lock:
                # Re-check after acquiring the lock, as the original did.
                if not hasattr(self._thread_local, 'instance'):
                    self._thread_local.instance = self._cls(*self._args, **self._kwargs)
        return self._thread_local.instance
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment