|
from typing import ( |
|
Any, Literal, |
|
) |
|
from venv import create |
|
|
|
import fastapi |
|
import pydantic.v1.typing |
|
from fastapi._compat import ModelField |
|
from fastapi.types import ModelNameMap |
|
from pydantic import BaseConfig, PydanticSchemaGenerationError |
|
from pydantic._internal import _typing_extra |
|
from pydantic.fields import FieldInfo as V2FieldInfo |
|
from pydantic.v1 import BaseConfig as V1BaseConfig, Extra as V1Extra |
|
from pydantic.v1.fields import FieldInfo as V1FieldInfo |
|
from pydantic.v1.fields import Undefined as V1Undefined |
|
from pydantic.v1 import BaseModel as V1BaseModel |
|
from pydantic.v1.fields import UndefinedType as V1UndefinedType |
|
from pydantic_core import PydanticUndefined |
|
from pydantic import BaseModel as V2BaseModel |
|
|
|
from ._v1_compat import lenient_issubclass as v1_lenient_issubclass |
|
from ._v1_compat import ModelField as FastApiPV1ModelField |
|
from ._v1_compat import with_info_plain_validator_function as v1_with_info_plain_validator_function |
|
from ._v1_compat import _model_rebuild as v1__model_rebuild |
|
from ._v1_compat import get_annotation_from_field_info as v1_get_annotation_from_field_info |
|
from ._v1_compat import _normalize_errors as v1__normalize_errors |
|
from ._v1_compat import get_model_definitions as v1_get_model_definitions |
|
from ._v1_compat import _model_dump as v1__model_dump |
|
from ._v1_compat import _get_model_config as v1__get_model_config |
|
from ._v1_compat import get_schema_from_model_field as v1_get_schema_from_model_field |
|
from ._v1_compat import get_compat_model_name_map as v1_get_compat_model_name_map |
|
from ._v1_compat import get_definitions as v1_get_definitions |
|
from ._v1_compat import is_scalar_field as v1_is_scalar_field |
|
from ._v1_compat import is_sequence_field as v1_is_sequence_field |
|
from ._v1_compat import is_scalar_sequence_field as v1_is_scalar_sequence_field |
|
from ._v1_compat import is_bytes_field as v1_is_bytes_field |
|
from ._v1_compat import is_bytes_sequence_field as v1_is_bytes_sequence_field |
|
from ._v1_compat import copy_field_info as v1_copy_field_info |
|
from ._v1_compat import serialize_sequence_value as v1_serialize_sequence_value |
|
from ._v1_compat import get_missing_field_error as v1_get_missing_field_error |
|
from ._v1_compat import create_body_model as v1_create_body_model |
|
from ._v1_compat import get_model_fields as v1_get_model_fields |
|
|
|
from ._v2_compat import lenient_issubclass as v2_lenient_issubclass |
|
from ._v2_compat import ModelField as FastApiPV2ModelField |
|
from ._v2_compat import with_info_plain_validator_function as v2_with_info_plain_validator_function |
|
from ._v2_compat import get_annotation_from_field_info as v2_get_annotation_from_field_info |
|
from ._v2_compat import _normalize_errors as v2__normalize_errors |
|
from ._v2_compat import _model_rebuild as v2__model_rebuild |
|
from ._v2_compat import _model_dump as v2__model_dump |
|
from ._v2_compat import _get_model_config as v2__get_model_config |
|
from ._v2_compat import get_schema_from_model_field as v2_get_schema_from_model_field |
|
from ._v2_compat import get_compat_model_name_map as v2_get_compat_model_name_map |
|
from ._v2_compat import get_definitions as v2_get_definitions |
|
from ._v2_compat import is_scalar_field as v2_is_scalar_field |
|
from ._v2_compat import is_sequence_field as v2_is_sequence_field |
|
from ._v2_compat import is_scalar_sequence_field as v2_is_scalar_sequence_field |
|
from ._v2_compat import is_bytes_field as v2_is_bytes_field |
|
from ._v2_compat import is_bytes_sequence_field as v2_is_bytes_sequence_field |
|
from ._v2_compat import copy_field_info as v2_copy_field_info |
|
from ._v2_compat import serialize_sequence_value as v2_serialize_sequence_value |
|
from ._v2_compat import get_missing_field_error as v2_get_missing_field_error |
|
from ._v2_compat import create_body_model as v2_create_body_model |
|
from ._v2_compat import get_model_fields as v2_get_model_fields |
|
|
|
|
|
# ORIGINAL_PYDANTIC_V2 = fastapi._compat.PYDANTIC_V2 |
|
|
|
|
|
class PydanticV2DynamicCheck: |
|
def __bool__(self): |
|
return False |
|
|
|
|
|
USING_PYDANTIC_V2 = False |
|
|
|
def lenient_issubclass(cls: Any, class_or_tuple: Any) -> bool: |
|
# Modified to make both work at the same time |
|
# in fastapi.dependencies.utils line 508 there's an assertion that incorrectly assumes |
|
# stuff about Pydantic v1. It makes a comparison to a field.type_ and a BaseModel, which |
|
# won't work when field.type_ is a V1 model, as pydantic.BaseModel will now be v2. |
|
# |
|
# attempt to catch that case and swap it around with the v1 basemodel comparison |
|
type_is_v1_basemodel = issubclass(cls, V1BaseModel) and class_or_tuple is V2BaseModel |
|
try: |
|
return isinstance(cls, type) and (issubclass(cls, class_or_tuple) or type_is_v1_basemodel) |
|
except TypeError: |
|
if isinstance(cls, _typing_extra.WithArgsTypes): |
|
return False |
|
if isinstance(cls, pydantic.v1.typing.WithArgsTypes): |
|
return False |
|
raise # pragma: no cover |
|
|
|
def with_info_plain_validator_function(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2_with_info_plain_validator_function(*args, **kwargs) |
|
else: |
|
return v1_with_info_plain_validator_function(*args, **kwargs) |
|
|
|
def get_model_definitions(*args, **kwargs) -> Any: |
|
# Function does not exist in fastapi._compat for v2" |
|
return v1_get_model_definitions(*args, **kwargs) |
|
|
|
def get_annotation_from_field_info( |
|
annotation: Any, field_info: Any, field_name: str |
|
) -> Any: |
|
if isinstance(field_info, V2FieldInfo): |
|
return v2_get_annotation_from_field_info(annotation, field_info, field_name) |
|
else: |
|
return v1_get_annotation_from_field_info(annotation, field_info, field_name) |
|
|
|
def _normalize_errors(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2__normalize_errors(*args, **kwargs) |
|
else: |
|
return v1__normalize_errors(*args, **kwargs) |
|
|
|
|
|
def _model_rebuild(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2__model_rebuild(*args, **kwargs) |
|
else: |
|
return v1__model_rebuild(*args, **kwargs) |
|
|
|
|
|
def _model_dump(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2__model_dump(*args, **kwargs) |
|
else: |
|
return v1__model_dump(*args, **kwargs) |
|
|
|
|
|
def _get_model_config(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2__get_model_config(*args, **kwargs) |
|
else: |
|
return v1__get_model_config(*args, **kwargs) |
|
|
|
def get_schema_from_model_field(*, field: ModelField, **kwargs) -> Any: |
|
if isinstance(field, FastApiPV2ModelField): |
|
return v2_get_schema_from_model_field(field=field, **kwargs) |
|
else: |
|
return v1_get_schema_from_model_field(field=field, **kwargs) |
|
|
|
|
|
def get_compat_model_name_map(fields: list[ModelField]) -> ModelNameMap: |
|
# v2 version doesn't return any values, so only need to care about v1 fields |
|
v1_fields = [f for f in fields if isinstance(f, FastApiPV1ModelField)] |
|
if not v1_fields: |
|
return {} |
|
else: |
|
return v1_get_compat_model_name_map(v1_fields) |
|
|
|
def get_definitions(*, fields: list[ModelField], **kwargs) -> Any: |
|
v1_fields = [f for f in fields if isinstance(f, FastApiPV1ModelField)] |
|
v2_fields = [f for f in fields if isinstance(f, FastApiPV2ModelField)] |
|
|
|
v1_result = v1_get_definitions(fields=v1_fields, **kwargs) |
|
v2_result = v2_get_definitions(fields=v2_fields, **kwargs) |
|
|
|
final = v1_result |
|
final[0].update(v2_result[0]) |
|
final[1].update(v2_result[1]) |
|
|
|
return final |
|
|
|
def is_scalar_field(field: ModelField) -> bool: |
|
if isinstance(field, FastApiPV2ModelField): |
|
return v2_is_scalar_field(field=field) |
|
else: |
|
return v1_is_scalar_field(field=field) |
|
|
|
|
|
def is_sequence_field(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2_is_sequence_field(*args, **kwargs) |
|
else: |
|
return v1_is_sequence_field(*args, **kwargs) |
|
|
|
def is_scalar_sequence_field(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2_is_scalar_sequence_field(*args, **kwargs) |
|
else: |
|
return v1_is_scalar_sequence_field(*args, **kwargs) |
|
|
|
|
|
def is_bytes_field(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2_is_bytes_field(*args, **kwargs) |
|
else: |
|
return v1_is_bytes_field(*args, **kwargs) |
|
|
|
|
|
def is_bytes_sequence_field(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2_is_bytes_sequence_field(*args, **kwargs) |
|
else: |
|
return v1_is_bytes_sequence_field(*args, **kwargs) |
|
|
|
|
|
def copy_field_info(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2_copy_field_info(*args, **kwargs) |
|
else: |
|
return v1_copy_field_info(*args, **kwargs) |
|
|
|
|
|
def serialize_sequence_value(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2_serialize_sequence_value(*args, **kwargs) |
|
else: |
|
return v1_serialize_sequence_value(*args, **kwargs) |
|
|
|
|
|
def get_missing_field_error(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2_get_missing_field_error(*args, **kwargs) |
|
else: |
|
return v1_get_missing_field_error(*args, **kwargs) |
|
|
|
|
|
def create_body_model(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2_create_body_model(*args, **kwargs) |
|
else: |
|
return v1_create_body_model(*args, **kwargs) |
|
|
|
|
|
def get_model_fields(*args, **kwargs) -> Any: |
|
if USING_PYDANTIC_V2: |
|
return v2_get_model_fields(*args, **kwargs) |
|
else: |
|
return v1_get_model_fields(*args, **kwargs) |
|
|
|
|
|
_UNSET = object() |
|
|
|
|
|
class ConfigWithExtra(V1BaseConfig): |
|
extra = V1Extra.allow |
|
|
|
class PydanticV1CompatModelField: |
|
def __new__(cls, *args, **kwargs): |
|
pydantic_v1: bool = kwargs.pop("pydantic_v1", False) |
|
if pydantic_v1: |
|
if "model_config" in kwargs and not issubclass(kwargs["model_config"], V1BaseConfig): |
|
kwargs["model_config"] = ConfigWithExtra |
|
if "field_info" in kwargs and not isinstance(kwargs["field_info"], V1FieldInfo): |
|
kwargs["field_info"] = V1FieldInfo() |
|
if "mode" in kwargs: |
|
kwargs.pop("mode") |
|
res = FastApiPV1ModelField(*args, **kwargs) |
|
else: |
|
res = FastApiPV2ModelField(*args, **kwargs) |
|
|
|
return res |
|
|
|
def create_model_field( |
|
name: str, |
|
type_: Any, |
|
class_validators: dict[str, Any] = None, |
|
default: Any | None = _UNSET, |
|
required: bool | Any = _UNSET, |
|
model_config: Any = BaseConfig, |
|
field_info: Any | None = None, |
|
alias: str | None = None, |
|
mode: Literal["validation", "serialization"] = "validation", |
|
) -> Any: |
|
class_validators = class_validators or {} |
|
|
|
pydantic_v1 = issubclass(type_, pydantic.v1.BaseModel) |
|
if pydantic_v1: |
|
if default is _UNSET: |
|
default = V1Undefined |
|
if required is _UNSET: |
|
required = V1Undefined |
|
if model_config is _UNSET: |
|
model_config = V1BaseConfig |
|
field_info = field_info or V1FieldInfo() |
|
else: |
|
if default is _UNSET: |
|
default = PydanticUndefined |
|
if required is _UNSET: |
|
required = PydanticUndefined |
|
if model_config is _UNSET: |
|
model_config = BaseConfig |
|
|
|
field_info = field_info or FieldInfo( |
|
annotation=type_, default=default, alias=alias |
|
) |
|
|
|
kwargs = {"name": name, "field_info": field_info} |
|
if pydantic_v1: |
|
kwargs.update( |
|
{ |
|
"type_": type_, |
|
"class_validators": class_validators, |
|
"default": default, |
|
"required": required, |
|
"model_config": model_config, |
|
"alias": alias, |
|
} |
|
) |
|
else: |
|
kwargs.update({"mode": mode}) |
|
try: |
|
return PydanticV1CompatModelField(**kwargs, pydantic_v1=pydantic_v1) # type: ignore[arg-type] |
|
except (RuntimeError, PydanticSchemaGenerationError): |
|
raise fastapi.exceptions.FastAPIError( |
|
"Invalid args for response field! Hint: " |
|
f"check that {type_} is a valid Pydantic field type. " |
|
"If you are using a return type annotation that is not a valid Pydantic " |
|
"field (e.g. Union[Response, dict, None]) you can disable generating the " |
|
"response model from the type annotation with the path operation decorator " |
|
"parameter response_model=None. Read more: " |
|
"https://fastapi.tiangolo.com/tutorial/response-model/" |
|
) from None |
|
|
|
|
|
def patch_v2_compat(): |
|
# fastapi._compat.PYDANTIC_V2 = PydanticV2DynamicCheck() |
|
|
|
import fastapi._compat |
|
fastapi._compat.ModelField = PydanticV1CompatModelField |
|
fastapi._compat.with_info_plain_validator_function = with_info_plain_validator_function |
|
fastapi._compat._get_model_config = _get_model_config |
|
fastapi._compat.get_annotation_from_field_info = get_annotation_from_field_info |
|
fastapi._compat._normalize_errors = _normalize_errors |
|
fastapi._compat.get_model_definitions = get_model_definitions |
|
fastapi._compat.get_schema_from_model_field = get_schema_from_model_field |
|
fastapi._compat.get_compat_model_name_map = get_compat_model_name_map |
|
fastapi._compat.get_definitions = get_definitions |
|
fastapi._compat.is_scalar_field = is_scalar_field |
|
fastapi._compat.is_sequence_field = is_sequence_field |
|
fastapi._compat.is_scalar_sequence_field = is_scalar_sequence_field |
|
fastapi._compat.is_bytes_field = is_bytes_field |
|
fastapi._compat.is_bytes_sequence_field = is_bytes_sequence_field |
|
fastapi._compat.copy_field_info = copy_field_info |
|
fastapi._compat.serialize_sequence_value = serialize_sequence_value |
|
fastapi._compat.get_missing_field_error = get_missing_field_error |
|
fastapi._compat.create_body_model = create_body_model |
|
fastapi._compat.get_model_fields = get_model_fields |
|
|
|
# Now patch all the places we import it since we're actually patching fastapi._compat too late |
|
|
|
import fastapi.routing |
|
fastapi.routing.ModelField = PydanticV1CompatModelField |
|
# fastapi.routing.Undefined |
|
fastapi.routing._get_model_config = _get_model_config |
|
fastapi.routing._model_dump = _model_dump |
|
fastapi.routing._normalize_errors = _normalize_errors |
|
fastapi.routing.lenient_issubclass = lenient_issubclass |
|
fastapi.routing.create_model_field = create_model_field |
|
|
|
import fastapi.utils |
|
# fastapi.utils.PYDANTIC_V2 = PydanticV2DynamicCheck() # needs the dynamic check else we miss kwargs in initializations |
|
# fastapi.utils.BaseConfig = # TODO |
|
fastapi.utils.ModelField = PydanticV1CompatModelField |
|
# fastapi.utils.PydanticSchemaGenerationError, |
|
# fastapi.utils.Undefined, # TODO: I think this is ok? |
|
# fastapi.utils.UndefinedType, # TODO: I think this is ok? |
|
# fastapi.utils.Validator, |
|
fastapi.utils.lenient_issubclass = lenient_issubclass |
|
fastapi.utils.create_model_field = create_model_field |
|
|
|
# import fastapi.dependencies.models |
|
# fastapi.dependencies.models.ModelField = PydanticV1CompatModelField |
|
|
|
import fastapi.dependencies.utils |
|
# fastapi.dependencies.utils.PYDANTIC_V2 = PydanticV2DynamicCheck() # needs the dynamic check else we call an invalid method |
|
# fastapi.dependencies.utils.ErrorWrapper, # TODO: maybe ok? |
|
# fastapi.dependencies.utils.ModelField, # only used in type hints |
|
# fastapi.dependencies.utils.RequiredParam, # TODO: maybe ok? |
|
# fastapi.dependencies.utils.Undefined, # TODO: maybe ok? |
|
# fastapi.dependencies.utils._regenerate_error_with_loc # same in both, no need to override |
|
fastapi.dependencies.utils.copy_field_info = copy_field_info |
|
fastapi.dependencies.utils.create_body_model = create_body_model |
|
fastapi.dependencies.utils.evaluate_forwardref |
|
# fastapi.dependencies.utils.field_annotation_is_scalar # same in both, no need to override |
|
fastapi.dependencies.utils.get_annotation_from_field_info = get_annotation_from_field_info |
|
# fastapi.dependencies.utils.get_cached_model_fields # same in both, no need to override |
|
fastapi.dependencies.utils.get_missing_field_error = get_missing_field_error |
|
fastapi.dependencies.utils.is_bytes_field = is_bytes_field |
|
fastapi.dependencies.utils.is_bytes_sequence_field = is_bytes_sequence_field |
|
fastapi.dependencies.utils.is_scalar_field = is_scalar_field |
|
fastapi.dependencies.utils.is_scalar_sequence_field = is_scalar_sequence_field |
|
fastapi.dependencies.utils.is_sequence_field = is_sequence_field |
|
# fastapi.dependencies.utils.is_uploadfile_or_nonable_uploadfile_annotation # same in both, no need to override |
|
# fastapi.dependencies.utils.is_uploadfile_sequence_annotation # same in both, no need to override |
|
fastapi.dependencies.utils.lenient_issubclass = lenient_issubclass |
|
# fastapi.dependencies.utils.sequence_types # same in both, no need to override |
|
fastapi.dependencies.utils.serialize_sequence_value = serialize_sequence_value |
|
# fastapi.dependencies.utils.value_is_sequence # same in both, no need to override |
|
fastapi.dependencies.utils.create_model_field = create_model_field |
|
|
|
|
|
import fastapi.openapi.models |
|
# fastapi.openapi.models.PYDANTIC_V2, # TODO |
|
# fastapi.openapi.models.CoreSchema, # only used in type hints |
|
# fastapi.openapi.models.GetJsonSchemaHandler, # only used in type hints |
|
# fastapi.openapi.models.JsonSchemaValue, # only used in type hints |
|
fastapi.openapi.models._model_rebuild = _model_rebuild |
|
fastapi.openapi.models.with_info_plain_validator_function = with_info_plain_validator_function |
|
|
|
import fastapi.openapi.utils |
|
# fastapi.openapi.utils.GenerateJsonSchema, # TODO: this might be a problem in how it's used from get_openapi as a default value |
|
# the V1 DataClass, but maybe not |
|
# fastapi.openapi.utils.JsonSchemaValue, # only used in type hints |
|
# fastapi.openapi.utils.ModelField, # only used in type hints |
|
# fastapi.openapi.utils.Undefined, # TODO: I think this is ok? |
|
fastapi.openapi.utils.get_compat_model_name_map = get_compat_model_name_map |
|
fastapi.openapi.utils.get_definitions = get_definitions |
|
fastapi.openapi.utils.get_schema_from_model_field = get_schema_from_model_field |
|
fastapi.openapi.utils.lenient_issubclass = lenient_issubclass |