Created
December 11, 2024 14:51
-
-
Save paulwinex/1e1c3a2b6b050238b9d21d51f20a55fd to your computer and use it in GitHub Desktop.
fastapi app structure example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import TypeVar, Generic, Annotated, List

from fastapi import FastAPI, APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, ConfigDict
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import mapped_column, Mapped
# DATABASE ############################################################################# | |
# Connection URL for PostgreSQL through the asyncpg driver.
DATABASE_URL = "postgresql+asyncpg://test:test@localhost/test"

# Single async engine shared by the module; SQL statement echoing is off.
engine = create_async_engine(DATABASE_URL, echo=False)

# Factory producing AsyncSession objects bound to the engine above.
# Sessions neither autoflush nor expire loaded objects on commit.
session_factory = async_sessionmaker(
    engine,
    expire_on_commit=False,
    autoflush=False,
    autocommit=False,
)
async def get_session() -> AsyncIterator[AsyncSession]:
    """Yield one database session for the duration of the caller's use.

    The session is created from ``session_factory`` and is closed by the
    ``async with`` block once the consumer of this generator is finished.

    Yields:
        AsyncSession: a session bound to the module-level engine.
    """
    # This is an async generator, so the return annotation describes the
    # iterator it produces rather than the yielded session itself.
    async with session_factory() as session:
        yield session
# Annotated alias: declares an AsyncSession parameter that is resolved through
# the ``get_session`` dependency.
AsyncSessionAnnotated = Annotated[
    AsyncSession,
    Depends(get_session),
]
# MODELS ############################################################################# | |
class Base(DeclarativeBase):
    """Declarative base class shared by every ORM model in this module.

    Uses the SQLAlchemy 2.0 ``DeclarativeBase`` style instead of the legacy
    ``declarative_base()`` factory; ``Base.metadata`` remains available for
    table creation.
    """
async def setup_dev_db():
    """Create all tables registered on ``Base.metadata`` using the module engine."""
    create_tables = Base.metadata.create_all
    # ``create_all`` is a synchronous call, so it is run through ``run_sync``
    # on a connection opened inside an ``engine.begin()`` block.
    async with engine.begin() as connection:
        await connection.run_sync(create_tables)
class UserModel(Base):
    """ORM mapping for the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key.
    id: Mapped[int] = mapped_column(primary_key=True)
    # User name, stored in a String(20) column.
    name: Mapped[str] = mapped_column(String(20))
    # Admin flag; defaults to False when not supplied.
    is_admin: Mapped[bool] = mapped_column(default=False)
# SCHEMAS ############################################################################# | |
# Request body accepted when creating a user: only the name is supplied.
class UserCreate(BaseModel):
    name: str
# Request body for a partial user update. Every field is optional; the
# annotations now say so explicitly (``X | None``), so an explicit JSON
# ``null`` validates instead of being rejected against a non-optional type.
class UserUpdate(BaseModel):
    name: str | None = None
    is_admin: bool | None = None
# Response schema for a user. ``from_attributes=True`` lets pydantic build it
# from attribute access on an object (e.g. an ORM instance) instead of a dict.
class UserResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str
    is_admin: bool
# REPOSITORY ############################################################################# | |
# Type variable for ORM model classes; bounded by the declarative ``Base``.
T_Model = TypeVar("T_Model", bound=Base)
# Unbounded type variables available as generic parameters for the
# repository and service layers.
T_Repo = TypeVar("T_Repo")
T_Service = TypeVar("T_Service")
class BaseRepository(Generic[T_Model]):
    """Generic async CRUD repository for a single SQLAlchemy model.

    Subclasses set ``model`` to the mapped class they manage and are
    parameterised with that same class (``BaseRepository[SomeModel]``).
    Every mutating method commits the session itself.
    """

    # Mapped class handled by this repository; concrete subclasses override it.
    model: type[T_Model] = None

    def __init__(self, session: AsyncSession) -> None:
        """Keep the session that all queries of this repository run on."""
        self.session = session

    async def get(self, entity_id: int) -> T_Model:
        """Return the entity whose ``id`` equals ``entity_id``.

        Raises:
            HTTPException: with status 404 when no such entity exists.
        """
        query = select(self.model).where(self.model.id == entity_id)
        result = await self.session.execute(query)
        instance = result.scalars().first()
        # Test for None explicitly: "no row" is the only miss condition, and
        # an entity must never count as missing because of its truthiness.
        if instance is None:
            # TODO: need some app exception, not http?
            raise HTTPException(status_code=404, detail="Entity not found")
        return instance

    async def get_all(self) -> List[T_Model]:
        """Return every stored entity of ``model`` as a list."""
        result = await self.session.execute(select(self.model))
        # Materialise as a list so the value matches the declared return type.
        return list(result.scalars().all())

    async def create(self, creation_data: dict) -> T_Model:
        """Insert a new entity built from ``creation_data`` and return it.

        ``creation_data`` is passed to the model constructor as keyword
        arguments. The session is committed and the instance refreshed
        before it is returned.
        """
        instance = self.model(**creation_data)
        self.session.add(instance)
        await self.session.commit()
        await self.session.refresh(instance)
        return instance

    async def update(self, instance_id: int, update_data: dict) -> T_Model:
        """Apply ``update_data`` to an existing entity and return it.

        Each key/value pair is set as an attribute on the loaded entity, then
        the session is committed and the entity refreshed.

        Raises:
            HTTPException: with status 404 when the entity does not exist.
        """
        instance = await self.get(instance_id)
        for key, value in update_data.items():
            setattr(instance, key, value)
        await self.session.commit()
        await self.session.refresh(instance)
        return instance

    async def delete(self, instance_id: int) -> None:
        """Delete the entity with the given id and commit.

        Raises:
            HTTPException: with status 404 when the entity does not exist.
        """
        instance = await self.get(instance_id)
        await self.session.delete(instance)
        await self.session.commit()
class UserRepository(BaseRepository[UserModel]):
    """Repository specialised for ``UserModel`` entities."""

    model = UserModel
class BaseService(Generic[T_Repo]):
    """Generic service layer that delegates CRUD operations to a repository.

    Subclasses set ``repo_class`` to the repository class to instantiate and
    are parameterised with that same class (``BaseService[SomeRepository]``).
    """

    # Repository class instantiated for each service instance.
    repo_class: type[T_Repo]

    def __init__(self, session: AsyncSessionAnnotated) -> None:
        """Build the repository on top of the given session.

        ``session`` is annotated with ``AsyncSessionAnnotated``, which carries
        the ``get_session`` dependency.
        """
        self.repository = self.repo_class(session)

    async def create(self, creation_data: dict) -> T_Model:
        """Create an entity from ``creation_data`` via the repository."""
        return await self.repository.create(creation_data)

    async def get(self, instance_id: int) -> T_Model:
        """Return the entity with the given id via the repository."""
        return await self.repository.get(instance_id)

    async def list(self) -> List[T_Model]:
        """Return all entities via the repository's ``get_all``."""
        return await self.repository.get_all()

    async def update(self, instance_id: int, update_data: dict) -> T_Model:
        """Apply ``update_data`` to the entity with the given id."""
        return await self.repository.update(instance_id, update_data)

    async def delete(self, instance_id: int) -> None:
        """Delete the entity with the given id via the repository."""
        return await self.repository.delete(instance_id)
class UserService(BaseService[UserRepository]):
    """User service; refuses to delete users flagged as admin."""

    repo_class = UserRepository

    async def delete(self, instance_id: int):
        """Delete the user with the given id unless it is an admin.

        Raises:
            HTTPException: with status 403 when the user's ``is_admin`` is
                truthy. Errors raised by the lookup in ``get`` propagate.
        """
        target = await self.get(instance_id)
        if not target.is_admin:
            return await super().delete(instance_id)
        raise HTTPException(status_code=403, detail="Cannot delete admin user")
# ROUTERS ############################################################################# | |
# Router that the user endpoints below are registered on.
router = APIRouter()
# GET /users -- return all users, serialised as a list of UserResponse.
@router.get("/users", response_model=list[UserResponse])
async def get_users(
    user_service: Annotated[UserService, Depends(UserService)],
):
    # Call the service's ``list`` method instead of reaching into its
    # repository, so any service-level behaviour applies to this endpoint too.
    return await user_service.list()
# GET /users/{user_id} -- return a single user, serialised as UserResponse.
# NOTE(review): this handler shares the name ``get_users`` with the list
# endpoint defined on "/users"; consider renaming it (e.g. ``get_user``).
@router.get("/users/{user_id}", response_model=UserResponse)
async def get_users(
    user_id: int,
    user_service: Annotated[UserService, Depends(UserService)],
):
    return await user_service.get(user_id)
# POST /users -- create a user from the request body and return it.
@router.post("/users", response_model=UserResponse)
async def create_user(
    payload: UserCreate,
    user_service: Annotated[UserService, Depends(UserService)],
):
    # Fields that were not sent, or that are None, are left out of the data.
    creation_data = payload.model_dump(exclude_unset=True, exclude_none=True)
    # Create through the service's ``create`` method instead of calling the
    # repository directly, matching how the update/delete endpoints work.
    return await user_service.create(creation_data)
# PATCH /users/{user_id} -- partially update a user and return the result.
@router.patch("/users/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    payload: UserUpdate,
    user_service: Annotated[UserService, Depends(UserService)],
):
    # Only fields that were sent and are not None are applied.
    changes = payload.model_dump(exclude_unset=True, exclude_none=True)
    return await user_service.update(user_id, changes)
# DELETE /users/{user_id} -- delete a user; responds with 204 and no body.
@router.delete(
    "/users/{user_id}",
    status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_user(
    user_id: int,
    user_service: Annotated[UserService, Depends(UserService)],
):
    await user_service.delete(user_id)
# APP ############################################################################# | |
async def on_app_startup(app: FastAPI):
    """Startup hook: prepare the database by awaiting ``setup_dev_db``.

    The ``app`` argument is accepted but not used.
    """
    await setup_dev_db()
async def on_app_shutdown(app: FastAPI):
    """Shutdown hook: dispose of the module-level engine.

    The ``app`` argument is accepted but not used.
    """
    await engine.dispose()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the startup hook, hand control to the app, then run the shutdown hook.

    The shutdown hook sits in a ``finally`` clause so it also runs when an
    exception is raised into this context manager at the ``yield`` point.
    """
    await on_app_startup(app)
    try:
        yield
    finally:
        await on_app_shutdown(app)
# Application instance; startup and shutdown are handled by ``lifespan``.
app = FastAPI(lifespan=lifespan)

# Register the endpoints defined on ``router``.
app.include_router(router)
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    # Serve the app on all interfaces, port 8000.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[tool.poetry]
name = "test-arch-services"
version = "0.1.0"
description = ""
authors = ["paulwinex <[email protected]>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.11"
sqlalchemy = "2.0.36"
psycopg-binary = "^3.2.3"
fastapi = "^0.115.6"
asyncpg = "^0.30.0"
uvicorn = "^0.32.1"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment