Created
July 2, 2018 07:39
-
-
Save jonascheng/f240310f23388cc5ada3d906da67cdbf to your computer and use it in GitHub Desktop.
Celery beat health check script, you may specify this inside Dockerfile
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import arrow | |
import shelve | |
import os.path | |
from datetime import datetime, timedelta | |
# Name of the file used by PersistentScheduler to store the last run times of periodic tasks. | |
FN_CELERYBEAT = 'celerybeat-schedule' | |
def is_tolerable_due(task): | |
now = arrow.utcnow().naive | |
tdelta = timedelta(minutes=-10) | |
print('now: {}'.format(now)) | |
print('task: {}'.format(task)) | |
print('task.last_run_at: {}'.format(task.last_run_at)) | |
try: | |
print('task.schedule.run_every: {}'.format(task.schedule.run_every)) | |
next_run_at = task.last_run_at + task.schedule.run_every | |
print('next_run_at: {}'.format(next_run_at)) | |
remaining = next_run_at - now | |
print('remaining: {}'.format(remaining)) | |
if remaining > tdelta: | |
# still tolerable | |
return True | |
else: | |
# out of tolerance | |
return False | |
except AttributeError: | |
remaining = task.schedule.remaining_estimate(task.last_run_at) | |
print('timedelta(): {}'.format(tdelta)) | |
print('task.schedule.remaining_estimate: {}'.format(remaining)) | |
if remaining > tdelta: | |
# still tolerable in 15 mins | |
return True | |
else: | |
return False | |
if __name__ == '__main__': | |
if os.path.isfile(FN_CELERYBEAT) is False: | |
# file does not exist, assume the container is not celery beat worker | |
print('file does not exist, assume the container is not celery beat worker') | |
sys.exit(0) | |
file_data = shelve.open(FN_CELERYBEAT) | |
for task_name, task in file_data['entries'].items(): | |
is_due, next_time_to_check = task.schedule.is_due(task.last_run_at) | |
print('is_due: {} next_time_to_check: {}'.format(is_due, next_time_to_check)) | |
if is_due is True: | |
if is_tolerable_due(task) is True: | |
print('the task {} may be over due, but is tolerable'.format(task_name)) | |
else: | |
print('the task {} is over due'.format(task_name)) | |
sys.exit(1) | |
else: | |
print('the task {} is not due yet'.format(task_name)) | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Just what I was looking for. Thank you for sharing