Last active
November 30, 2022 18:27
-
-
Save sector119/8318f65750560964cac04fc8d1cc0790 to your computer and use it in GitHub Desktop.
FastAPI + Async SQLAlchemy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import Depends, FastAPI | |
from .dependencies import get_service | |
from .schemas import DataResponseSchema | |
from .services import DataService | |
# Application instance on which the route below is registered.
app = FastAPI()


@app.get("/data", response_model=DataResponseSchema)
async def get(
    data: DataService = Depends(get_service(DataService))
):
    """Handle ``GET /data`` by fetching the record for service id 123.

    Args:
        data: ``DataService`` produced by the ``get_service(DataService)``
            dependency.

    Returns:
        Whatever ``DataService.get`` returns; the route declares
        ``DataResponseSchema`` as its response model.
    """
    # The service id is hard-coded to 123 in this handler.
    return await data.get(service_id=123)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import ( | |
AsyncGenerator, | |
Callable, | |
Type | |
) | |
from fastapi import Depends | |
from .models import AsyncSession | |
from .services import BaseService | |
async def get_dbsession() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for use as a dependency.

    The session is created by calling ``AsyncSession()`` and is used as an
    async context manager, so the context is exited once the consumer of
    this generator is done with the session.
    """
    session_context = AsyncSession()
    async with session_context as dbsession:
        yield dbsession
def get_service(service_type: Type[BaseService]) -> Callable[[AsyncSession], BaseService]:
    """Build a dependency callable that instantiates ``service_type``.

    Args:
        service_type: Service class to construct; it is called with the
            database session as its only argument.

    Returns:
        A callable whose ``dbsession`` parameter defaults to
        ``Depends(get_dbsession)`` and which returns
        ``service_type(dbsession)``.
    """

    def _get_service(dbsession: AsyncSession = Depends(get_dbsession)) -> BaseService:
        # Instantiate the requested service around the injected session.
        service = service_type(dbsession)
        return service

    return _get_service
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import warnings | |
from sqlalchemy import exc | |
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine | |
from .config import settings | |
# Always emit SQLAlchemy's Base20DeprecationWarning so that another warnings
# filter cannot suppress it.
warnings.filterwarnings(
    action="always",
    category=exc.Base20DeprecationWarning,
)

# Keyword arguments passed to create_async_engine() below.
SQLALCHEMY_ASYNC_ENGINE_OPTIONS = {
    "echo": False,
    # NOTE(review): which connect_args keys are accepted depends on the DBAPI
    # driver named in the database URI -- confirm "connect_timeout" is one.
    "connect_args": {"connect_timeout": 5},  # seconds
    "future": True,
}

# Engine built from the async database URI in the project settings.
async_engine = create_async_engine(
    settings.SQLALCHEMY_DATABASE_ASYNC_URI,
    **SQLALCHEMY_ASYNC_ENGINE_OPTIONS,
)

# Session factory bound to the engine above. Despite the class-like name this
# is an async_sessionmaker instance; calling AsyncSession() creates a session.
AsyncSession = async_sessionmaker(
    bind=async_engine,
    expire_on_commit=False,
    future=True,
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sqlalchemy.future import select | |
from .models import DataModel | |
def get_data_q(
    *,
    service_id: int
):
    """Build the SELECT statement for ``DataModel`` rows of one service.

    Args:
        service_id: Value that ``DataModel.service_id`` must equal.

    Returns:
        The ``select(DataModel)`` statement with the ``service_id`` filter
        applied. Nothing is executed here.
    """
    matches_service = DataModel.service_id == service_id
    return select(DataModel).where(matches_service)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from .models import AsyncSession | |
from .queries import get_data_q | |
from .schemas import DataResponseSchema | |
class BaseService:
    """Base class for services that work on a database session."""

    def __init__(self, dbsession: AsyncSession) -> None:
        """Store the session the service will use.

        Args:
            dbsession: Session object kept for later access through the
                ``dbsession`` property.
        """
        self._dbsession = dbsession

    @property
    def dbsession(self) -> AsyncSession:
        """Session given to the constructor (read-only; no setter exists)."""
        return self._dbsession
class DataService(BaseService):
    """Service that reads ``DataModel`` records through the stored session."""

    async def get(
        self,
        *,
        service_id: int
    ) -> DataResponseSchema:
        """Fetch the single record belonging to ``service_id``.

        Args:
            service_id: Identifier forwarded to ``get_data_q`` to filter the
                query.

        Returns:
            The selected entity, or ``None`` when no row matches.
            NOTE(review): the annotation says ``DataResponseSchema`` but the
            value is the ORM entity selected by ``get_data_q``; presumably
            the route's ``response_model`` performs the conversion -- confirm.

        Raises:
            sqlalchemy.exc.MultipleResultsFound: If more than one row matches.
        """
        statement = get_data_q(service_id=service_id)
        result = await self.dbsession.execute(statement)
        # execute() yields Row objects, so one_or_none() would hand back a
        # one-element Row wrapping the entity. scalar_one_or_none() unwraps
        # the first column and returns the entity itself (or None).
        return result.scalar_one_or_none()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment