Skip to content

Instantly share code, notes, and snippets.

@matanper
Last active July 17, 2024 20:15
Show Gist options
  • Save matanper/ee5143e3bb85d57bbadcd4a3334af534 to your computer and use it in GitHub Desktop.
Save matanper/ee5143e3bb85d57bbadcd4a3334af534 to your computer and use it in GitHub Desktop.
Python asyncio utilities
import asyncio
import logging
from asyncio import AbstractEventLoop
from concurrent.futures import Future
from threading import Thread
from typing import Coroutine
from app.common.logger import logger
def create_event_loop_thread(el_name: str) -> AbstractEventLoop:
"""
From https://gist.github.com/dmfigol/3e7d5b84a16d076df02baa9f53271058
"""
def start_background_loop(loop: AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
EventLoopMonitor(el_name).start(loop)
loop.run_forever()
eventloop = asyncio.new_event_loop()
thread = Thread(target=start_background_loop, args=(eventloop,), daemon=True)
thread.start()
return eventloop
def run_coroutine_in_thread(coro: Coroutine, el_name: str) -> Future:
"""
From https://gist.github.com/dmfigol/3e7d5b84a16d076df02baa9f53271058
"""
return asyncio.run_coroutine_threadsafe(coro, create_event_loop_thread(el_name))
class EventLoopMonitor:
"""Taken from https://blog.meadsteve.dev/programming/2020/02/23/monitoring-async-python/"""
lag: float = 0
active_tasks: int = 0
def __init__(self, el_name: str, interval: float = 0.25):
self._interval = interval
self._el_name = el_name
def start(self, loop: AbstractEventLoop):
loop.create_task(self._monitor_loop(loop))
async def _monitor_loop(self, loop: AbstractEventLoop):
while loop.is_running():
# Calculate lag
start = loop.time()
await asyncio.sleep(self._interval)
time_slept = loop.time() - start
self.lag = time_slept - self._interval
# Count active tasks
tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]
self.active_tasks = len(tasks)
# TODO: export to monitoring system
# log warning if lag is more than 1 second, might be too noisy adjust appropriately
level = logging.WARN if self.lag > 1 else logging.DEBUG
logger.log(level, 'event loop (%s) monitor lag=%s, tasks=%s', self._el_name, self.lag, self.active_tasks)
class ThreadLocalSingleton:
_class_lock = threading.Lock()
def __init__(self, cls, *args, **kwargs):
self._cls = cls
self._args = args
self._kwargs = kwargs
self._thread_local = threading.local()
def get_instance(self):
if not hasattr(self._thread_local, 'instance'):
with self._class_lock:
if not hasattr(self._thread_local, 'instance'):
self._thread_local.instance = self._cls(*self._args, **self._kwargs)
return self._thread_local.instance
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment