Skip to content

Instantly share code, notes, and snippets.

@cgimenes
Last active December 12, 2024 22:47
Show Gist options
  • Save cgimenes/fde07c92043793aeb38b943af10a9308 to your computer and use it in GitHub Desktop.
Save cgimenes/fde07c92043793aeb38b943af10a9308 to your computer and use it in GitHub Desktop.
Terminate all tasks in Celery
from config.celery import celery_app
celery_app.control.purge()
inspect = celery_app.control.inspect()
scheduled_tasks = inspect.scheduled()
if scheduled_tasks:
for worker, tasks in scheduled_tasks.items():
for task in tasks:
celery_app.control.revoke(task["id"], terminate=True)
reserved_tasks = inspect.reserved()
if reserved_tasks:
for worker, tasks in reserved_tasks.items():
for task in tasks:
celery_app.control.revoke(task["id"], terminate=True)
active_tasks = inspect.active()
if active_tasks:
for worker, tasks in active_tasks.items():
for task in tasks:
celery_app.control.revoke(task["id"], terminate=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment