@kingbuzzman
Created April 14, 2026 11:37
check that celery re-enqueues tasks while shutting down
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..ece4e7a
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,10 @@
+FROM python:3.12-slim
+
+WORKDIR /app
+
+RUN pip install Django==5.2 celery==5.4.0
+
+COPY . .
+
+ENV PYTHONPATH=/app
+ENV DJANGO_SETTINGS_MODULE=myproject.settings
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..12a47d0
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,35 @@
+services:
+ rabbitmq:
+ image: rabbitmq:3-management
+ ports:
+ - "5672:5672"
+ - "15672:15672"
+ healthcheck:
+ test: rabbitmq-diagnostics -q ping
+ interval: 5s
+ timeout: 10s
+ retries: 5
+
+ celery:
+ build: .
+ command: python -B -m celery -A myproject worker --loglevel=info --concurrency=1
+ volumes:
+ - .:/app
+ environment:
+ - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
+ depends_on:
+ rabbitmq:
+ condition: service_healthy
+
+ task_sender:
+ build: .
+ command: python -B send_tasks.py
+ volumes:
+ - .:/app
+ environment:
+ - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
+ depends_on:
+ rabbitmq:
+ condition: service_healthy
+ profiles:
+ - send
diff --git a/kill_test.sh b/kill_test.sh
new file mode 100755
index 0000000..a29e3ed
--- /dev/null
+++ b/kill_test.sh
@@ -0,0 +1,94 @@
+#!/bin/bash
+# Aggressive test: SIGTERM celery worker every 8 seconds
+# SIGTERM triggers a warm shutdown; with the default acks_late=False, check whether in-flight tasks are lost
+
+cd "$(dirname "$0")"
+
+count_completed() {
+ local count=0
+ for i in 1 2 3 4 5; do
+ [ -f "output/task_${i}.txt" ] && ((count++))
+ done
+ echo $count
+}
+
+show_completed() {
+ for i in 1 2 3 4 5; do
+ if [ -f "output/task_${i}.txt" ]; then
+ cat "output/task_${i}.txt"
+ fi
+ done
+}
+
+echo "=== Aggressive Celery Task Loss Test ==="
+echo "Tasks take 10s, worker killed with SIGTERM every 8s"
+echo "With default acks_late=False, will tasks be lost?"
+echo ""
+
+# Clear previous output
+rm -f output/*
+mkdir -p output
+
+# Stop any existing containers
+docker compose down 2>/dev/null
+
+# Start rabbitmq and celery
+echo "Starting rabbitmq and celery..."
+docker compose up -d rabbitmq celery
+sleep 5
+
+echo ""
+echo "Sending 5 tasks (5s intervals between each)..."
+docker compose run --rm task_sender
+
+echo ""
+echo "=== Starting SIGTERM loop ==="
+echo ""
+
+max_iterations=20
+iteration=0
+
+while [ $iteration -lt $max_iterations ]; do
+ iteration=$((iteration + 1))
+
+ echo "--- Iteration $iteration ---"
+ echo "Completed tasks:"
+ show_completed
+ [ $(count_completed) -eq 0 ] && echo "(none)"
+
+ completed=$(count_completed)
+
+ if [ "$completed" -ge 5 ]; then
+ echo ""
+ echo "=== ALL 5 TASKS COMPLETED! ==="
+ show_completed
+ docker compose down
+ exit 0
+ fi
+
+ echo ""
+ echo "Sending SIGTERM to celery worker..."
+ docker compose kill -s SIGTERM celery
+
+ echo "Restarting celery..."
+ docker compose up -d celery
+
+ echo "Waiting 8 seconds..."
+ sleep 8
+done
+
+echo ""
+echo "=== TEST FINISHED (max iterations) ==="
+echo "Final results:"
+show_completed
+completed=$(count_completed)
+echo ""
+echo "Total completed: $completed out of 5"
+if [ "$completed" -lt 5 ]; then
+ echo ""
+ echo "*** TASKS WERE LOST! ***"
+ echo "This confirms that with acks_late=False (default),"
+ echo "tasks acknowledged on receipt are lost if the worker crashes."
+fi
+
+docker compose down
diff --git a/myproject/__init__.py b/myproject/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/myproject/celery.py b/myproject/celery.py
new file mode 100644
index 0000000..1ebf325
--- /dev/null
+++ b/myproject/celery.py
@@ -0,0 +1,8 @@
+import os
+from celery import Celery
+
+os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
+
+app = Celery('myproject')
+app.config_from_object('django.conf:settings', namespace='CELERY')
+app.autodiscover_tasks()
diff --git a/myproject/settings.py b/myproject/settings.py
new file mode 100644
index 0000000..f76d837
--- /dev/null
+++ b/myproject/settings.py
@@ -0,0 +1,19 @@
+import os
+
+SECRET_KEY = 'test-secret-key'
+DEBUG = True
+ALLOWED_HOSTS = ['*']
+
+INSTALLED_APPS = [
+ 'django.contrib.contenttypes',
+ 'django.contrib.auth',
+ 'myproject',
+]
+
+CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://guest:guest@rabbitmq:5672//')
+
+# NO RESULT BACKEND - intentionally omitted to test task persistence
+# CELERY_RESULT_BACKEND = None
+
+# CELERY_TASK_ACKS_LATE = os.environ.get('CELERY_TASK_ACKS_LATE', '').lower() == 'true'
+# CELERY_TASK_REJECT_ON_WORKER_LOST = os.environ.get('CELERY_TASK_REJECT_ON_WORKER_LOST', '').lower() == 'true'
\ No newline at end of file
diff --git a/myproject/tasks.py b/myproject/tasks.py
new file mode 100644
index 0000000..ccb8d30
--- /dev/null
+++ b/myproject/tasks.py
@@ -0,0 +1,14 @@
+import time
+from datetime import datetime
+from celery import shared_task
+
+@shared_task
+def write_task_number(task_num):
+ """Write task number to its own file after simulating work."""
+ time.sleep(10) # Simulate 10 seconds of work
+
+ with open(f'/app/output/task_{task_num}.txt', 'w') as f:
+ timestamp = datetime.now().isoformat()
+ f.write(f"Task {task_num} completed at {timestamp}\n")
+
+ return f"Task {task_num} done"
diff --git a/send_tasks.py b/send_tasks.py
new file mode 100644
index 0000000..9b60578
--- /dev/null
+++ b/send_tasks.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+"""Send 5 tasks with 5-second intervals."""
+import os
+import sys
+import time
+from datetime import datetime, timedelta
+
+os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
+sys.path.insert(0, '/app')
+
+from myproject.celery import app
+from myproject.tasks import write_task_number
+now = datetime.now()
+
+print("Sending 5 tasks with 5-second intervals...")
+print(f"Each task takes 10 seconds to complete. now is {now}\n")
+
+for i in range(1, 6):
+ x = time.perf_counter()
+ eta = now + timedelta(seconds=5 * i)
+ result = write_task_number.apply_async(args=[i], eta=eta)
+ print(f"Task {i} sent with id: {result.id} at {eta} -- {time.perf_counter() - x} seconds")
+
+print("\nAll tasks sent! Check output/* for results.")
@kingbuzzman
Author

Run this in /tmp, e.g. `cd /tmp; mkdir temp; cd temp`

and then

curl https://gist.githubusercontent.com/kingbuzzman/38672d5f592ba17c9ddd561de7730ea0/raw/abc2ce3c235ae16aa29de9420b9ad0d7f5b2f69a/celery-tasks.diff | patch -p1; docker compose build && bash kill_test.sh

output looks like this:

....
Sending SIGTERM to celery worker...
no container to kill
Restarting celery...
[+] Running 2/2
 ✔ Container thing-rabbitmq-1  Healthy                                                                                                                              0.5s 
 ✔ Container thing-celery-1    Started                                                                                                                              0.1s 
Waiting 8 seconds...
--- Iteration 11 ---
Completed tasks:
Task 1 completed at 2026-04-14T06:43:38.489682
Task 2 completed at 2026-04-14T06:43:56.140146
Task 3 completed at 2026-04-14T06:44:13.849669
Task 4 completed at 2026-04-14T06:44:31.548785

Sending SIGTERM to celery worker...
[+] Killing 1/1
 ✔ Container thing-celery-1  Killed                                                                                                                                 0.0s 
Restarting celery...
[+] Running 2/2
 ✔ Container thing-rabbitmq-1  Healthy                                                                                                                              0.5s 
 ✔ Container thing-celery-1    Running                                                                                                                              0.0s 
Waiting 8 seconds...
--- Iteration 12 ---
Completed tasks:
Task 1 completed at 2026-04-14T06:43:38.489682
Task 2 completed at 2026-04-14T06:43:56.140146
Task 3 completed at 2026-04-14T06:44:13.849669
Task 4 completed at 2026-04-14T06:44:31.548785
Task 5 completed at 2026-04-14T06:44:49.225215

=== ALL 5 TASKS COMPLETED! ===
Task 1 completed at 2026-04-14T06:43:38.489682
Task 2 completed at 2026-04-14T06:43:56.140146
Task 3 completed at 2026-04-14T06:44:13.849669
Task 4 completed at 2026-04-14T06:44:31.548785
Task 5 completed at 2026-04-14T06:44:49.225215
[+] Running 3/3
 ✔ Container thing-celery-1    Removed                                                                                                                              0.0s 
 ✔ Container thing-rabbitmq-1  Removed                                                                                                                              1.3s 
 ✔ Network thing_default       Removed     
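For completeness: the diff leaves two hardening settings commented out in `settings.py`. Enabling them (so that even a hard kill, i.e. SIGKILL mid-task, cannot lose a running task) would look like this — these are the standard Celery setting names, consumed through the `CELERY` namespace the app already configures:

```python
# myproject/settings.py -- opt into late acks so a message is only acked
# after the task finishes, and requeue it if the worker process dies mid-task.
CELERY_TASK_ACKS_LATE = True
CELERY_TASK_REJECT_ON_WORKER_LOST = True
```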

@kingbuzzman
Author

This proves that with Celery's default settings, a worker that is shut down hands any tasks it has picked up but not finished back to the broker, so another worker (or the same one, restarted) can run them.

In this example there is only one worker, only one Celery container. When it shuts down, it sends its reserved tasks back to RabbitMQ, and RabbitMQ delivers them back to the worker once it comes back online.
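As a mental model of why this happens (a toy sketch, not Celery or RabbitMQ code): the broker only deletes a message once the consumer acks it, and anything still unacked when the consumer's channel closes goes back on the queue. With the default ack-on-receipt, only a message that was already acked but not finished at crash time can be lost; with late acks (or a clean requeue on shutdown) it survives:

```python
# Toy model of broker ack semantics -- illustrative only, not Celery code.
from collections import deque


class Broker:
    """Holds messages; unacked deliveries return to the queue on requeue."""

    def __init__(self, messages):
        self.queue = deque(messages)
        self.unacked = []

    def deliver(self):
        msg = self.queue.popleft()
        self.unacked.append(msg)
        return msg

    def ack(self, msg):
        self.unacked.remove(msg)

    def requeue_unacked(self):
        # What RabbitMQ does when a consumer's channel closes.
        while self.unacked:
            self.queue.appendleft(self.unacked.pop())


def run_worker(broker, acks_late, crash_after):
    """Process messages; 'crash' (stop without finishing the in-flight
    message) after completing `crash_after` tasks."""
    done = []
    while broker.queue:
        msg = broker.deliver()
        if not acks_late:
            broker.ack(msg)  # ack on receipt (Celery's default)
        if len(done) >= crash_after:
            broker.requeue_unacked()  # unacked messages go back
            return done  # the early-acked in-flight message is gone
        done.append(msg)
        if acks_late:
            broker.ack(msg)  # ack only after the work finished
    return done


# Early ack: the in-flight message was already acked, so the crash loses it.
b = Broker([1, 2, 3])
run_worker(b, acks_late=False, crash_after=1)
print(sorted(b.queue))  # task 2 is lost: only [3] remains

# Late ack: the in-flight message is still unacked, so it is requeued.
b = Broker([1, 2, 3])
run_worker(b, acks_late=True, crash_after=1)
print(sorted(b.queue))  # [2, 3] -- nothing lost
```

The test above never hits the lossy case because SIGTERM is a warm shutdown: the worker finishes (or requeues) its in-flight work before the channel closes, which is exactly the behavior the output demonstrates.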
