Skip to content

Instantly share code, notes, and snippets.

@deepanshumehtaa
Last active April 26, 2025 11:16
Show Gist options
  • Save deepanshumehtaa/4ef33eb4e8060f1f7092892090b65296 to your computer and use it in GitHub Desktop.
Save deepanshumehtaa/4ef33eb4e8060f1f7092892090b65296 to your computer and use it in GitHub Desktop.
Backgrond tasks in FastAPI
# gunicorn -w 2 -k uvicorn.workers.UvicornWorker main:app --max-requests 100 --max-requests-jitter 10 --threads 5
# ..........................................................................................
import asyncio
import os
import logging
# from asyncio.log import logger
from time import sleep
logger = logging.getLogger("LOG")
async def some_async_bg_task(num: int):
"""combination of blocking and unblocking code"""
pid = os.getpid()
logger.info(f"some_async_bg_task started: {pid} ...")
li = [i for i in range(num*10_000_000)]
await asyncio.sleep(1)
msg = f"length of list is : {len(li)}"
logger.info("some_async_bg_task done: " + msg)
def some_blocking_bg_task(num: int):
"""blocking code"""
pid = os.getgid()
logger.info(f"some_blocking_bg_task started: {pid} ...")
li = [i for i in range(num*10_000_000)]
sleep(5) # blocking code of 5 sec
msg = f"length of list is : {len(li)}"
logger.info("some_blocking_bg_task done: " + msg)
def some_mixed_task_bg_task(num: int):
"""blocking code"""
pid = os.getgid()
logger.info(f"some_mixed_task_bg_task started: {pid} ...")
li = [i for i in range(num*10_000_000)]
sleep(5) # blocking code of 5 sec
asyncio.run(asyncio.sleep(5))
msg = f"length of list is : {len(li)}"
logger.info("some_mixed_task_bg_task done: " + msg)
# ................................................................................................
@bg_apis.post("/bg-async/{num}")
async def run_bg_task0(
num: int,
background_tasks: BackgroundTasks
):
if num < 2:
raise HTTPException(400, "Number should be greater than 2")
# way 1:
background_tasks.add_task(some_async_bg_task, num)
return {"message": "Notification sent in the background"}
# Only for blocking tasks .............................
@bg_apis.post("/sync-thread/{num}")
async def run_bg_task3(num: int):
# crash
# threading.Thread(target=some_blocking_bg_task, args=(num,)).start()
# not crashed, because of in-built queue for threads
thread_pool_exe.submit(some_blocking_bg_task, num)
return {"result": True}
@bg_apis.post("/bg-sync/{num}") # OK
async def run_bg_task0(
num: int,
background_tasks: BackgroundTasks
):
"""
To RUN BG task using `background_tasks`, the api (or view) counld be anything what important is
the fn that going to execute with `background_tasks`:
if:
fn is simple not async/await, then it run in thread pool `run_in_threadpool`
else:
async fn executed in same way the gets executed
"""
if num < 2:
raise HTTPException(400, "Number should be greater than 2")
background_tasks.add_task(some_blocking_bg_task, num)
return {"message": "BG process started !!"}
..................................................................................................
from datetime import datetime
from typing import Optional, List, Dict
from fastapi import HTTPException
from pydantic import BaseModel, Field, field_validator
class Profile(BaseModel):
pic: str
is_admin: bool = False
class UserModel(BaseModel):
"""
the obj of `Item(BaseModel)` is dict
"""
name: str
profile: Profile
description: Optional[str] = None
price: float = Field(description="Price in Ruppees")
tags: List[str] = []
date: datetime = datetime.now()
@field_validator("profile", mode="after")
def validate_profile(cls: BaseModel, value):
print("after !!!")
if value < 0:
raise HTTPException(500, "Price cannot be -ve")
return value
@field_validator("profile", mode="before")
def validate_profile(cls: BaseModel, data: Dict):
print("before !!!")
import pdb; pdb.set_trace()
if data['is_admin']:
print("User is Admin")
return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment