Last active
January 29, 2024 15:41
-
-
Save TobeTek/df2e9783a64e431c228c513441eaa8df to your computer and use it in GitHub Desktop.
Querying Django's JSON Field
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db import models | |
from django.utils.translation import gettext_lazy as _ | |
class Person(models.Model):
    """
    Store data about people in a school.

    Students and teachers share this model: ``is_student`` tells them apart
    and ``details`` holds the free-form JSON payload for that person.
    """

    name = models.CharField(max_length=100)
    # Defaults to True, so teachers must be created with is_student=False.
    is_student = models.BooleanField(default=True)
    # `default=dict` (the callable) gives each row its own empty dict.
    details = models.JSONField(
        default=dict,
        # Adjacent string literals are concatenated by Python, so the first
        # one ends with a space to keep the two sentences apart.
        help_text=_(
            "For students, we store their class, courses and scores. "
            "For teachers, we store their qualifications and classes"
        ),
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.test import TestCase | |
from django.db import models | |
from mm.models import Person | |
from mm.utils import gen_sql_filter_json_array | |
class JSONQueryTestCase(TestCase):
    """
    Demonstrate the different ways of querying ``Person.details``.

    The fixtures are two students (Peter, Matt) and two teachers (Alice, Bob),
    recreated before every test. ``test_query_inexact_value`` relies on
    ``gen_sql_filter_json_array``, which emits PostgreSQL-specific SQL.
    """

    def _create_students(self):
        """Create the student fixtures and register them in ``self.students``."""
        self.students["peter"] = Person.objects.create(
            name="Peter",
            details={
                "class": "Science Major",
                "courses": [
                    {"name": "MATH", "grade": 60},
                    {"name": "ENG", "grade": 80},
                    {"name": "CHEM", "grade": 30},
                ],
            },
        )
        self.students["matt"] = Person.objects.create(
            name="Matt",
            details={
                "class": "Art Major",
                "courses": [
                    {"name": "MATH", "grade": 60},
                    {"name": "ENG", "grade": 80},
                    {"name": "GOV", "grade": 90},
                ],
            },
        )

    def _create_teachers(self):
        """Create the teacher fixtures and register them in ``self.teachers``."""
        self.teachers["alice"] = Person.objects.create(
            name="Alice",
            is_student=False,
            details={
                "classes": ["Art", "Science"],
                "degrees": [
                    {"title": "BSc", "school": "Harvard", "course": "Math"},
                    {"title": "MSc", "school": "Harvard", "course": "Applied Math"},
                    {"title": "PhD", "school": "Bowten", "course": "Data Science"},
                ],
            },
        )
        self.teachers["bob"] = Person.objects.create(
            name="Bob",
            is_student=False,
            details={
                "classes": ["Science"],
                "degrees": [
                    {
                        "title": "BSc",
                        "school": "Yale",
                        "course": "Chemical Engineering",
                    },
                    {"title": "MSc", "school": "Yale", "course": "Public Health"},
                    {"title": "PhD", "school": "Bowten", "course": "Data Science"},
                ],
            },
        )

    def setUp(self):
        # Per-instance registries: class-level dicts would be shared (and
        # mutated) by every test instance of this class.
        self.students = {}
        self.teachers = {}
        self._create_students()
        self._create_teachers()

    def test_query_for_key_and_value(self):
        """Exact matches on a key/value pair and on an array member."""
        # Get the student enrolled in a particular class
        r = Person.objects.filter(details__contains={"class": "Science Major"})
        self.assertEqual(r.count(), 1)
        self.assertEqual(r[0], self.students["peter"])

        # Get the teacher taking a particular class
        r = Person.objects.filter(details__classes__contains=["Art"])
        self.assertEqual(r.count(), 1)
        self.assertEqual(r[0], self.teachers["alice"])

    def test_query_for_particular_key(self):
        """Match rows by the presence of top-level keys."""
        # Get all students. They always have the courses and class keys
        r = Person.objects.filter(details__has_keys=["courses", "class"])
        self.assertEqual(r.count(), 2)
        self.assertSetEqual(set(r), set(self.students.values()))

    ## Inexact Queries
    def test_query_inexact_value(self):
        """Case-insensitive substring matches on JSON values."""
        from django.db.models.functions import Cast
        from django.db.models import F

        # Search for the student whose class name contains "Science"
        r = Person.objects.annotate(
            student_class=Cast(F("details__class"), models.TextField())
        ).filter(student_class__icontains="Science")
        self.assertEqual(r.count(), 1)
        self.assertEqual(r[0], self.students["peter"])

        # Matching inside an array of objects: we have to go into a bit of SQL
        query = gen_sql_filter_json_array(
            model=Person,
            lookup_path="details->'courses'",
            nested_key="name",
            lookup_value="che",
        )
        q = Person.objects.filter(id__in=query)
        # "che" matches only Peter's CHEM course; the generated SQL is
        # attached to the failure message to ease debugging.
        self.assertEqual(q.count(), 1, msg=str(q.query))
        self.assertEqual(q[0], self.students["peter"])
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db import models | |
from functools import partial | |
from django.db.models.expressions import RawSQL | |
def _quote_pg_identifier(name: str) -> str:
    """Return *name* as a double-quoted PostgreSQL identifier."""
    # Inside a quoted identifier an embedded double quote is escaped by
    # doubling it; this also keeps the name from terminating the identifier.
    return '"' + name.replace('"', '""') + '"'


def gen_sql_filter_json_array(
    model: models.Model, lookup_path: str, nested_key: str, lookup_value: str
) -> RawSQL:
    """
    Filter a queryset on a nested JSON key in an array field

    Builds a correlated subquery (PostgreSQL only, it uses
    ``jsonb_to_recordset``) that yields the primary key of the current row
    when any object in the JSON array has ``nested_key`` containing
    ``lookup_value`` (case-insensitive). Typical use::

        Person.objects.filter(pk__in=gen_sql_filter_json_array(...))

    :param models.Model model: Your Django model class to filter on
    :param str lookup_path: The lookup path of the array field/key in Postgres
        format e.g ``data->'sub-key1'->'sub-key2'``. It is inserted into the
        SQL verbatim, so it must come from trusted code, never from user input.
    :param str nested_key: The name of the nested key to filter on. It is
        emitted as a quoted identifier, so it is matched case-sensitively.
    :param str lookup_value: The value to match/filter the queryset on. It is
        passed as a query parameter; ``%`` and ``_`` in it act as LIKE wildcards.
    :raises ValueError: If ``nested_key`` is empty.
    :return: A ``RawSQL`` expression usable as the right-hand side of ``__in``.
    """
    if not nested_key:
        raise ValueError("nested_key must be a non-empty string")

    opts = model._meta
    table_name = opts.db_table
    # Quote the table name the way Django does for the outer query (wrap it,
    # unless it is already wrapped) so a mixed-case db_table still resolves.
    if not (table_name.startswith('"') and table_name.endswith('"')):
        table_name = f'"{table_name}"'
    pk_column = _quote_pg_identifier(opts.pk.column)
    key_column = _quote_pg_identifier(nested_key)

    search_string = f"%{lookup_value}%"
    # The trailing %s is the placeholder for search_string, not string formatting.
    query = (
        f"SELECT {table_name}.{pk_column} "
        f"FROM jsonb_to_recordset({lookup_path}) "
        f"AS temp_filter_table({key_column} text) "
        f"WHERE {key_column} ILIKE %s"
    )
    return RawSQL(sql=query, params=[search_string])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment