Last active
February 10, 2023 08:41
-
-
Save horodchukanton/9ca868de0788a7dd8f6162d27fd6e873 to your computer and use it in GitHub Desktop.
FastAPI generic CRUD router
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import codecs | |
import csv | |
import logging | |
from typing import List, Type, Union, Dict | |
from fastapi import ( | |
HTTPException, APIRouter, Request, Response, File, | |
UploadFile, ) | |
from fastapi.responses import StreamingResponse | |
from pydantic import ValidationError, BaseModel | |
from sqlalchemy.exc import (NoResultFound, IntegrityError) | |
from application.core.persistence import Base | |
from application.core.queue.controller import SimpleTasksController | |
from application.core.queue.storage import MissingEntryRequestedException | |
from application.core.schemas.bulk_update import BulkUpdateValues | |
from application.core.schemas.diff import GenericDifferenceResponse | |
from application.core.schemas.import_result import ImportResponse | |
from application.core.schemas.options_bulk_request import BulkOptionsRequest | |
from application.core.schemas.queued_task import ( | |
OperationStatusResponse, | |
OperationQueuedResponse, ) | |
class AbstractCrudRouter(APIRouter): | |
model: Base = None | |
result_schema: Type[BaseModel] = None | |
search_schema: Type[BaseModel] = None | |
update_schema: Type[BaseModel] = None | |
diff_schema = GenericDifferenceResponse | |
crud = None | |
tasks_controller = None | |
prefix: str = None | |
list_path: str = None | |
get_path: str = None | |
create_path: str = None | |
update_path: str = None | |
delete_path: str = None | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.logger = logging.getLogger("crud_" + self.get_entity_name()) | |
self.tasks_controller = SimpleTasksController() | |
self._build_router() | |
def _build_router(self): | |
super().add_api_route( | |
name="get_" + self.get_entity_name() + "_list", | |
path=self.list_path, methods=['get'], | |
response_model=List[self.result_schema], | |
operation_id="list" + self.get_entity_name(), | |
endpoint=self.get_entities_list) | |
super().add_api_route( | |
name="read_" + self.get_entity_name(), | |
path=self.get_path, methods=['get'], | |
response_model=self.result_schema, | |
operation_id="get" + self.get_entity_name(), | |
endpoint=self.get_entity) | |
super().add_api_route( | |
name="update_" + self.get_entity_name(), | |
path=self.update_path, methods=['put'], | |
response_model=self.result_schema, | |
operation_id="update" + self.get_entity_name(), | |
endpoint=self.update_entity) | |
super().add_api_route( | |
name="bulk_update_" + self.get_entity_name(), | |
path=self.list_path + '/bulk', | |
methods=['put'], | |
operation_id="bulk_update" + self.get_entity_name(), | |
endpoint=self.update_entity_bulk) | |
super().add_api_route( | |
name="create_" + self.get_entity_name(), | |
path=self.create_path, methods=['post'], | |
response_model=self.result_schema, | |
operation_id="create" + self.get_entity_name(), | |
endpoint=self.create_entity) | |
super().add_api_route( | |
name="delete_" + self.get_entity_name(), | |
path=self.delete_path, methods=['delete'], | |
operation_id="delete" + self.get_entity_name(), | |
endpoint=self.delete_entity) | |
super().add_api_route( | |
name="bulk_delete_" + self.get_entity_name(), | |
path=self.list_path + '/bulk', methods=['delete'], | |
operation_id="bulk_delete" + self.get_entity_name(), | |
endpoint=self.delete_entity_bulk) | |
def get_entity_name(self): | |
return self.result_schema.__name__ | |
def get_entities_list(self, request: Request, response: Response, | |
offset: int = 0, limit: int = 100, | |
order_by: str = None, order: str = "asc", | |
): | |
# Extract model fields from query | |
search_fields = dict() | |
for param in request.query_params: | |
if param in self.result_schema.schema()['properties']: | |
search_fields[param] = request.query_params[param] | |
count = self.crud.count(**search_fields) | |
response.headers["x-total-count"] = str(count) | |
response.headers["access-control-expose-headers"] = "x-total-count" | |
result = self.crud.list(offset=offset, limit=limit, | |
order_by=order_by, | |
order=order, **search_fields) | |
return result | |
def get_entity(self, **kwargs): | |
try: | |
result = self.crud.read(self.search_schema(**kwargs)) | |
except ValidationError as ve: | |
raise HTTPException(status_code=422, | |
detail=f"Invalid values: {ve}") from ve | |
except NoResultFound as nrf: | |
raise HTTPException(status_code=404, | |
detail="Entry is not found") from nrf | |
return result | |
def update_entity(self, update_values: update_schema, **kwargs): | |
try: | |
validated = self.search_schema(**kwargs) | |
search_object = self.search_schema(**validated.dict()) | |
return self.crud.update(search_object, update_values) | |
except ValidationError as ve: | |
raise HTTPException(status_code=422, | |
detail=f"Invalid values: {ve}") from ve | |
def update_entity_bulk(self, bulk_request_body: BulkUpdateValues): | |
try: | |
item_list = [self.search_schema.parse_obj(o) | |
for o in bulk_request_body.entities] | |
update_values = self.update_schema.parse_obj( | |
bulk_request_body.update) | |
return self.crud.update_bulk(item_list, update_values) | |
except ValidationError as ve: | |
raise HTTPException(status_code=422, | |
detail=f"Invalid values: {ve}") from ve | |
def delete_entity(self, **kwargs): | |
try: | |
validated = self.search_schema(**kwargs) | |
search_object = self.search_schema(**validated.dict()) | |
self.crud.delete(search_object) | |
except ValidationError as ve: | |
raise HTTPException(status_code=422, | |
detail=f"Invalid values: {ve}") from ve | |
except NoResultFound as nrf: | |
raise HTTPException(status_code=404, | |
detail="Entry is not found") from nrf | |
return {"detail": "ok"} | |
def delete_entity_bulk(self, item_list: List): | |
try: | |
validated = [self.search_schema(**i) for i in item_list] | |
self.crud.delete_bulk(validated) | |
except ValidationError as ve: | |
raise HTTPException(status_code=422, | |
detail=f"Invalid values: {ve}") from ve | |
return {"detail": "ok"} | |
def create_entity(self, json_body: Union[result_schema]): | |
try: | |
return self.crud.create(**json_body.dict()) | |
except IntegrityError as e: | |
raise HTTPException( | |
status_code=422, | |
detail="Unique or another DB constraint failed, either" | |
" record for Provider and Provider Service" | |
" already exists or such combination" | |
" of Provider and Provider Service is not acceptable: " | |
"" + str(e)) from e | |
def _model_to_schema(self, model_object: BaseModel): | |
keys = self.crud.table_columns() | |
values = {c: getattr(model_object, c) for c in keys} | |
return self.result_schema(**values) | |
def _model_to_dict(self, model_object: BaseModel): | |
return dict(self._model_to_schema(model_object)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This code is incomplete and will not work without concrete crud and few other packages, but it gives and understanding on how to reuse routes