-
-
Save Bahus/98a9848b1f8e2dcd986bf9f05dbf9c65 to your computer and use it in GitHub Desktop.
from functools import partial | |
import pydantic | |
import logging | |
from django.contrib.postgres.fields import JSONField | |
from typing import Type, Union, Tuple | |
from django.core.serializers.json import DjangoJSONEncoder | |
logger = logging.getLogger(__name__) | |
def default_error_handler(obj, errors): | |
logger.warning( | |
'Can not parse stored object with schema obj=%s, errors=%s', | |
obj, errors | |
) | |
return obj | |
class FieldToPythonSetter: | |
""" | |
Forces Django to call to_python on fields when setting them. | |
This is useful when you want to add some custom field data postprocessing. | |
Should be added to field like a so: | |
``` | |
def contribute_to_class(self, cls, name, *args, **kwargs): | |
super(JSONField, self).contribute_to_class(cls, name, *args, **kwargs) | |
setattr(cls, name, FieldToPythonSetter(self)) | |
``` | |
""" | |
def __init__(self, field): | |
self.field = field | |
def __get__(self, obj, cls=None): | |
return obj.__dict__[self.field.name] | |
def __set__(self, obj, value): | |
obj.__dict__[self.field.name] = self.field.to_python(value) | |
class JSONSchemedEncoder(DjangoJSONEncoder): | |
def __init__( | |
self, | |
*args, | |
schema: Union[Tuple[Type[pydantic.BaseModel]], Type[pydantic.BaseModel]], | |
**kwargs | |
): | |
if not isinstance(schema, tuple): | |
self.schemas = (schema, ) | |
else: | |
self.schemas = schema | |
super().__init__(*args, **kwargs) | |
def encode(self, obj): | |
if not isinstance(obj, pydantic.BaseModel): | |
# this flow used for expressions like .filter(data__contains={}) | |
# we don't want that {} to be parsed as schema | |
return super().encode(obj) | |
return obj.json() | |
class JSONSchemedDecoder: | |
def __init__( | |
self, | |
schema: Union[Tuple[Type[pydantic.BaseModel]], Type[pydantic.BaseModel]], | |
error_handler=default_error_handler, | |
): | |
if not isinstance(schema, tuple): | |
self.schemas = (schema, ) | |
else: | |
self.schemas = schema | |
self.error_handler = error_handler | |
def decode(self, obj): | |
if isinstance(obj, self.schemas): | |
return obj | |
errors = [] | |
for schema in self.schemas: | |
try: | |
return schema.parse_obj(obj) | |
except pydantic.ValidationError as exc: | |
errors.append((schema, exc.errors())) | |
except TypeError as exc: | |
errors.append((schema, str(exc))) | |
return self.error_handler(obj, errors) | |
class JSONSchemedField(JSONField): | |
def __init__(self, *args, schema=None, error_handler=default_error_handler, **kwargs): | |
super().__init__(*args, **kwargs) | |
self._schemas = self._populate_schemas(schema) | |
self.decoder = JSONSchemedDecoder(schema=self._schemas, error_handler=error_handler) | |
self.encoder = partial(JSONSchemedEncoder, schema=self._schemas) | |
def deconstruct(self): | |
name, path, args, kwargs = super().deconstruct() | |
kwargs['schema'] = self._schemas | |
return name, path, args, kwargs | |
@staticmethod | |
def _populate_schemas(schema) -> Tuple[Type[pydantic.BaseModel]]: | |
assert schema is not None, 'Schema can not be None' | |
if isinstance(schema, tuple): | |
return schema | |
if isinstance(schema, type) and issubclass(schema, pydantic.BaseModel): | |
return schema, | |
origin = getattr(schema, '__origin__', None) | |
if origin is Union: | |
for s in schema.__args__: | |
assert issubclass(s, pydantic.BaseModel) | |
return schema.__args__ | |
# only pydantic.BaseModel and typing.Union are supported | |
raise AssertionError('Unsupported schema type: {0}'.format(type(schema))) | |
def to_python(self, value): | |
if value is None: | |
return None | |
return self.decoder.decode(value) | |
def contribute_to_class(self, cls, name, *args, **kwargs): | |
super(JSONField, self).contribute_to_class(cls, name, *args, **kwargs) | |
setattr(cls, name, FieldToPythonSetter(self)) | |
@kuldeepk I'm not sure what exactly is causing your error, but when I made my implementation I noticed that I need a custom django forms field to make django admin work properly, and in the orm field I've overridden the formfield
function to return my custom forms field class.
Here is the custom forms field:
from django.contrib.postgres.forms import JSONField as JSONFormField
class PydanticFormField(JSONFormField):
def _serialize(self, value):
if isinstance(value, list):
value = [(self._serialize(v) if isinstance(v, BaseModel) else v) for v in value]
elif isinstance(value, BaseModel):
value = value.dict(exclude_unset=True)
if '__root__' in value:
value = value['__root__']
for k, v in value.items():
if isinstance(v, Decimal):
value[k] = str(v)
return value
def prepare_value(self, value):
value = self._serialize(value)
return json.dumps(value, indent=2)
def has_changed(self, initial, data):
initial = self._serialize(initial)
return super().has_changed(initial, data)
Note that this snippet also consider lists of pydantic models and have special handling for Decimal
type, you can remove these bits if you want.
@Bahus
I tried to implement the same, but while saving the model I was facing error like this -:
list_of_stages=[ABCModel(uuid=UUID('1102f63e-b7db-11eb-a149-1f45af5c49e5'), name='asdas', type='JO')] must be of type dict/list
Can anyone help me out, that how to handle .save() method while using this.
I had issue with this gist (pydantic v2), so i found a simpler approach by using getter & setter:
class PydanticModel(BaseModel):
url: str
title: str
class Item(models.Model):
_data: PydanticModel = JSONField(default=dict)
@property
def data(self) -> PydanticModel:
return PydanticModel(**self._data)
@data.setter
def data(self, value: PydanticModel):
self._data = value.model_dump(mode="json")
@abriemme great solution. Big thanks to everyone here for the ideas
Thanks for this snippet @Bahus! Super helpful.
Everything works perfectly but in Django Admin i get error as
Object of type PydanticModel is not JSON serializable
. Any way to solve this? Am I doing anything wrong?