Created
October 22, 2020 12:43
-
-
Save fredkingham/66d97754cf7a363e3a9ac98fdac592d2 to your computer and use it in GitHub Desktop.
Proposal for a new search rule system
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The problem is that... | |
# 1. We cannot search models such as Lab tests or appointments | |
# 2. We cannot remove models such as Episode that are in opal models | |
# | |
# This solution mostly solves 1; however, I think we should also implement a discoverable | |
# remove method, and then the solution would cover 2 as well. | |
# | |
# This method moves the query logic onto a search rule class; patient summary generation etc. would be | |
# handled elsewhere, by the query backend. | |
# | |
# This solution changes queries.py and extract.py and schema.py, all javascript is left the same. | |
# | |
# My suggestion would be to implement it as a separate backend that is opt in (from the settings.py pluggable backend) | |
# so the two versions can be examined independently. | |
# | |
# Do we like it as an api? | |
from opal.core.discoverable import DiscoverableFeature | |
from opal.models import PatientSubrecord, EpisodeSubrecord | |
from opal.core.exceptions import Error | |
from opal.core import subrecords | |
from opal.models import Episode | |
class SearchException(Error):
    """Search-specific exception type derived from the imported ``Error``."""
# Maps the modifier label supplied with a search to the lookup suffix that
# is appended to the field name when the filter arguments are built.
MODIFIERS = {
    "Greater Than": "__gt",
    "Less Than": "__lt",
    "Before": "__lte",
    "After": "__gte",
    "Contains": "__icontains",
}
class SchemaField(object):
    """Describe one field of a search rule for the generated schema.

    Values may be declared as class attributes on a subclass or passed to
    the constructor; ``to_dict`` serialises them.
    """

    name = None
    field_type = None
    _type = None
    description = None
    display_name = None
    lookup_list = None
    enum = None

    def __init__(
        self,
        name=None,
        field_type=None,
        display_name=None,
        description=None,
        lookup_list=None,
        enum=None,
        _type=None,
    ):
        """Set any of the field attributes on this instance.

        Every argument is optional. An argument left as ``None`` does not
        touch the attribute, so values declared on a subclass are kept.
        """
        given = {
            "name": name,
            "field_type": field_type,
            "display_name": display_name,
            "description": description,
            "lookup_list": lookup_list,
            "enum": enum,
            "_type": _type,
        }
        for attr, value in given.items():
            # Only shadow the class-level declaration when a value is passed.
            if value is not None:
                setattr(self, attr, value)

    def to_dict(self):
        """Return the field as a dictionary.

        ``title`` is ``display_name`` when set; otherwise it is derived from
        ``name`` by replacing underscores with spaces and title-casing. When
        neither is set the title is left as the (falsy) ``display_name``.
        """
        title = self.display_name
        if not title and self.name:
            title = self.name.replace("_", " ").title()
        return {
            "name": self.name,
            "field_type": self.field_type,
            "_type": self._type,
            "description": self.description,
            "title": title,
            "enum": self.enum,
        }
class SearchRule(DiscoverableFeature):
    """Query and extract logic for a single model.

    Subclasses set ``model`` and list the field names they support in
    ``search_fields`` / ``extract_fields``. Per-field behaviour can be
    customised by defining ``search_<field>`` or ``extract_<field>`` methods.
    """

    search_fields = []
    extract_fields = []
    exclude_from_search = False
    exclude_from_extract = False
    slug = None
    display_name = None
    model = None

    def _model_is(self, base):
        """Return True when ``self.model`` is a class derived from *base*."""
        # ``model`` holds a class (not an instance), so issubclass is the
        # right test; the type guard covers the ``model = None`` default.
        return isinstance(self.model, type) and issubclass(self.model, base)

    def get_qs(self, episode_qs):
        """Return the rows of ``self.model`` related to *episode_qs*.

        Raises:
            ValueError: if the model is neither a PatientSubrecord nor an
                EpisodeSubrecord and the rule has no ``queryset`` attribute.
        """
        if self._model_is(PatientSubrecord):
            return self.model.objects.filter(patient__episode__in=episode_qs)
        if self._model_is(EpisodeSubrecord):
            return self.model.objects.filter(episode__in=episode_qs)
        if not hasattr(self, "queryset"):
            raise ValueError(
                'Please implement get_qs method on {}'.format(
                    type(self).__name__
                )
            )
        # NOTE(review): the original fell through and returned None here;
        # handing back the declared queryset is the presumed intent - confirm.
        return self.queryset

    def to_episode_qs(self, our_qs):
        """Return the Episode queryset matching the rows in *our_qs*.

        Raises:
            ValueError: if the model is neither a PatientSubrecord nor an
                EpisodeSubrecord.
        """
        if self._model_is(PatientSubrecord):
            patient_ids = our_qs.values_list('patient_id', flat=True)
            return Episode.objects.filter(patient_id__in=patient_ids)
        if self._model_is(EpisodeSubrecord):
            episode_ids = our_qs.values_list('episode_id', flat=True)
            return Episode.objects.filter(id__in=episode_ids)
        raise ValueError('Please implement a to_episode_qs method')

    def search(self, value, episode_qs, modifier="", user=None):
        """Filter the rule's queryset by *value*.

        A ``search_<field>`` method, when defined, is called with
        ``(value, queryset, modifier, user)`` and its result is returned.
        Otherwise the field is filtered with the lookup suffix for
        *modifier*; an empty modifier means a plain equality filter.

        Raises:
            KeyError: if *modifier* is non-empty and not a key of MODIFIERS.
        """
        queryset = self.get_qs(episode_qs)
        # NOTE(review): as in the original, the loop returns on the first
        # entry of search_fields, so later fields are never consulted and an
        # empty search_fields yields None - confirm the intended behaviour.
        for field in self.search_fields:
            custom_search = getattr(self, "search_{}".format(field), None)
            if custom_search is not None:
                return custom_search(value, queryset, modifier, user)
            # TODO: default modifier for string should be __iexact
            # TODO: also fk or ft logic
            lookup = MODIFIERS[modifier] if modifier else ""
            query_args = {"{}{}".format(field, lookup): value}
            return queryset.filter(**query_args)

    def extract(self, episode_qs):
        """Return one dict per row of the rule's queryset.

        Each dict maps every name in ``extract_fields`` to the result of
        ``extract_<field>(instance, queryset)`` when such a method exists,
        and to the attribute of that name on the instance otherwise.
        """
        queryset = self.get_qs(episode_qs)
        rows = []
        for instance in queryset:
            row = {}
            for field in self.extract_fields:
                custom_extract = getattr(
                    self, "extract_{}".format(field), None
                )
                if custom_extract is not None:
                    row[field] = custom_extract(instance, queryset)
                else:
                    row[field] = getattr(instance, field)
            rows.append(row)
        return rows
class EpisodeSearchRule(SearchRule):
    """Search rule whose model is Episode.

    Both queryset conversions hand their argument back unchanged.
    """

    model = Episode
    slug = Episode.get_api_name()

    search_fields = ["start", "end"]
    extract_fields = ["start", "end"]

    # Schema descriptions for the two fields listed above.
    # NOTE(review): relies on SchemaField accepting positional arguments
    # (presumably name, field type, display name) - confirm its constructor.
    start = SchemaField("start", "datetime", "Episode Start")
    end = SchemaField("end", "datetime", "Episode End")

    def get_qs(self, episode_qs):
        """Return *episode_qs* as given."""
        return episode_qs

    def to_episode_qs(self, our_qs):
        """Return *our_qs* as given."""
        return our_qs
# Generated search rules, keyed by subrecord api name. Holding them here keeps
# a strong reference to every class: when each one is bound to the same name
# in a loop, all but the last are only reachable through weak subclass
# references and may be garbage collected.
SUBRECORD_SEARCH_RULES = {}

# ``subrecords`` is the imported module; the iterator is its ``subrecords``
# function.
for subrecord in subrecords.subrecords():
    _api_name = subrecord.get_api_name()
    # Build through SearchRule's own metaclass and give each rule a unique
    # class name derived from the subrecord.
    SUBRECORD_SEARCH_RULES[_api_name] = type(SearchRule)(
        "{}SearchRule".format(subrecord.__name__),
        (SearchRule,),
        {"slug": _api_name, "model": subrecord, "__module__": __name__},
    )
def extract_schema():
    """Build the schema for every rule returned by ``SearchRule.list()``.

    Returns:
        A list with one dict per rule, holding the rule's ``slug`` as
        ``name``, a ``title`` and a ``fields`` list. Each field entry comes
        from a SchemaField declared on the rule under the field's name, or
        else from the model's ``build_schema_for_field_name``.
    """
    schema = []
    for search_rule in SearchRule.list():
        model = search_rule.model
        # Fall back to the model's display name when the rule has none.
        title = search_rule.display_name or model.display_name

        # SearchRule declares no ``fields`` attribute, so read it defensively
        # and fall back to the rule's extract_fields, then to the model.
        fields = getattr(search_rule, "fields", None)
        if not fields:
            fields = search_rule.extract_fields
        if not fields:
            fields = model._get_fieldnames_to_serialize()

        field_schemas = []
        for field in fields:
            declared = getattr(search_rule, field, None)
            # Only treat the attribute as a declaration when it really is a
            # SchemaField; a field name may collide with another rule
            # attribute such as ``slug`` or ``model``.
            if isinstance(declared, SchemaField):
                field_schemas.append(declared.to_dict())
            else:
                field_schemas.append(
                    model.build_schema_for_field_name(field)
                )

        schema.append({
            "name": search_rule.slug,
            "title": title,
            "fields": field_schemas,
        })
    return schema
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment