Created
September 10, 2019 07:51
-
-
Save bsitruk/29e90d787cf2944ca58706c6975fc608 to your computer and use it in GitHub Desktop.
Asyncio event loop wrapper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import logging | |
| import signal | |
| logger = logging.getLogger(__name__) | |
| class EventLoop: | |
| def __init__(self, loop=None, debug=False): | |
| self.loop = loop or asyncio.get_event_loop() | |
| self.loop.set_debug(debug) | |
| self.stopped = False | |
| self.shutdown_callbacks = [self._cleanup] | |
| for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT): | |
| self.loop.add_signal_handler( | |
| s, self.loop.stop) | |
| self.loop.set_exception_handler(self.handle_exception) | |
| def set_debug(self, debug: bool): | |
| self.loop.set_debug(debug) | |
| return self | |
| def add_shutdown_callback(self, callback): | |
| self.shutdown_callbacks.append(callback) | |
| return self | |
| async def shutdown(self): | |
| if self.stopped: | |
| return | |
| logger.debug('Shutdown event loop...') | |
| self.stopped = True | |
| while self.shutdown_callbacks: | |
| callback = self.shutdown_callbacks.pop(0) | |
| # TODO: rewrite with python 3.8 walrus operator | |
| res = callback() | |
| if asyncio.iscoroutine(res): | |
| await res | |
| def close(self): | |
| self.loop.close() | |
| def inner_loop(self): | |
| return self.loop | |
| @staticmethod | |
| def handle_exception(loop, context): | |
| # context["message"] will always be there; but context["exception"] may not | |
| msg = context.get("exception", context["message"]) | |
| logger.error(f"Caught exception: {msg}") | |
| logger.info("Shutting down...") | |
| loop.stop() | |
| @staticmethod | |
| async def _cleanup(): | |
| tasks = [t for t in asyncio.all_tasks() if t is not | |
| asyncio.current_task()] | |
| [task.cancel() for task in tasks] | |
| logger.info(f"Cancelling {len(tasks)} outstanding tasks") | |
| await asyncio.gather(*tasks, return_exceptions=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment