Skip to content

Instantly share code, notes, and snippets.

@bsitruk
Created September 10, 2019 07:51
Show Gist options
  • Save bsitruk/29e90d787cf2944ca58706c6975fc608 to your computer and use it in GitHub Desktop.
Save bsitruk/29e90d787cf2944ca58706c6975fc608 to your computer and use it in GitHub Desktop.
Asyncio event loop wrapper
import asyncio
import inspect
import logging
import signal
logger = logging.getLogger(__name__)


class EventLoop:
    """Wrapper around an asyncio event loop with shutdown callbacks.

    On construction the wrapper registers ``loop.stop`` as the handler for
    SIGHUP, SIGTERM and SIGINT and installs ``handle_exception`` as the
    loop's exception handler.  The signal handlers only stop the loop; they
    do not call ``shutdown()``.

    ``shutdown()`` runs the registered callbacks in registration order.  The
    first callback is always the built-in ``_cleanup``, which cancels the
    outstanding tasks.
    """

    # Looked up by name because not every platform defines every signal.
    _STOP_SIGNALS = ("SIGHUP", "SIGTERM", "SIGINT")

    def __init__(self, loop=None, debug=False):
        """Wrap *loop* and install signal and exception handlers on it.

        Args:
            loop: The event loop to wrap.  When ``None`` the current event
                loop is used; if there is none, a new loop is created and
                set as the current one.
            debug: Passed to ``loop.set_debug``.
        """
        if loop is None:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # No current loop is set and none is created implicitly.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
        self.loop = loop
        self.loop.set_debug(debug)
        self.stopped = False
        self.shutdown_callbacks = [self._cleanup]
        self._install_signal_handlers()
        self.loop.set_exception_handler(self.handle_exception)

    def _install_signal_handlers(self):
        """Make each available stop signal call ``loop.stop``."""
        for name in self._STOP_SIGNALS:
            signum = getattr(signal, name, None)
            if signum is None:
                # The signal does not exist on this platform.
                continue
            try:
                self.loop.add_signal_handler(signum, self.loop.stop)
            except NotImplementedError:
                # This loop implementation has no signal support at all, so
                # trying the remaining signals is pointless.
                logger.warning(
                    "Event loop does not support signal handlers; "
                    "stop signals will not be handled")
                break

    def set_debug(self, debug: bool):
        """Switch the wrapped loop's debug mode and return ``self``."""
        self.loop.set_debug(debug)
        return self

    def add_shutdown_callback(self, callback):
        """Append *callback* to the shutdown callbacks and return ``self``.

        *callback* is called without arguments by ``shutdown()``.  If it
        returns an awaitable, that awaitable is awaited before the next
        callback runs.
        """
        self.shutdown_callbacks.append(callback)
        return self

    async def shutdown(self):
        """Run every shutdown callback once, in registration order.

        Later calls return immediately because ``stopped`` is already set.
        Callbacks are popped one at a time, so a callback that registers
        another callback during shutdown still gets it executed.
        """
        if self.stopped:
            return
        logger.debug('Shutdown event loop...')
        self.stopped = True
        while self.shutdown_callbacks:
            callback = self.shutdown_callbacks.pop(0)
            # TODO: rewrite with python 3.8 walrus operator
            result = callback()
            # Await any awaitable (coroutine, Task, Future, ...), not only
            # coroutine objects, so no returned work is dropped.
            if inspect.isawaitable(result):
                await result

    def close(self):
        """Close the wrapped loop."""
        self.loop.close()

    def inner_loop(self):
        """Return the wrapped event loop."""
        return self.loop

    @staticmethod
    def handle_exception(loop, context):
        """Log the error described by *context* and stop *loop*.

        Has the ``(loop, context)`` signature expected by
        ``loop.set_exception_handler``.
        """
        # context["message"] will always be there; but context["exception"] may not
        msg = context.get("exception", context["message"])
        logger.error("Caught exception: %s", msg)
        logger.info("Shutting down...")
        loop.stop()

    @staticmethod
    async def _cleanup():
        """Cancel every task except the current one and wait for them."""
        current = asyncio.current_task()
        tasks = [t for t in asyncio.all_tasks() if t is not current]
        for task in tasks:
            task.cancel()
        logger.info("Cancelling %d outstanding tasks", len(tasks))
        # return_exceptions=True collects the cancellations instead of
        # raising the first one here.
        await asyncio.gather(*tasks, return_exceptions=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment