Last active
April 17, 2021 13:45
-
-
Save fy0/4ccd3d3234b0a8fb1b5baaea436232d2 to your computer and use it in GitHub Desktop.
pydantic `patternProperties` validator demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
MIT License | |
Copyright (c) 2021 fy | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
""" | |
""" | |
Schema: | |
{ | |
"type": "object", | |
"patternProperties": { | |
"^S_": { "type": "string" }, | |
"^I_": { "type": "integer" } | |
}, | |
"additionalProperties": false | |
} | |
https://opis.io/json-schema/1.x/ref-keyword.html | |
Tested on py3.8, pydantic 1.6.1, win10 | |
Execute results: | |
__root__={'S_25': 'This is a string', 'I_2': 456} | |
__root__={'S_25': '123', 'I_2': 456} | |
{'title': 'TestModel', 'patternProperties': {'^S_': {'type': 'string'}, '^I_': {'type': 'integer'}}, 'additionalProperties': False} | |
""" | |
from typing import Dict, Generic, TypeVar, Tuple, Any, Mapping, Union, Set | |
from pydantic.fields import ModelField | |
from pydantic import BaseModel, Field, ValidationError, constr | |
try: | |
from typing import get_args, get_origin | |
except ImportError: | |
from typing_extensions import get_args, get_origin | |
TKeyType = TypeVar('TKeyType', bound=str) | |
TValueType = TypeVar('TValueType') | |
class KVPair(Generic[TKeyType, TValueType]): | |
@classmethod | |
def __get_validators__(cls): | |
yield cls.validate | |
@classmethod | |
def validate(cls, v: Tuple[TKeyType, TValueType], field: ModelField): | |
f_key: ModelField = field.sub_fields[0] | |
f_value: ModelField = field.sub_fields[1] | |
val_key, err_key = f_key.validate(v[0], None, loc='key') | |
if err_key: raise ValidationError([err_key], cls) | |
val_value, err_value = f_value.validate(v[1], None, loc='value') | |
if err_value: raise ValidationError([err_value], cls) | |
return val_key, val_value | |
@classmethod | |
def __modify_schema__(cls, field_schema): | |
pass | |
TKVPair = TypeVar('TKVPair') | |
class DictWithKVPair(Generic[TKVPair]): | |
@classmethod | |
def __get_validators__(cls): | |
yield cls.validate | |
@classmethod | |
def validate(cls, v, field: ModelField): | |
if not isinstance(v, Mapping): | |
raise TypeError('mapping required') | |
generic_field: ModelField = field.sub_fields[0] | |
items = [] | |
for i in v.items(): | |
val, error = generic_field.validate(i, None, loc=f'{i[0]}') # loc=f'{field.name}.{i[0]}' | |
if error: | |
raise ValidationError([error], cls) | |
items.append(val) | |
return dict(items) | |
def kv_pair_monkey_patch(): | |
import pydantic.schema | |
_field_type_schema = pydantic.schema.field_type_schema | |
def _field_type_schema_wrap(field: ModelField, **kwargs) -> Tuple[Dict[str, Any], Dict[str, Any], Set[str]]: | |
if field.type_ == DictWithKVPair: | |
f_schema = {} | |
def solve(f: ModelField): | |
ret = [] | |
for sf in f.sub_fields: | |
o = get_origin(sf.type_) | |
if o == Union: | |
ret.extend(solve(sf)) | |
elif o is None and sf.type_ == KVPair: | |
ret.append(sf) | |
return ret | |
sub_fields = solve(field) | |
patternProperties = {} | |
for i in sub_fields: | |
i: ModelField | |
f_key: ModelField = i.sub_fields[0] | |
f_value: ModelField = i.sub_fields[1] | |
k = _field_type_schema(f_key, **kwargs)[0]['pattern'] | |
patternProperties[k] = _field_type_schema(f_value, **kwargs)[0] | |
f_schema.update({ | |
"patternProperties": patternProperties, | |
"additionalProperties": False | |
}) | |
return f_schema, {}, set() | |
return _field_type_schema(field, **kwargs) | |
pydantic.schema.field_type_schema = _field_type_schema_wrap | |
kv_pair_monkey_patch() | |
class TestModel(BaseModel): | |
__root__: DictWithKVPair[ | |
Union[ | |
KVPair[constr(regex="^S_"), str], | |
KVPair[constr(regex="^I_"), int], | |
] | |
] | |
print(TestModel.parse_obj({"S_25": "This is a string", "I_2": 456})) | |
print(TestModel.parse_obj({"S_25": 123, "I_2": 456})) | |
print(TestModel.schema()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment