Created
January 2, 2019 11:22
-
-
Save kgantsov/85be7251b42767d229cd0e1adb48b7a2 to your computer and use it in GitHub Desktop.
Helper functions that revoke Celery tasks by name and ID
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery.task.control import revoke | |
from celery.task.control import inspect | |
def revoke_tasks_by_name(task_name, worker_prefix=''):
    """
    Revoke all active tasks that have the given celery task name.

    Only tasks currently reported as active by workers whose name starts
    with ``worker_prefix`` are considered. Every match is revoked with
    ``terminate=True``.

    :param task_name: Name of the celery task
    :param worker_prefix: Prefix for the worker; the default ``''``
        matches every worker
    :return: None

    Examples:

        revoke_tasks_by_name(
            task_name='users.tasks.indexing.reindex_all_users'
        )

        revoke_tasks_by_name(
            worker_prefix='celery@users-indexer-',
            task_name='users.tasks.indexing.reindex_all_users'
        )
    """
    # active() yields None (not an empty dict) when no worker replies;
    # fall back to an empty mapping so the call becomes a no-op instead
    # of raising AttributeError on ``None.items()``.
    active_by_worker = inspect().active() or {}

    for worker_name, tasks in active_by_worker.items():
        if not worker_name.startswith(worker_prefix):
            continue
        for task in tasks:
            if task['type'] == task_name:
                print('Revoking task {}'.format(task))
                revoke(task['id'], terminate=True)
def revoke_task_by_name_and_id(task_name, task_id, worker_prefix=''):
    """
    Revoke a celery task by its name and id.

    Only tasks currently reported as active by workers whose name starts
    with ``worker_prefix`` are considered. A task is revoked (with
    ``terminate=True``) only when both its name and its id match.

    :param task_name: Name of the celery task
    :param task_id: ID of the celery task
    :param worker_prefix: Prefix for the worker; the default ``''``
        matches every worker
    :return: None

    Examples:

        revoke_task_by_name_and_id(
            task_name='users.tasks.indexing.reindex_all_users',
            task_id='a3d966d7-7ad3-4865-9898-3669b6de8179',
            worker_prefix='celery@users-indexer-',
        )
    """
    # active() yields None (not an empty dict) when no worker replies;
    # fall back to an empty mapping so the call becomes a no-op instead
    # of raising AttributeError on ``None.items()``.
    active_by_worker = inspect().active() or {}

    for worker_name, tasks in active_by_worker.items():
        if not worker_name.startswith(worker_prefix):
            continue
        for task in tasks:
            if task['type'] == task_name and task['id'] == task_id:
                print('Revoking task {}'.format(task))
                revoke(task['id'], terminate=True)
For Celery v5, it should be enough to replace the imports with the following:
from backend.celery import celery_app
revoke = celery_app.control.revoke
inspect = celery_app.control.inspect
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is not supported in celery v5 onwards.