Skip to content

Instantly share code, notes, and snippets.

@coderberry
Created March 28, 2025 14:21
Show Gist options
  • Save coderberry/e13165e2ae66ecbf3df9ee1aa0ffb373 to your computer and use it in GitHub Desktop.
Save coderberry/e13165e2ae66ecbf3df9ee1aa0ffb373 to your computer and use it in GitHub Desktop.
This document outlines best practices and examples for building FastAPI applications that offload work to Celery tasks, including chaining tasks and handling asynchronous HTTP requests.

FastAPI and Celery Guidelines

This document outlines best practices and examples for building FastAPI applications that offload work to Celery tasks, including chaining tasks and handling asynchronous HTTP requests.


Key Guidelines

1. Define Tasks as Pure Functions

  • Synchronous Nature:
    Celery tasks are normally synchronous functions executed by separate worker processes. While FastAPI endpoints can be asynchronous, Celery tasks generally remain synchronous (unless you explicitly run an event loop inside them).

  • Side Effects:
    Keep tasks idempotent and independent. They should do one thing well (e.g., fetching data, processing data) and return a value for chaining if needed.

2. Creating Chains of Tasks

  • Celery Signatures:
    Use Celery's built-in primitives like chain, group, or chord to create workflows. Chaining allows you to pass the output of one task to the next seamlessly.

  • Error Handling in Chains:
    By default, if any task in a chain fails, subsequent tasks won't execute. Use proper error handling or consider the link_error parameter to define failure callbacks when building complex workflows.

  • Example Pattern:
    For instance, you might have one task to make an HTTP request and another to process the response. Using chain(task1.s(args), task2.s()) will let the output of the first task flow into the second.

3. Handling Async vs. Synchronous Methods

  • FastAPI Endpoints:
    FastAPI routes can be defined as async functions. When an endpoint triggers a background task (using Celery's .delay() or .apply_async()), it hands off the work and immediately returns a response.

  • Celery Tasks and HTTP Requests:

    • If your task makes an HTTP call (e.g., using requests), it will block until a response is received—this is typical and usually acceptable within a Celery worker.
    • If you need non-blocking behavior for HTTP calls inside a task, you must either use a synchronous wrapper (like using asyncio.run to execute an async HTTP client call) or switch to a library that can operate within a synchronous context.
    • Important Note: Even when using asyncio.run() to run async code inside a Celery task, the worker thread still blocks until completion. The async benefit is primarily for concurrent operations within that single task, not for freeing up the worker itself.

Example Implementation

Celery Configuration and Tasks

# celery_app.py
from celery import Celery, chain

# Initialize your Celery app with a broker (like Redis) and backend for results.
celery_app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@celery_app.task
def fetch_data(url: str) -> dict:
    """
    Task to perform an HTTP GET request.
    Note: Using requests which is a synchronous HTTP client.
    """
    import requests
    response = requests.get(url)
    # Assume the response is JSON
    return response.json()

@celery_app.task
def process_data(data: dict) -> dict:
    """
    Task to process the fetched data.
    """
    # Process the data (e.g., filtering, transformation)
    processed = {"processed": data}
    return processed

def start_task_chain(url: str):
    """
    Chain the tasks so that fetch_data feeds into process_data.
    """
    # Create a chain of tasks
    task_chain = chain(fetch_data.s(url), process_data.s())
    # Start the chain asynchronously
    result = task_chain.apply_async()
    return result.id  # Return the task chain ID for tracking

Implementing a Task Status Endpoint

# fastapi_app.py
from fastapi import FastAPI, HTTPException
from celery.result import AsyncResult
from celery_app import start_task_chain

app = FastAPI()

@app.post("/process-url/")
async def process_url(url: str):
    """Endpoint to start a task chain and return the task ID."""
    task_id = start_task_chain(url)
    return {"task_id": task_id}

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    """
    Endpoint to check the status of a task or task chain.
    Returns the current state and result if available.
    """
    result = AsyncResult(task_id)

    response = {
        "task_id": task_id,
        "status": result.status,
    }

    # Add additional info based on task state
    if result.successful():
        response["result"] = result.get()
    elif result.failed():
        response["error"] = str(result.result)
    elif result.status == 'PENDING':
        # Task not found or not started
        response["info"] = "Task not yet started or not found"
    else:
        # Task is probably running
        response["info"] = "Task is in progress"

    return response

Additional Considerations

  • Error Handling & Timeouts: Ensure that your tasks handle HTTP errors and timeouts appropriately. Use try/except blocks within tasks to catch exceptions and possibly retry tasks using Celery's retry mechanisms:

    @celery_app.task(bind=True, max_retries=3)
    def fetch_data_with_retry(self, url: str) -> dict:
        """Task with retry logic for handling request failures."""
        import requests
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()  # Raises HTTPError for bad responses
            return response.json()
        except (requests.HTTPError, requests.ConnectionError, requests.Timeout) as exc:
            # Retry with exponential backoff (retry in 2, 4, 8 seconds)
            self.retry(exc=exc, countdown=2 ** self.request.retries)
  • Tracking Task Status: The FastAPI endpoint returns a task ID that clients can use (via the status endpoint) to check the status of the task and retrieve results when available.

  • Async HTTP in Celery Tasks: If you need to make async HTTP calls (for example, using an async HTTP client like httpx), you can run the async code inside your synchronous task using asyncio.run(). For example:

    import asyncio
    import httpx
    
    @celery_app.task
    def fetch_data_async(url: str) -> dict:
        async def _fetch():
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                return response.json()
        return asyncio.run(_fetch())

    Note: This approach still blocks the Celery worker thread until completion. The benefit is primarily for concurrent operations within the task (e.g., making multiple HTTP requests in parallel), not for freeing up the worker itself.

  • Task Result Expiration: Configure task result expiration time in Celery settings to avoid filling up your result backend:

    celery_app.conf.update(
        result_expires=3600,  # Results expire after 1 hour
    )

By following these guidelines and examples, you can design a FastAPI application that triggers background Celery tasks, chains operations, and properly handles HTTP requests within tasks. This approach helps keep your API responsive while delegating long-running or blocking operations to background workers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment