Skip to content

Instantly share code, notes, and snippets.

@fredkingham
Last active June 29, 2016 13:44
Show Gist options
  • Save fredkingham/a49e47b0ce9538a9cb08874468bf762c to your computer and use it in GitHub Desktop.
Save fredkingham/a49e47b0ce9538a9cb08874468bf762c to your computer and use it in GitHub Desktop.
from collections import defaultdict
from opal.models import Symptom, Synonym, Tagging, Team
from opal.core.lookuplists import LookupList
from opal.core.subrecords import subrecords
from elcid.models import PresentingComplaint
from walkin.models import Symptom as WalkinSymptom
from django.contrib.contenttypes.models import ContentType
from django.db import transaction
from opal.utils import _itersubclasses
COMMIT = True
def get_root_symptom(some_symptom):
ct = ContentType.objects.get_for_model(Symptom)
name = some_symptom.name
return Symptom.objects.filter(
synonyms__name=name, synonyms__content_type=ct
).get()
def clean_existing_duplicates():
ct = ContentType.objects.get_for_model(Symptom)
synonym_names = Synonym.objects.filter(content_type=ct).values_list("name", flat=True)
synonym_symptoms = Symptom.objects.filter(name__in=synonym_names)
for parent_model in [WalkinSymptom, PresentingComplaint]:
parent_objects = parent_model.objects.filter(
symptoms__in=synonym_symptoms
)
for parent_object in parent_objects:
for symptom in parent_object.symptoms.all():
if symptom in synonym_symptoms:
root_symptom = get_root_symptom(symptom)
if COMMIT:
parent_object.symptoms.remove(symptom)
if root_symptom not in parent_object.symptoms.all():
parent_object.symptoms.add(root_symptom)
print "we are now free to delete"
for synonym_symptom in synonym_symptoms:
print synonym_symptom
if COMMIT:
synonym_symptom.delete()
def setUpTest():
abdo = Symptom.objects.create(name="Abdo Pain")
proper_abdo = Symptom.objects.filter(name="Abdominal Pain")
pc = PresentingComplaint.objects.last()
pc.symptoms.add(abdo)
ws = WalkinSymptom.objects.last()
ws.symptoms.add(abdo)
clean_existing_duplicates()
assert(not pc.symptoms.filter(id=abdo.id).exists())
assert(not ws.symptoms.filter(id=abdo.id).exists())
assert(pc.symptoms.filter(id=proper_abdo.id).exists())
assert(ws.symptoms.filter(id=proper_abdo.id).exists())
def cleanup_lookup_lists():
lookup_lists = _itersubclasses(LookupList)
for lookup_list in lookup_lists:
ct = ContentType.objects.get_for_model(lookup_list)
synonyms = Synonym.objects.filter(content_type=ct).values_list(
"name", flat=True
)
duplicates = lookup_list.objects.filter(name__in=synonyms)
for duplicate in duplicates:
print "found duplicate {}".format(duplicate.name)
duplicate.delete()
def spread_by_time(episodes):
result = defaultdict(int)
for i in episodes:
if i.end:
timestamp = "{0} - {1}".format(i.end.year, i.end.month)
result[timestamp] += 1
return result
@transaction.atomic()
def figure_out_flawed_lists():
""" get patients that have an archived = True parent tag
this should never be correct if there's a subtag
"""
all_tags = Tagging.objects.all()
parent_teams = Team.objects.values_list("parent_id", flat=True).distinct()
parent_teams = Team.objects.filter(id__in=parent_teams)
all_tags = all_tags.filter(team__in=parent_teams)
all_tags = all_tags.filter(archived=True)
all_episodes = set([])
locations = defaultdict(int)
problem_tag = defaultdict(int)
for all_tag in all_tags:
episode = all_tag.episode
for tag in episode.tagging_set.all():
if tag.team and tag.team.parent == all_tag.team:
if not tag.archived:
tag.archived = True
if COMMIT:
tag.save()
all_episodes.add(episode)
problem_tag[tag.team.name] += 1
locations[episode.location.category] += 1
if episode in all_episodes:
all_tag.archived = False
if COMMIT:
all_tag.save()
return all_episodes, problem_tag, location
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment