Last active
September 30, 2024 20:40
-
-
Save Skyross/2f4c95f5df2446b71f74f4f9d9771125 to your computer and use it in GitHub Desktop.
Celery Task with lock
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import Task | |
from django.conf import settings | |
from django.core.cache import caches | |
from celery.utils.log import get_task_logger | |
logger = get_task_logger(__name__) | |
# noinspection PyAbstractClass | |
class TaskWithLock(Task):
    """
    Base task with a lock to prevent multiple execution of tasks with ETA.

    Duplicate execution happens with multiple workers for tasks with any
    delay (countdown, ETA). Before running, the task tries to add a key
    derived from its class name, task id and retry counter to a Django
    cache; if the key is already present the run is skipped.

    The lock is never explicitly released: it simply expires after
    ``lock_expiration`` seconds.

    You may override the cache backend by setting `CELERY_TASK_LOCK_CACHE`
    in your Django settings file.
    """
    abstract = True
    # Cache alias is resolved once, when the class body is executed.
    cache = caches[getattr(settings, 'CELERY_TASK_LOCK_CACHE', 'default')]
    lock_expiration = 60 * 60 * 24  # 1 day

    @property
    def lock_key(self):
        """
        Unique string for the current task execution, used as the lock key.

        Combines the class name, the request id and the retry counter, so
        each retry of the same task id gets its own lock.
        """
        return 'TaskLock_%s_%s_%s' % (self.__class__.__name__, self.request.id, self.request.retries)

    def acquire_lock(self):
        """
        Try to set the lock.

        Relies on ``cache.add`` storing the key only when it is absent.

        :return: truthy if the lock was set by this call, falsy if the key
            already existed.
        """
        result = self.cache.add(self.lock_key, True, self.lock_expiration)
        logger.debug('Acquiring %s key %s', self.lock_key, 'succeed' if result else 'failed')
        return result

    def __call__(self, *args, **kwargs):
        """
        Run the task body only if the lock for this execution can be taken.

        :return: the result of the wrapped task, or ``None`` when the run is
            skipped because the lock already exists.
        """
        if self.request.id is None:
            # No task id means the task was not delivered as a message
            # (e.g. it was called directly as a function). All such calls
            # would share the key 'TaskLock_<class>_None_<retries>', so the
            # first one would block every later one until the lock expires.
            # There is nothing to de-duplicate here: run without locking.
            logger.debug('Task has no id, executing without lock')
            return super(TaskWithLock, self).__call__(*args, **kwargs)

        if self.acquire_lock():
            logger.debug('Task %s execution with lock started', self.request.id)
            return super(TaskWithLock, self).__call__(*args, **kwargs)

        logger.warning('Task %s skipped due lock detection', self.request.id)
        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've implemented a `LockedTask` class that uses the Django cache system as a distributed lock and supports lock refreshing, so long-running tasks stay locked (since the TTL is updated). This behavior can also be configured in `settings`. It's such a common use case that I added it to my team's Django project template: https://github.com/PythonicCafe/cookiecutter-dokku-django/blob/main/%7B%7B%20cookiecutter.project_slug%20%7D%7D/project/utils/celery.py#L14