Skip to content

Instantly share code, notes, and snippets.

@grahamlyons
Created August 16, 2025 08:47
Show Gist options
  • Save grahamlyons/5e7053e5fc9e56bec0cb62aca4232991 to your computer and use it in GitHub Desktop.
Save grahamlyons/5e7053e5fc9e56bec0cb62aca4232991 to your computer and use it in GitHub Desktop.
Celery, using RabbitMQ, configured with retries and dead letter queues
"""
Run with: `celery -A tasks worker`
Expects a RabbitMQ broker to be reachable at `rabbitmq:5672`; adjust the broker URL below if it is served on `localhost` instead.
"""
import logging
from typing import Dict, Tuple
from celery import Celery, Task
from kombu import Exchange, Queue
from celery.exceptions import Reject
from billiard.einfo import ExceptionInfo
from celery import bootsteps
# Module-level logger. Its level is INFO so that the logger.info() calls made
# by the tasks in this module are not filtered out by this logger.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Name of the main work queue, and the suffix that is joined to a queue name
# with a "." to form the name of its dead-letter exchange/queue.
default_queue_name = "default"
dead_letter_suffix = "dlq"
class CallbackOnFailureTask(Task):
    """Task base class whose failure handler rejects the task message.

    Use it as ``base=`` for tasks whose failed messages should be rejected
    without being requeued.
    """

    def on_failure(
        self,
        exc: Exception,
        task_id: str,
        args: Tuple,
        kwargs: Dict,
        einfo: ExceptionInfo,
    ):
        """Log the failure, then raise ``Reject`` for the failed task.

        Args:
            exc: The exception that made the task fail; wrapped in the
                ``Reject`` that is raised.
            task_id: Id of the failed task (only used in the log line).
            args: Positional arguments of the failed task (unused here).
            kwargs: Keyword arguments of the failed task (unused here).
            einfo: Exception information for the failure (unused here).

        Raises:
            Reject: Always, with ``requeue=False``.
        """
        logger.error("[on_failure] task failed (%s - %s)", self.name, task_id)
        # requeue=False: the message must not go back onto its source queue.
        # NOTE(review): presumably this is what lets RabbitMQ route it to the
        # dead-letter exchange set in the queue arguments -- confirm that the
        # task also needs acks_late=True for the reject to take effect.
        rejection = Reject(exc, requeue=False)
        raise rejection
class DeclareDLXnDLQ(bootsteps.StartStopStep):
    """
    Celery Bootstep that declares the dead-letter exchange and queue before
    the worker starts processing tasks.
    """

    # This step depends on the worker's Pool component.
    requires = {"celery.worker.components:Pool"}

    def start(self, worker):
        """Declare one dead-letter exchange/queue pair per work queue.

        For every source queue name, a direct exchange and a queue that are
        both named ``<queue>.<dead_letter_suffix>`` are built, using that
        same name as the routing key, and then declared over a connection
        acquired from ``worker.app.pool``.
        """
        for source_queue in (default_queue_name,):
            dead_letter_name = f"{source_queue}.{dead_letter_suffix}"
            dead_letter_queue = Queue(
                dead_letter_name,
                Exchange(dead_letter_name, type="direct"),
                routing_key=dead_letter_name,
            )
            # The connection is handed back to the pool when the block exits.
            with worker.app.pool.acquire() as connection:
                dead_letter_queue.bind(connection).declare()
# The Celery application. The broker URL points at the `rabbitmq` host; no
# result backend is passed.
app = Celery(
    main='tasks',
    backend=None,
    broker='amqp://guest:guest@rabbitmq:5672/',
)

# Tasks without an explicit queue go to the default queue.
app.conf.task_default_queue = default_queue_name

# The default queue carries dead-letter arguments naming the
# "<queue>.<suffix>" exchange and routing key, i.e. the same names that the
# DeclareDLXnDLQ bootstep declares.
app.conf.task_queues = [
    Queue(
        name=default_queue_name,
        exchange=Exchange(name=default_queue_name, type="direct"),
        routing_key=default_queue_name,
        queue_arguments={
            "x-dead-letter-exchange": f"{default_queue_name}.{dead_letter_suffix}",
            "x-dead-letter-routing-key": f"{default_queue_name}.{dead_letter_suffix}",
        },
    ),
]

# Register the bootstep on the worker blueprint so it runs at worker start-up.
app.steps["worker"].add(DeclareDLXnDLQ)
@app.task
def add(x, y):
    """Log the two operands and return ``x + y``."""
    logger.info(f"add received: {x} + {y}")
    total = x + y
    return total
@app.task(
    # Bound task built on the failure-rejecting base class.
    base=CallbackOnFailureTask,
    bind=True,
    acks_late=True,
    # Retry on any exception, at most 3 times, with backoff capped at 1.
    max_retries=3,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=1,
)
def div(self, x, y):
    """Log the operands and the current retry count, then return ``x / y``.

    Any exception raised here (for example ``ZeroDivisionError`` when ``y``
    is 0) matches ``autoretry_for`` in the task options.
    """
    logger.info(f"div received: {x} / {y} - retrying {self.request.retries}")
    return x / y
"""
Run with: `python test_tasks.py`
"""
from tasks import app

# Task name -> positional arguments, sent in this order: `add` with valid
# operands, then `div` with a zero divisor.
for task_name, task_args in (
    ("tasks.add", [2, 3]),
    ("tasks.div", [2, 0]),
):
    app.send_task(task_name, args=task_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment