Last active
May 15, 2019 09:20
-
-
Save fdemmer/7551bff2bab80b56aac5018060aded55 to your computer and use it in GitHub Desktop.
Alternative crontab schedule for Celery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
from datetime import timedelta

import pytz
import six
import tzcron
from celery import schedules
from celery.utils.time import is_naive
from kombu.utils import cached_property
from pytz import AmbiguousTimeError, NonExistentTimeError
# Module-level logger shared by the schedule classes below.
logger = logging.getLogger(__name__)
class tzcrontab(schedules.BaseSchedule):
    """Crontab-style schedule whose events are computed in an explicit timezone.

    Expression parsing and generation of event times is using `tzcron`:
    https://tzcron.readthedocs.io
    """

    def __init__(self, expression, tz=None, *args, **kwargs):
        """
        Crontab schedule supporting a timezone.

        :param expression: cron expression (with year), see `tzcron` docs for details
        :param tz: timezone as string (eg. 'Europe/Vienna') or `pytz.timezone`;
            when falsy, the `tz` property falls back to `self.app.timezone`
        :param args: extra positional arguments, forwarded to the base schedule
        :param kwargs: extra keyword arguments, forwarded to the base schedule
        """
        self.expression = expression
        # set 'timezone' attribute like on `self.app`, use `self.tz` property to retrieve
        # `six.string_types` (rather than `six.text_type`) so that a native
        # `str` name on Python 2 is converted as well instead of being stored
        # as a raw string.
        if isinstance(tz, six.string_types):
            self.timezone = pytz.timezone(tz)
        else:
            self.timezone = tz
        # Forward positional arguments too; they used to be silently dropped.
        super(tzcrontab, self).__init__(*args, **kwargs)

    @cached_property
    def tz(self):
        """Return the timezone given at construction, else the app's timezone."""
        return self.timezone or self.app.timezone

    def __repr__(self):
        template = "<{}: {} @{}>"
        return template.format(self.__class__.__name__, self.expression, self.tz)

    def __eq__(self, other):
        """Equal when expression, explicit timezone and base-schedule state match."""
        if not isinstance(other, tzcrontab):
            return NotImplemented
        return bool(
            other.expression == self.expression
            and other.timezone == self.timezone
            and super(tzcrontab, self).__eq__(other)
        )

    def __ne__(self, other):
        """Inverse of `__eq__`; Python 2 does not derive `!=` from `==`."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def now(self):
        """Return the base schedule's current time, insisting it is timezone-aware."""
        now = super(tzcrontab, self).now()
        assert not is_naive(now), "Please, don't use naive datetimes!"
        return now

    def remaining_estimate(self, since):
        """
        Return the time left until the first scheduled event at or after `since`.

        :param since: datetime from which `tzcron` starts generating events
        :return: `timedelta` between that event and `self.now()` (negative if
            the event already passed), or `None` when the event time is
            ambiguous or non-existent in the schedule's timezone
        """
        # Make a schedule from the cron expression, starting at given datetime.
        event_datetimes = tzcron.Schedule(self.expression, self.tz, since)
        logger.debug('Schedule from cron expression: %s', event_datetimes)

        try:
            # Find the next event.
            next_datetime = next(event_datetimes)
        except AmbiguousTimeError:
            logger.exception(
                "Time is ambiguous in the requested timezone! "
                "Task will not be scheduled!"
            )
            return None
        except NonExistentTimeError:
            logger.exception(
                "Time does not exist in the requested timezone! "
                "Task will not be scheduled!"
            )
            return None

        remaining_delta = next_datetime - self.now()
        logger.debug(
            'remaining_estimate: @%s, next_datetime: %s, remaining_seconds: %s',
            self.tz, next_datetime, remaining_delta.total_seconds(),
        )
        return remaining_delta

    def _remaining_seconds(self, since):
        """Return `remaining_estimate(since)` in seconds, or `None` if unavailable."""
        remaining_delta = self.remaining_estimate(since)
        if remaining_delta is None:
            return None
        return remaining_delta.total_seconds()

    def _is_due(self, remaining_seconds):
        """Return True when the event lies within one second of now."""
        # lower bound at 1 second past do not trigger old events
        # upper bound at 1 second future to trigger almost due events
        return -1 < remaining_seconds < 1

    def determine_is_due(self, since):
        """
        Return `(is_due, remaining_seconds)` for the first event at or after `since`.

        When the event time could not be determined, the task is reported as
        not due and `beat_max_loop_interval` is used as the remaining time.
        """
        remaining_seconds = self._remaining_seconds(since)
        # when determined time was ambiguous or non-existent
        if remaining_seconds is None:
            logger.warning('could not determine remaining_seconds')
            return False, self.app.conf.beat_max_loop_interval
        return self._is_due(remaining_seconds), remaining_seconds

    def is_due(self, last_run_at):
        """
        Return tuple of `(is_due, remaining_seconds)`.

        :param last_run_at: datetime of the last run, used as the first
            starting point for looking up the next event
        """
        # Remaining time is negative and close to zero, when a task is due.
        # However `last_run_at` can be very far in the past if the
        # `PersistentScheduler` was stopped for a while.
        # To prevent triggering tasks based on outdated scheduled events, we
        # limit the time to 1 second in the past.
        now = self.now()
        is_due, remaining_seconds = self.determine_is_due(last_run_at)

        if not is_due and remaining_seconds < 0:
            # The task was not due, due to it being very far in the past.
            # Calculate the next runtime from just a moment ago to find earliest due.
            since = now - timedelta(seconds=1)
            is_due, remaining_seconds = self.determine_is_due(since)

        if remaining_seconds < 1:
            # The found event is (almost) now or already past: keep the
            # `is_due` verdict, but take the wait time from the event that
            # follows it.
            since = now + timedelta(seconds=1)
            _, remaining_seconds = self.determine_is_due(since)

        # is_due == True triggers task, remaining_seconds sets time for next tick
        return is_due, remaining_seconds
class pytzcrontab(tzcrontab):
    """`tzcrontab` variant that takes the cron fields as separate arguments."""

    def __init__(self, minute='*', hour='*', day_of_week='*',
                 day_of_month='*', month_of_year='*', year='*',
                 tz=None, *args, **kwargs):
        """
        Wrapper for `tzcrontab` with more "pythonic" interface, that allows passing
        the parts of a cron expression as separate arguments.

        :param minute: 0-59 or pattern, default: '*'
        :param hour: 0-23 or pattern, default: '*'
        :param day_of_week: 1-7 (Monday to Sunday) or pattern, default: '*'
        :param day_of_month: 1-31 or pattern, default: '*'
        :param month_of_year: 1-12 or pattern, default: '*'
        :param year: full year (yyyy) or '*', default: '*'
        :param tz: timezone as string (eg. 'Europe/Vienna') or `pytz.timezone`
        :param args: extra positional arguments, passed on to `tzcrontab`
        :param kwargs: extra keyword arguments, passed on to `tzcrontab`
        """
        # The expression uses cron field order (day of month and month come
        # before day of week), which differs from the argument order above.
        expression = ' '.join(
            str(field) for field
            in (minute, hour, day_of_month, month_of_year, day_of_week, year)
        )
        # Pass `tz` positionally: combining `tz=tz` with `*args` raised
        # "got multiple values for argument 'tz'" whenever extra positional
        # arguments were supplied.
        super(pytzcrontab, self).__init__(expression, tz, *args, **kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment