Created
April 14, 2026 11:37
-
-
Save kingbuzzman/38672d5f592ba17c9ddd561de7730ea0 to your computer and use it in GitHub Desktop.
check that celery re-enqueues tasks while shutting down
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/Dockerfile b/Dockerfile | |
| new file mode 100644 | |
| index 0000000..ece4e7a | |
| --- /dev/null | |
| +++ b/Dockerfile | |
| @@ -0,0 +1,12 @@ | |
| +# Image shared by the celery worker and task_sender services (docker-compose.yml). | |
| +FROM python:3.12-slim | |
| + | |
| +WORKDIR /app | |
| + | |
| +# --no-cache-dir keeps pip's download cache out of the image layer. | |
| +RUN pip install --no-cache-dir Django==5.2 celery==5.4.0 | |
| + | |
| +COPY . . | |
| + | |
| +ENV PYTHONPATH=/app | |
| +ENV DJANGO_SETTINGS_MODULE=myproject.settings | |
| diff --git a/docker-compose.yml b/docker-compose.yml | |
| new file mode 100644 | |
| index 0000000..12a47d0 | |
| --- /dev/null | |
| +++ b/docker-compose.yml | |
| @@ -0,0 +1,33 @@ | |
| +# Settings shared by the celery and task_sender services (merged via "<<: *app"). | |
| +x-app: &app | |
| +  build: . | |
| +  volumes: | |
| +    - .:/app | |
| +  environment: | |
| +    - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672// | |
| +  depends_on: | |
| +    rabbitmq: | |
| +      condition: service_healthy | |
| + | |
| +services: | |
| +  rabbitmq: | |
| +    image: rabbitmq:3-management | |
| +    ports: | |
| +      - "5672:5672" | |
| +      - "15672:15672" | |
| +    healthcheck: | |
| +      test: rabbitmq-diagnostics -q ping | |
| +      interval: 5s | |
| +      timeout: 10s | |
| +      retries: 5 | |
| + | |
| +  celery: | |
| +    <<: *app | |
| +    command: python -B -m celery -A myproject worker --loglevel=info --concurrency=1 | |
| + | |
| +  task_sender: | |
| +    <<: *app | |
| +    command: python -B send_tasks.py | |
| +    # Not started by a plain "docker compose up"; kill_test.sh uses "docker compose run". | |
| +    profiles: | |
| +      - send | |
| diff --git a/kill_test.sh b/kill_test.sh | |
| new file mode 100755 | |
| index 0000000..a29e3ed | |
| --- /dev/null | |
| +++ b/kill_test.sh | |
| @@ -0,0 +1,102 @@ | |
| +#!/bin/bash | |
| +# Repeatedly SIGTERM the celery worker (about every 8 seconds) and restart it | |
| +# while the ETA tasks from send_tasks.py are pending, then report whether all | |
| +# 5 tasks still complete. Celery treats SIGTERM as a warm shutdown (not a | |
| +# crash), so this checks whether tasks held by the worker are redelivered | |
| +# under the default acks_late=False. | |
| + | |
| +cd "$(dirname "$0")" || exit 1 | |
| + | |
| +# Print how many of the 5 expected output files exist. | |
| +count_completed() { | |
| +    local count=0 | |
| +    for i in 1 2 3 4 5; do | |
| +        [ -f "output/task_${i}.txt" ] && count=$((count + 1)) | |
| +    done | |
| +    echo "$count" | |
| +} | |
| + | |
| +# Print the contents of each output file that exists. | |
| +show_completed() { | |
| +    for i in 1 2 3 4 5; do | |
| +        if [ -f "output/task_${i}.txt" ]; then | |
| +            cat "output/task_${i}.txt" | |
| +        fi | |
| +    done | |
| +} | |
| + | |
| +echo "=== Celery SIGTERM Redelivery Test ===" | |
| +echo "Tasks take 10s, worker sent SIGTERM roughly every 8s" | |
| +echo "Checking whether tasks survive with default acks_late=False" | |
| +echo "" | |
| + | |
| +# Clear previous output (create the directory first so the glob has a target) | |
| +mkdir -p output | |
| +rm -f output/* | |
| + | |
| +# Stop any existing containers | |
| +docker compose down 2>/dev/null | |
| + | |
| +# Start rabbitmq and celery | |
| +echo "Starting rabbitmq and celery..." | |
| +docker compose up -d rabbitmq celery | |
| +sleep 5 | |
| + | |
| +echo "" | |
| +echo "Sending 5 tasks (5s intervals between each)..." | |
| +docker compose run --rm task_sender | |
| + | |
| +echo "" | |
| +echo "=== Starting SIGTERM loop ===" | |
| +echo "" | |
| + | |
| +max_iterations=20 | |
| +iteration=0 | |
| + | |
| +while [ "$iteration" -lt "$max_iterations" ]; do | |
| +    iteration=$((iteration + 1)) | |
| + | |
| +    echo "--- Iteration $iteration ---" | |
| +    # Count once so the "(none)" note and the exit check agree. | |
| +    completed=$(count_completed) | |
| +    echo "Completed tasks:" | |
| +    show_completed | |
| +    [ "$completed" -eq 0 ] && echo "(none)" | |
| + | |
| +    if [ "$completed" -ge 5 ]; then | |
| +        echo "" | |
| +        echo "=== ALL 5 TASKS COMPLETED! ===" | |
| +        show_completed | |
| +        docker compose down | |
| +        exit 0 | |
| +    fi | |
| + | |
| +    echo "" | |
| +    echo "Sending SIGTERM to celery worker..." | |
| +    # `stop` sends SIGTERM and waits (up to 30s) for the warm shutdown to end. | |
| +    # `kill -s SIGTERM` returns immediately, so the `up -d` below could see the | |
| +    # old container still running and leave it alone instead of restarting it. | |
| +    docker compose stop -t 30 celery | |
| + | |
| +    echo "Restarting celery..." | |
| +    docker compose up -d celery | |
| + | |
| +    echo "Waiting 8 seconds..." | |
| +    sleep 8 | |
| +done | |
| + | |
| +echo "" | |
| +echo "=== TEST FINISHED (max iterations) ===" | |
| +echo "Final results:" | |
| +show_completed | |
| +completed=$(count_completed) | |
| +echo "" | |
| +echo "Total completed: $completed out of 5" | |
| +if [ "$completed" -lt 5 ]; then | |
| +    echo "" | |
| +    echo "*** NOT ALL TASKS COMPLETED - tasks may have been lost ***" | |
| +    echo "With acks_late=False (default), a task is acknowledged just before it" | |
| +    echo "starts executing, so one cut off mid-run is not redelivered." | |
| +fi | |
| + | |
| +docker compose down | |
| diff --git a/myproject/__init__.py b/myproject/__init__.py | |
| new file mode 100644 | |
| index 0000000..e69de29 | |
| diff --git a/myproject/celery.py b/myproject/celery.py | |
| new file mode 100644 | |
| index 0000000..1ebf325 | |
| --- /dev/null | |
| +++ b/myproject/celery.py | |
| @@ -0,0 +1,8 @@ | |
| +import os | |
| +from celery import Celery | |
| + | |
| +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings') | |
| + | |
| +app = Celery('myproject') | |
| +app.config_from_object('django.conf:settings', namespace='CELERY') | |
| +app.autodiscover_tasks() | |
| diff --git a/myproject/settings.py b/myproject/settings.py | |
| new file mode 100644 | |
| index 0000000..f76d837 | |
| --- /dev/null | |
| +++ b/myproject/settings.py | |
| @@ -0,0 +1,19 @@ | |
| +import os | |
| + | |
| +SECRET_KEY = 'test-secret-key' | |
| +DEBUG = True | |
| +ALLOWED_HOSTS = ['*'] | |
| + | |
| +INSTALLED_APPS = [ | |
| + 'django.contrib.contenttypes', | |
| + 'django.contrib.auth', | |
| + 'myproject', | |
| +] | |
| + | |
| +CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://guest:guest@rabbitmq:5672//') | |
| + | |
| +# NO RESULT BACKEND - intentionally omitted to test task persistence | |
| +# CELERY_RESULT_BACKEND = None | |
| + | |
| +# CELERY_TASK_ACKS_LATE = os.environ.get('CELERY_TASK_ACKS_LATE', '').lower() == 'true' | |
| +# CELERY_TASK_REJECT_ON_WORKER_LOST = os.environ.get('CELERY_TASK_REJECT_ON_WORKER_LOST', '').lower() == 'true' | |
| \ No newline at end of file | |
| diff --git a/myproject/tasks.py b/myproject/tasks.py | |
| new file mode 100644 | |
| index 0000000..ccb8d30 | |
| --- /dev/null | |
| +++ b/myproject/tasks.py | |
| @@ -0,0 +1,14 @@ | |
| +import time | |
| +from datetime import datetime | |
| +from celery import shared_task | |
| + | |
| +@shared_task | |
| +def write_task_number(task_num): | |
| + """Write task number to its own file after simulating work.""" | |
| + time.sleep(10) # Simulate 10 seconds of work | |
| + | |
| + with open(f'/app/output/task_{task_num}.txt', 'w') as f: | |
| + timestamp = datetime.now().isoformat() | |
| + f.write(f"Task {task_num} completed at {timestamp}\n") | |
| + | |
| + return f"Task {task_num} done" | |
| diff --git a/send_tasks.py b/send_tasks.py | |
| new file mode 100644 | |
| index 0000000..9b60578 | |
| --- /dev/null | |
| +++ b/send_tasks.py | |
| @@ -0,0 +1,24 @@ | |
| +#!/usr/bin/env python | |
| +"""Send 5 tasks with 5-second intervals.""" | |
| +import os | |
| +import sys | |
| +import time | |
| +from datetime import datetime, timedelta | |
| + | |
| +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings') | |
| +sys.path.insert(0, '/app') | |
| + | |
| +from myproject.celery import app | |
| +from myproject.tasks import write_task_number | |
| +now = datetime.now() | |
| + | |
| +print("Sending 5 tasks with 5-second intervals...") | |
| +print(f"Each task takes 10 seconds to complete. now is {now}\n") | |
| + | |
| +for i in range(1, 6): | |
| + x = time.perf_counter() | |
| + eta = now + timedelta(seconds=5 * i) | |
| + result = write_task_number.apply_async(args=[i], eta=eta) | |
| + print(f"Task {i} sent with id: {result.id} at {eta} -- {time.perf_counter() - x} seconds") | |
| + | |
| +print("\nAll tasks sent! Check output/* for results.") |
Author
Author
This shows that, with Celery's default settings, a worker that is shutting down hands every task it has picked up but not yet acknowledged (here, the ETA tasks it was holding) back to the broker, so another Celery worker can run it.
In this example there is only one worker (a single Celery container). When it shuts down, the tasks go back to RabbitMQ, which hands them to the same worker again once it comes back online.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Run this in a scratch directory under /tmp, for example `cd /tmp; mkdir temp; cd temp`, and then:
`curl https://gist.githubusercontent.com/kingbuzzman/38672d5f592ba17c9ddd561de7730ea0/raw/abc2ce3c235ae16aa29de9420b9ad0d7f5b2f69a/celery-tasks.diff | patch -p1; docker compose build && bash kill_test.sh`
The output looks like this: