Skip to content

Instantly share code, notes, and snippets.

@sector119
Last active November 30, 2022 18:27
Show Gist options
  • Save sector119/8318f65750560964cac04fc8d1cc0790 to your computer and use it in GitHub Desktop.
Save sector119/8318f65750560964cac04fc8d1cc0790 to your computer and use it in GitHub Desktop.
FastAPI + Async SQLAlchemy
from fastapi import Depends, FastAPI
from .dependencies import get_service
from .schemas import DataResponseSchema
from .services import DataService
app = FastAPI()
@app.get("/data", response_model=DataResponseSchema)
async def get(
data: DataService = Depends(get_service(DataService))
):
data = await data.get(service_id=123)
return data
from typing import (
AsyncGenerator,
Callable,
Type
)
from fastapi import Depends
from .models import AsyncSession
from .services import BaseService
async def get_dbsession() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSession() as session:
yield session
def get_service(service_type: Type[BaseService]) -> Callable[[AsyncSession], BaseService]:
def _get_service(dbsession: AsyncSession = Depends(get_dbsession)) -> BaseService:
return service_type(dbsession)
return _get_service
import warnings
from sqlalchemy import exc
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from .config import settings
# Ensure that a warnings filter will not suppress any warnings
warnings.filterwarnings(
"always", category=exc.Base20DeprecationWarning
)
SQLALCHEMY_ASYNC_ENGINE_OPTIONS = {
"echo": False,
"connect_args": {
"connect_timeout": 5 # seconds
},
"future": True
}
async_engine = create_async_engine(settings.SQLALCHEMY_DATABASE_ASYNC_URI, **SQLALCHEMY_ASYNC_ENGINE_OPTIONS)
AsyncSession = async_sessionmaker(
async_engine,
expire_on_commit=False,
future=True
)
from sqlalchemy.future import select
from .models import DataModel
def get_data_q(
*,
service_id: int
):
q = select(
DataModel
).where(
DataModel.service_id == service_id
)
return q
from .models import AsyncSession
from .queries import get_data_q
from .schemas import DataResponseSchema
class BaseService:
def __init__(self, dbsession: AsyncSession) -> None:
self._dbsession = dbsession
@property
def dbsession(self) -> AsyncSession:
return self._dbsession
class DataService(BaseService):
async def get(
self,
*,
service_id: int
) -> DataResponseSchema:
s = get_data_q(service_id=service_id)
result = await self.dbsession.execute(s)
data = result.one_or_none()
return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment