Skip to content

Instantly share code, notes, and snippets.

@olned
Created August 27, 2019 16:39
Show Gist options
  • Save olned/f4ad83c1115910b69ca6602d7e8bcc8d to your computer and use it in GitHub Desktop.
Save olned/f4ad83c1115910b69ca6602d7e8bcc8d to your computer and use it in GitHub Desktop.
Handle a signal in an asyncio python app
import asyncio
import functools
import signal
async def my_task(task_num):
try:
while True:
print(f'Task {task_num}')
await asyncio.sleep(task_num + 1)
except asyncio.CancelledError:
print(f"Task {task_num} was cancelled")
async def main():
tasks = []
for i in range(3):
tasks.append(asyncio.ensure_future(my_task(i)))
loop = asyncio.get_event_loop()
def signal_handler(sig):
print(sig)
loop.remove_signal_handler(sig)
for task in tasks:
task.cancel()
loop.add_signal_handler(signal.SIGINT, functools.partial(signal_handler, sig=signal.SIGINT))
loop.add_signal_handler(signal.SIGTERM, functools.partial(signal_handler, sig=signal.SIGTERM))
await asyncio.gather(*tasks)
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment