-
-
Save matclayton/344788 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime, timedelta | |
from dateutil.parser import parse as iso8601 | |
from celery.task.base import Task | |
from celery.utils import timedelta_seconds | |
class ExpireTask(Task):
    """Task with an expiration time.

    If the task has expired when it is called, :meth:`run` is not executed
    and ``None`` is returned instead.

    The :meth:`apply_async` method supports an additional ``expires``
    argument: a :class:`datetime.timedelta` instance (a plain number of
    seconds is accepted as well).

    .. attribute:: default_expires

        A :class:`datetime.timedelta` object (or a number of seconds) used
        when :meth:`apply_async` is called without ``expires``.
        The default is ``None`` (never expire.)

    """

    default_expires = None
    abstract = True

    def __call__(self, *args, **kwargs):
        """Run the task unless its expiry deadline has already passed.

        The bookkeeping keys ``expires`` and ``date_sent`` are removed from
        ``kwargs`` before :meth:`run` is invoked, so ``run`` never sees them.

        :returns: the result of :meth:`run`, or ``None`` if the task expired.

        """
        expires = kwargs.pop("expires", None)
        date_sent = kwargs.pop("date_sent", None)
        # A falsy ``expires`` (``None`` or ``0``) means "never expire".
        if expires and date_sent:
            # ``apply_async`` sends an ISO-8601 string, but tolerate a real
            # datetime in case the message serializer preserved the object.
            if not isinstance(date_sent, datetime):
                date_sent = iso8601(date_sent)
            if isinstance(expires, timedelta):
                lifetime = expires
            else:
                lifetime = timedelta(seconds=expires)
            # Build "now" with the same tz-awareness as ``date_sent``;
            # comparing naive and aware datetimes raises TypeError.
            now = datetime.now(date_sent.tzinfo)
            if now > date_sent + lifetime:
                return
        return self.run(*args, **kwargs)

    # NOTE(review): kept as a classmethod, as in the original; confirm this
    # matches the signature of ``Task.apply_async`` in the Celery version used.
    @classmethod
    def apply_async(cls, args=None, kwargs=None, expires=None, **options):
        """Send the task, stamping it with its send time and lifetime.

        :keyword expires: a :class:`datetime.timedelta` or a number of
            seconds after which the task should no longer be run.
            Defaults to :attr:`default_expires`.

        All other arguments are passed on to the parent ``apply_async``.

        """
        # Copy so the caller's dict is not modified by the bookkeeping keys.
        kwargs = dict(kwargs or {})
        # Send the timestamp as ISO-8601 text, which is what ``__call__``
        # parses on the receiving side.
        kwargs.setdefault("date_sent", datetime.now().isoformat())
        if expires is None:
            expires = cls.default_expires
        # Transmit the lifetime as plain seconds, which is what ``__call__``
        # feeds to ``timedelta(seconds=...)``.
        if isinstance(expires, timedelta):
            expires = expires.total_seconds()
        # As in the original, the resolved value always overrides any
        # "expires" key already present in ``kwargs``.
        kwargs["expires"] = expires
        return super(ExpireTask, cls).apply_async(args, kwargs, **options)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment