Created
August 16, 2025 08:47
-
-
Save grahamlyons/5e7053e5fc9e56bec0cb62aca4232991 to your computer and use it in GitHub Desktop.
Celery, using RabbitMQ, configured with retries and dead letter queues
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Run with: `celery -A tasks worker` | |
Expects a RabbitMQ broker to be reachable at `rabbitmq:5672`; adjust the broker URL below if RabbitMQ is served on `localhost` instead.
""" | |
import logging | |
from typing import Dict, Tuple | |
from celery import Celery, Task | |
from kombu import Exchange, Queue | |
from celery.exceptions import Reject | |
from billiard.einfo import ExceptionInfo | |
from celery import bootsteps | |
# Module-level logger; INFO is enabled so the task log lines below are emitted.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Name of the work queue, and the suffix appended (as "<queue>.<suffix>") to
# derive the name of its dead-letter exchange/queue.
default_queue_name = "default"
dead_letter_suffix = "dlq"
class CallbackOnFailureTask(Task):
    """Base task whose failure handler rejects the message without requeueing."""

    def on_failure(
        self,
        exc: Exception,
        task_id: str,
        args: Tuple,
        kwargs: Dict,
        einfo: ExceptionInfo,
    ):
        """Log the failed task and raise ``Reject`` with ``requeue=False``.

        Args:
            exc: The exception that made the task fail; wrapped in the ``Reject``.
            task_id: Id of the failed task, included in the log line.
            args: Positional arguments of the failed task (unused here).
            kwargs: Keyword arguments of the failed task (unused here).
            einfo: Exception info for the failure (unused here).

        Raises:
            Reject: Always, carrying ``exc`` and ``requeue=False``.
        """
        logger.error("[on_failure] task failed (%s - %s)", self.name, task_id)
        # NOTE(review): presumably the rejected message is then routed by the
        # broker to the dead-letter exchange configured on the queue — confirm.
        rejection = Reject(exc, requeue=False)
        raise rejection
class DeclareDLXnDLQ(bootsteps.StartStopStep):
    """
    Celery Bootstep to declare the DL exchange and queues before the worker starts
    processing tasks
    """

    requires = {"celery.worker.components:Pool"}

    def start(self, worker):
        """Declare one dead-letter queue per source queue using ``worker.app.pool``.

        For every source queue name, a direct exchange and a queue are built,
        both named ``<queue>.<dead_letter_suffix>`` and bound with that same
        name as routing key, then declared over a pooled connection.
        """
        for source_queue in (default_queue_name,):
            dead_letter_name = f"{source_queue}.{dead_letter_suffix}"
            dead_letter_exchange = Exchange(dead_letter_name, type="direct")
            dead_letter_queue = Queue(
                dead_letter_name,
                dead_letter_exchange,
                routing_key=dead_letter_name,
            )
            # The connection is taken from the app's pool only for the
            # duration of the declaration.
            with worker.app.pool.acquire() as connection:
                dead_letter_queue.bind(connection).declare()
# Celery application talking to RabbitMQ at rabbitmq:5672, with no result backend.
app = Celery(
    "tasks",
    broker="amqp://guest:guest@rabbitmq:5672/",
    backend=None,
)

# Route tasks to the default queue and attach dead-letter arguments to it so
# they point at the "<queue>.<suffix>" exchange and routing key.
app.conf.update(
    task_default_queue=default_queue_name,
    task_queues=[
        Queue(
            default_queue_name,
            Exchange(default_queue_name, type="direct"),
            routing_key=default_queue_name,
            queue_arguments={
                "x-dead-letter-exchange": f"{default_queue_name}.{dead_letter_suffix}",
                "x-dead-letter-routing-key": f"{default_queue_name}.{dead_letter_suffix}",
            },
        ),
    ],
)

# Register the bootstep that declares the dead-letter exchange and queue.
app.steps["worker"].add(DeclareDLXnDLQ)
@app.task
def add(x, y):
    """Log the operands and return ``x + y``."""
    # Lazy %-style arguments: the message is only formatted if the record is
    # actually emitted, matching the logging style used in on_failure.
    logger.info("add received: %s + %s", x, y)
    return x + y
@app.task(
    bind=True,
    base=CallbackOnFailureTask,
    # Celery documents that Reject has no effect unless acks_late is enabled.
    acks_late=True,
    # Retry on any exception, at most 3 times, with backoff capped at 1 second.
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=1,
    max_retries=3,
)
def div(self, x, y):
    """Log the operands and current retry count, then return ``x / y``.

    Raises:
        ZeroDivisionError: When ``y`` is zero (for numeric operands); because
            of ``autoretry_for`` this triggers the configured retries.
    """
    # Lazy %-style arguments: the message is only formatted if the record is
    # actually emitted, matching the logging style used in on_failure.
    logger.info(
        "div received: %s / %s - retrying %s", x, y, self.request.retries
    )
    return x / y
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Run with: `python test_tasks.py` | |
""" | |
from tasks import app

# Publish one task expected to succeed (2 + 3) and one that divides by zero,
# which exercises the retry and failure handling of `tasks.div`.
for task_name, task_args in (
    ("tasks.add", [2, 3]),
    ("tasks.div", [2, 0]),
):
    app.send_task(task_name, args=task_args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment