Skip to content

Instantly share code, notes, and snippets.

@fredkingham
Created October 22, 2020 12:43
Show Gist options
  • Save fredkingham/66d97754cf7a363e3a9ac98fdac592d2 to your computer and use it in GitHub Desktop.
Save fredkingham/66d97754cf7a363e3a9ac98fdac592d2 to your computer and use it in GitHub Desktop.
Proposal for a new search rule system
# The problem is that...
# 1. We cannot search models such as Lab tests or appointments
# 2. We cannot remove models such as Episode that are in opal models
#
# This solution mostly solves 1. However, I think we should also implement a discoverable remove method,
# at which point the solution would cover 2 as well.
#
# This approach moves the query logic onto a search rule class; patient summary generation etc. would be
# handled elsewhere, by the query backend.
#
# This solution changes queries.py and extract.py and schema.py, all javascript is left the same.
#
# My suggestion would be to implement it as a separate backend that is opt-in (from the settings.py pluggable backend)
# so the two versions can be examined independently.
#
# Do we like it as an api?
from opal.core.discoverable import DiscoverableFeature
from opal.models import PatientSubrecord, EpisodeSubrecord
from opal.core.exceptions import Error
from opal.core import subrecords
from opal.models import Episode
class SearchException(Error):
    """Error type for failures raised by the search rule system."""
# Maps a human-readable query modifier onto the lookup suffix that is
# appended to a field name when building queryset filter arguments.
MODIFIERS = {
    "Greater Than": "__gt",
    "Less Than": "__lt",
    "Before": "__lte",
    "After": "__gte",
    "Contains": "__icontains",
}
class SchemaField(object):
    """Describe a single field of a search rule for the schema output.

    Values can be declared as class attributes on a subclass or passed to
    the constructor. Constructor arguments left as ``None`` do not override
    the class-level value.
    """

    name = None
    field_type = None
    _type = None
    description = None
    display_name = None
    lookup_list = None
    enum = None

    def __init__(
        self,
        name=None,
        field_type=None,
        display_name=None,
        description=None,
        lookup_list=None,
        enum=None,
        _type=None,
    ):
        """Store any explicitly supplied values on the instance.

        The positional order (name, field_type, display_name) matches the
        way this class is instantiated elsewhere in this module, e.g.
        ``SchemaField("start", "datetime", "Episode Start")``.
        """
        supplied = {
            "name": name,
            "field_type": field_type,
            "display_name": display_name,
            "description": description,
            "lookup_list": lookup_list,
            "enum": enum,
            "_type": _type,
        }
        for attribute, value in supplied.items():
            # Only shadow the class attribute when a value was given, so
            # subclasses that declare class-level values keep working.
            if value is not None:
                setattr(self, attribute, value)

    def to_dict(self):
        """Return the field as a plain dict for the schema.

        The ``title`` is ``display_name`` when set; otherwise it is derived
        from ``name`` by replacing underscores with spaces and title-casing.
        When neither is set the title is ``None``.
        """
        display_name = self.display_name
        if not display_name and self.name:
            display_name = self.name.replace("_", " ").title()
        return {
            "name": self.name,
            "field_type": self.field_type,
            "_type": self._type,
            "description": self.description,
            "title": display_name or None,
            "enum": self.enum,
        }
class SearchRule(DiscoverableFeature):
    """Base class holding the query and extract logic for one model.

    Subclasses set ``model`` (and usually ``slug``), list the fields that
    can be searched/extracted, and may define ``search_<field>`` or
    ``extract_<field>`` methods to override the default handling of a
    field.
    """

    search_fields = []
    extract_fields = []
    exclude_from_search = False
    exclude_from_extract = False
    slug = None
    display_name = None
    model = None

    def _model_is(self, base):
        """Return True when ``self.model`` is a class derived from base."""
        # ``model`` holds a class (or None), so this must be a subclass
        # check; isinstance() against the class itself is never true.
        return isinstance(self.model, type) and issubclass(self.model, base)

    def get_qs(self, episode_qs):
        """Return this rule's model rows related to ``episode_qs``.

        Raises:
            ValueError: if the model is neither a patient nor an episode
                subrecord and the rule declares no ``queryset`` attribute.
        """
        if self._model_is(PatientSubrecord):
            return self.model.objects.filter(patient__episode__in=episode_qs)
        if self._model_is(EpisodeSubrecord):
            return self.model.objects.filter(episode__in=episode_qs)
        if hasattr(self, "queryset"):
            # NOTE(review): a declared ``queryset`` is returned as-is and
            # is not narrowed to ``episode_qs`` -- confirm this is wanted.
            return self.queryset
        raise ValueError(
            'Please implement get_qs method on {}'.format(
                type(self).__name__
            )
        )

    def to_episode_qs(self, our_qs):
        """Translate a queryset of this rule's model into episodes.

        Raises:
            ValueError: if the model is neither a patient nor an episode
                subrecord.
        """
        if self._model_is(PatientSubrecord):
            patient_ids = our_qs.values_list('patient_id', flat=True)
            return Episode.objects.filter(patient_id__in=patient_ids)
        if self._model_is(EpisodeSubrecord):
            episode_ids = our_qs.values_list('episode_id', flat=True)
            return Episode.objects.filter(id__in=episode_ids)
        raise ValueError('Please implement a to_episode_qs method')

    def search(self, value, episode_qs, modifier="", user=None):
        """Filter this rule's queryset by ``value``.

        A ``search_<field>`` method on the rule takes precedence and is
        called with ``(value, queryset, modifier, user)``. Otherwise the
        field is filtered directly, with the lookup suffix for
        ``modifier`` taken from ``MODIFIERS``; an empty modifier means no
        suffix. An unknown non-empty modifier raises ``KeyError``.

        Returns ``None`` when the rule has no ``search_fields``.
        """
        if not self.search_fields:
            return None
        queryset = self.get_qs(episode_qs)
        # NOTE(review): there is no field argument, so only the first
        # entry of ``search_fields`` is ever queried -- confirm the
        # intended API for choosing the field.
        field = self.search_fields[0]
        custom_search = getattr(self, "search_{}".format(field), None)
        if custom_search is not None:
            return custom_search(value, queryset, modifier, user)
        # TODO: default modifier for string should be __iexact
        # TODO: also fk or ft logic
        suffix = MODIFIERS[modifier] if modifier else ""
        query_args = {"{}{}".format(field, suffix): value}
        return queryset.filter(**query_args)

    def extract(self, episode_qs):
        """Return one dict per row of this rule's queryset.

        Each dict maps every name in ``extract_fields`` to the result of
        ``extract_<field>(instance, queryset)`` when the rule defines that
        method, and to the instance attribute of that name otherwise.
        """
        queryset = self.get_qs(episode_qs)
        rows = []
        for instance in queryset:
            row = {}
            for field in self.extract_fields:
                custom_extract = getattr(
                    self, "extract_{}".format(field), None
                )
                if custom_extract is not None:
                    row[field] = custom_extract(instance, queryset)
                else:
                    row[field] = getattr(instance, field)
            rows.append(row)
        return rows
class EpisodeSearchRule(SearchRule):
    """Search rule for the Episode model itself.

    Episodes are already what the search operates on, so both queryset
    conversions simply hand back the queryset they were given.
    """

    model = Episode
    slug = Episode.get_api_name()

    search_fields = ["start", "end"]
    extract_fields = ["start", "end"]

    # Schema overrides for the fields listed above.
    start = SchemaField("start", "datetime", "Episode Start")
    end = SchemaField("end", "datetime", "Episode End")

    def to_episode_qs(self, our_qs):
        """Return ``our_qs`` unchanged; it is already an episode queryset."""
        return our_qs

    def get_qs(self, episode_qs):
        """Return ``episode_qs`` unchanged; no related-model lookup needed."""
        return episode_qs
# Strong references to the generated rule classes. Each loop iteration
# rebinds the name ``SubrecordSearchRule``, and Python only keeps weak
# references from a class to its subclasses, so without this list the
# earlier classes could be garbage collected.
# NOTE(review): presumably rule discovery walks SearchRule subclasses --
# confirm against DiscoverableFeature.
_SUBRECORD_SEARCH_RULES = []

# ``subrecords`` is imported above as a module (``from opal.core import
# subrecords``), so the iterator of subrecord models is the function of
# the same name inside it.
for subrecord in subrecords.subrecords():
    class SubrecordSearchRule(SearchRule):
        """Default search rule generated for one subrecord model."""

        slug = subrecord.get_api_name()
        model = subrecord

    _SUBRECORD_SEARCH_RULES.append(SubrecordSearchRule)
def extract_schema():
    """Build the extract schema: one entry per discoverable search rule.

    Rules flagged with ``exclude_from_extract`` are left out. Each entry
    is a dict with the rule's slug as ``name``, a ``title`` (the rule's
    ``display_name``, falling back to the model's) and a ``fields`` list.
    The fields come from the rule's ``extract_fields``; when that is
    empty, from the model's ``_get_fieldnames_to_serialize()``. A
    ``SchemaField`` attribute on the rule named after a field overrides
    the schema the model would build for it.
    """
    schema = []
    for search_rule in SearchRule.list():
        if search_rule.exclude_from_extract:
            continue

        model = search_rule.model
        title = search_rule.display_name or model.display_name
        field_names = search_rule.extract_fields
        if not field_names:
            field_names = model._get_fieldnames_to_serialize()

        fields = []
        for field_name in field_names:
            override = getattr(search_rule, field_name, None)
            # Only a real SchemaField counts as an override; a field name
            # may coincide with an unrelated rule attribute such as
            # ``slug`` or ``display_name``.
            if isinstance(override, SchemaField):
                fields.append(override.to_dict())
            else:
                fields.append(model.build_schema_for_field_name(field_name))

        schema.append({
            "name": search_rule.slug,
            "title": title,
            "fields": fields,
        })
    return schema
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment