Skip to content

Instantly share code, notes, and snippets.

@fordnox
Created November 8, 2024 11:54
Show Gist options
  • Save fordnox/a3825f710b87e644771a4d98a48441fc to your computer and use it in GitHub Desktop.
Save fordnox/a3825f710b87e644771a4d98a48441fc to your computer and use it in GitHub Desktop.
python async loop shutdown
import asyncio
import signal
async def hourly_task(interval: float = 3) -> None:
    """Print a heartbeat line forever, sleeping ``interval`` seconds between runs.

    Args:
        interval: Seconds to wait between iterations. The default of 3 is a
            short demo value; pass 3600 for a real one-hour cadence.

    A cancellation delivered while the task is running its loop is caught
    and not re-raised: the task prints a notice and then finishes normally
    instead of ending in the cancelled state.
    """
    try:
        while True:
            print("Running hourly task")
            # Suspension point: this is where a pending cancel() is delivered.
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        print("Hourly task canceled")
        # Perform any cleanup if needed
async def half_hourly_task(interval: float = 1) -> None:
    """Print a heartbeat line forever, sleeping ``interval`` seconds between runs.

    Args:
        interval: Seconds to wait between iterations. The default of 1 is a
            short demo value; pass 1800 for a real half-hour cadence.

    A cancellation delivered while the task is running its loop is caught
    and not re-raised: the task prints a notice and then finishes normally
    instead of ending in the cancelled state.
    """
    try:
        while True:
            print("Running half-hourly task")
            # Suspension point: this is where a pending cancel() is delivered.
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        print("Half-hourly task canceled")
        # Perform any cleanup if needed
def shutdown(loop: asyncio.AbstractEventLoop, tasks) -> None:
    """Signal-handler callback: request cancellation of ``tasks`` and stop ``loop``.

    Args:
        loop: Event loop to stop.
        tasks: Iterable of asyncio tasks to cancel.

    The message is signal-agnostic because this callback is registered for
    both SIGTERM and SIGINT in this file.
    """
    print("Received shutdown signal. Cancelling tasks...")
    for task in tasks:
        # cancel() only *requests* cancellation; the task reacts the next
        # time the loop runs it, so the caller must drive the loop again.
        task.cancel()
    # Makes run_forever() return; the tasks have not necessarily finished yet.
    loop.stop()
# Drive both periodic tasks on a dedicated event loop until SIGTERM/SIGINT.
loop = asyncio.new_event_loop()
try:
    # Schedule each coroutine exactly once; the same list is handed to the
    # signal callback so it can cancel them.
    tasks = [
        loop.create_task(hourly_task()),
        loop.create_task(half_hourly_task()),
    ]
    task1, task2 = tasks
    # Route both termination signals to the shared shutdown callback.
    for signum in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(signum, shutdown, loop, tasks)
    # Blocks until shutdown() stops the loop.
    loop.run_forever()
    # Tasks that were asked to cancel may not have processed the request yet:
    # run every unfinished one to completion so its handler gets to execute.
    # (done() is also true for cancelled tasks, so one check is enough.)
    for t in [t for t in tasks if not t.done()]:
        loop.run_until_complete(t)
except asyncio.CancelledError:
    print("Tasks were cancelled")
finally:
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment