Last active: July 9, 2024 15:50
-
-
Save brablc/b5a585341af60dc2d2cc417b3d0b5a4e to your computer and use it in GitHub Desktop.
Django management command for Celery monitoring
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# See https://github.com/brablc/swarm-health-alerter/blob/main/README.md#alerting-for-any-service
# Place this file at adm/management/commands/celery_monitor.py (replace "adm" with your main application, here and in the code below).
import os | |
import logging | |
import json | |
from celery import Celery | |
from celery.events import EventReceiver | |
from django.core.management.base import BaseCommand | |
# Build a Celery app configured from the Django settings (keys prefixed with
# CELERY_), so the event receiver below uses the project's broker settings.
# "adm" is the project package name; rename it for your project.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "adm.settings")
app = Celery(main="adm")
app.config_from_object(
    "django.conf:settings",
    namespace="CELERY",
)
class Command(BaseCommand):
    """Monitor Celery task events and report repeatedly failing tasks.

    Failures are counted per task *name*; a success of a task with that name
    clears its counter. After each counted failure (and each recovery) the
    JSON status file is regenerated. It lists every task name with at least
    ``--failed-retries`` consecutive failures, and it is deleted when no task
    name is over the threshold. The mere existence of the file therefore
    signals an unhealthy state.
    """

    help = "Monitor Celery events - detect repeatedly failing tasks."

    status_file: str
    failed_retries: int

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        # uuid -> {"name": ..., "args": ..., "kwargs": ...} taken from the
        # task-received event; later task events only carry the uuid.
        self.task_info = {}
        # task name -> {"count": consecutive failures, "event": last failure}
        self.failed_tasks = {}

    def add_arguments(self, parser):
        parser.add_argument(
            "--status-file",
            type=str,
            required=True,
            help="Name of the file for writing error output",
        )
        parser.add_argument(
            "--failed-retries",
            type=int,
            default=3,
            help="Include in status file, when reaching number of failed retries",
        )

    def handle(self, *args, **kwargs):
        """Store the CLI options and block forever consuming Celery events."""
        self.status_file = kwargs.get("status_file")
        self.failed_retries = kwargs.get("failed_retries")
        self.logger.info(f"Status file: {self.status_file}")
        self.capture_events()

    def update_status(self):
        """Rewrite or delete the status file from the current failure counters.

        Only task names with at least ``self.failed_retries`` consecutive
        failures are reported. The file is written to a sibling temp file and
        moved into place with ``os.replace``, so a concurrent reader (e.g. a
        container healthcheck running ``cat``) never sees half-written JSON.
        """
        status_data = {}
        for task_name, failure in self.failed_tasks.items():
            count = failure["count"]
            if count < self.failed_retries:
                continue
            event = failure["event"]
            status_data[task_name] = {
                "failed_retries": count,
                "last_failure": {
                    "timestamp": event.get("timestamp", ""),
                    "exception": event.get("exception", ""),
                    "traceback": event.get("traceback", ""),
                    "args": event.get("args", []),
                    "kwargs": event.get("kwargs", {}),
                },
            }

        if status_data:
            tmp_path = f"{self.status_file}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(status_data, f, indent=2, default=str)
            # Atomic on POSIX when both paths are on the same filesystem.
            os.replace(tmp_path, self.status_file)
        else:
            try:
                os.remove(self.status_file)
            except FileNotFoundError:
                pass
            self.logger.info("All tasks healthy")

    def process_event(self, event):
        """Handle one Celery event dict (registered for every event type)."""
        event_type = event["type"]
        if "uuid" not in event:
            # Only task events carry a uuid; ignore everything else.
            return
        task_uuid = event["uuid"]

        if event_type == "task-received":
            self.task_info[task_uuid] = {
                "name": event["name"],
                # task-failed events do not include args/kwargs, so keep the
                # ones announced at receive time for the status report.
                "args": event.get("args", []),
                "kwargs": event.get("kwargs", {}),
            }
            return

        if event_type not in ("task-succeeded", "task-failed"):
            return

        info = self.task_info.pop(task_uuid, None)
        if info is None:
            self.logger.warning(f"Task {task_uuid} not found in task_info")
            return
        task_name = info["name"]

        if event_type == "task-succeeded":
            if task_name in self.failed_tasks:
                del self.failed_tasks[task_name]
                self.logger.info(f"Task {task_name} succeeded")
                self.update_status()
            return

        # event_type == "task-failed"
        failure = self.failed_tasks.get(task_name)
        if failure is None:
            failure = self.failed_tasks[task_name] = {"count": 1}
            self.logger.error(f"Task {task_name} started failing")
        else:
            failure["count"] += 1
            # Bind to a local: nesting "..." inside an f-string is a
            # SyntaxError before Python 3.12.
            count = failure["count"]
            self.logger.warning(f"Task {task_name} failed again ({count})")
        # Merge receive-time args/kwargs under the failure event's own fields.
        failure["event"] = {"args": info["args"], "kwargs": info["kwargs"], **event}
        self.update_status()

    def capture_events(self):
        """Connect to the broker and dispatch every event to process_event."""
        with app.connection() as connection:
            receiver = EventReceiver(
                connection,
                handlers={
                    "*": self.process_event,
                },
            )
            receiver.capture(limit=None, timeout=None)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Trick: dockerize is made available to every service without rebuilding any
# image. The dockerize binary is copied to /var/lib/swarm on every swarm node
# and bind-mounted into each container at /swarm (see `volumes`).
services:
  postgres:
    ...
  rabbitmq:
    ...
  celery-monitor:
    image: your-app-django:latest
    hostname: '{{.Node.Hostname}}'
    tty: true
    networks: [your-app-network]
    environment:
      STATUS_FILE: /tmp/celery-status.json
    secrets:
      - {source: your-app.env, target: /app/.env}
    volumes: ["/var/lib/swarm:/swarm"]
    # Wait for the broker, then run the monitor; it keeps $STATUS_FILE present
    # only while some task has failed --failed-retries times in a row.
    command: /swarm/bin/dockerize -wait tcp://rabbitmq:5672 sh -c 'python web/manage.py celery_monitor --status-file=$${STATUS_FILE} --failed-retries=3'
    healthcheck:
      # Healthy while the status file is absent; otherwise print it and fail.
      test:
        - CMD-SHELL
        - "test -f $${STATUS_FILE} || exit 0; cat $${STATUS_FILE}; exit 1"
      interval: 10s
      timeout: 1s
      retries: 9999
      start_period: 10s
    deploy:
      replicas: 1
      placement:
        constraints: ["node.role == manager"]
  celery-worker:
    image: your-app-django:latest
    hostname: '{{.Node.Hostname}}'
    tty: true
    networks: [your-app-network]
    secrets:
      - {source: your-app.env, target: /app/.env}
    volumes: ["/var/lib/swarm:/swarm"]
    # --events makes the workers publish the task events the monitor consumes.
    command: /swarm/bin/dockerize -wait tcp://rabbitmq:5672 -wait tcp://postgres:5432 celery --workdir web -A adm worker --loglevel=info --events
    deploy:
      replicas: 2
      placement:
        constraints: ["node.role == worker"]
      update_config:
        delay: 10s
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.