Skip to content

Instantly share code, notes, and snippets.

@Bahus
Last active September 19, 2024 23:53
Show Gist options
  • Save Bahus/98a9848b1f8e2dcd986bf9f05dbf9c65 to your computer and use it in GitHub Desktop.
Save Bahus/98a9848b1f8e2dcd986bf9f05dbf9c65 to your computer and use it in GitHub Desktop.
Django JSONField with Pydantic schema support
from functools import partial
import pydantic
import logging
from django.contrib.postgres.fields import JSONField
from typing import Type, Union, Tuple
from django.core.serializers.json import DjangoJSONEncoder
logger = logging.getLogger(__name__)
def default_error_handler(obj, errors):
logger.warning(
'Can not parse stored object with schema obj=%s, errors=%s',
obj, errors
)
return obj
class FieldToPythonSetter:
"""
Forces Django to call to_python on fields when setting them.
This is useful when you want to add some custom field data postprocessing.
Should be added to field like a so:
```
def contribute_to_class(self, cls, name, *args, **kwargs):
super(JSONField, self).contribute_to_class(cls, name, *args, **kwargs)
setattr(cls, name, FieldToPythonSetter(self))
```
"""
def __init__(self, field):
self.field = field
def __get__(self, obj, cls=None):
return obj.__dict__[self.field.name]
def __set__(self, obj, value):
obj.__dict__[self.field.name] = self.field.to_python(value)
class JSONSchemedEncoder(DjangoJSONEncoder):
def __init__(
self,
*args,
schema: Union[Tuple[Type[pydantic.BaseModel]], Type[pydantic.BaseModel]],
**kwargs
):
if not isinstance(schema, tuple):
self.schemas = (schema, )
else:
self.schemas = schema
super().__init__(*args, **kwargs)
def encode(self, obj):
if not isinstance(obj, pydantic.BaseModel):
# this flow used for expressions like .filter(data__contains={})
# we don't want that {} to be parsed as schema
return super().encode(obj)
return obj.json()
class JSONSchemedDecoder:
def __init__(
self,
schema: Union[Tuple[Type[pydantic.BaseModel]], Type[pydantic.BaseModel]],
error_handler=default_error_handler,
):
if not isinstance(schema, tuple):
self.schemas = (schema, )
else:
self.schemas = schema
self.error_handler = error_handler
def decode(self, obj):
if isinstance(obj, self.schemas):
return obj
errors = []
for schema in self.schemas:
try:
return schema.parse_obj(obj)
except pydantic.ValidationError as exc:
errors.append((schema, exc.errors()))
except TypeError as exc:
errors.append((schema, str(exc)))
return self.error_handler(obj, errors)
class JSONSchemedField(JSONField):
def __init__(self, *args, schema=None, error_handler=default_error_handler, **kwargs):
super().__init__(*args, **kwargs)
self._schemas = self._populate_schemas(schema)
self.decoder = JSONSchemedDecoder(schema=self._schemas, error_handler=error_handler)
self.encoder = partial(JSONSchemedEncoder, schema=self._schemas)
def deconstruct(self):
name, path, args, kwargs = super().deconstruct()
kwargs['schema'] = self._schemas
return name, path, args, kwargs
@staticmethod
def _populate_schemas(schema) -> Tuple[Type[pydantic.BaseModel]]:
assert schema is not None, 'Schema can not be None'
if isinstance(schema, tuple):
return schema
if isinstance(schema, type) and issubclass(schema, pydantic.BaseModel):
return schema,
origin = getattr(schema, '__origin__', None)
if origin is Union:
for s in schema.__args__:
assert issubclass(s, pydantic.BaseModel)
return schema.__args__
# only pydantic.BaseModel and typing.Union are supported
raise AssertionError('Unsupported schema type: {0}'.format(type(schema)))
def to_python(self, value):
if value is None:
return None
return self.decoder.decode(value)
def contribute_to_class(self, cls, name, *args, **kwargs):
super(JSONField, self).contribute_to_class(cls, name, *args, **kwargs)
setattr(cls, name, FieldToPythonSetter(self))
@hbd
Copy link

hbd commented May 30, 2024

@abriemme great solution. Big thanks to everyone here for the ideas

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment