Skip to content

Instantly share code, notes, and snippets.

@matanper
Created February 2, 2025 09:19
Show Gist options
  • Save matanper/8f2439667c853b737a160f0d9932cc24 to your computer and use it in GitHub Desktop.
Save matanper/8f2439667c853b737a160f0d9932cc24 to your computer and use it in GitHub Desktop.
Python3 Asyncio runner with grafecfull exit
import signal
import asyncio
from prometheus_client import start_http_server as start_prometheus_server
EXIT_EVENT = asyncio.Event()
async def infinite_run():
while not EXIT_EVENT.is_set():
# do stuff...
await asyncio.sleep(1)
else:
print("Stopped gracefully")
async def run():
loop = asyncio.get_running_loop()
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for s in signals:
loop.add_signal_handler(s, lambda: EXIT_EVENT.set())
# Run in prometheus in separate thread to avoid blocking
loop.run_in_executor(None, start_prometheus_server, 80)
# Run tasks
tasks = []
tasks.append(loop.create_task(infinite_run()))
# more tasks...
# Wait for exit event
await EXIT_EVENT.wait()
logger.info('Shutdown signal received, stopping tasks gracefully...')
# Ensure all tasks complete
await asyncio.gather(*tasks, return_exceptions=True)
logger.info('All tasks stopped cleanly.')
if __name__ == '__main__':
asyncio.run(run())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment