Last active
June 29, 2016 13:44
-
-
Save fredkingham/a49e47b0ce9538a9cb08874468bf762c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
from opal.models import Symptom, Synonym, Tagging, Team | |
from opal.core.lookuplists import LookupList | |
from opal.core.subrecords import subrecords | |
from elcid.models import PresentingComplaint | |
from walkin.models import Symptom as WalkinSymptom | |
from django.contrib.contenttypes.models import ContentType | |
from django.db import transaction | |
from opal.utils import _itersubclasses | |
# Dry-run switch: functions in this module that consult this flag only write
# to the database (save/delete/remove/add) when it is truthy.
COMMIT = True
def get_root_symptom(some_symptom):
    """Return the Symptom that lists ``some_symptom``'s name as a synonym.

    Looks up the single Symptom row that has a synonym whose name equals
    ``some_symptom.name`` and whose content type is Symptom's. The final
    ``.get()`` raises if zero or several rows match.
    """
    symptom_type = ContentType.objects.get_for_model(Symptom)
    canonical_matches = Symptom.objects.filter(
        synonyms__name=some_symptom.name,
        synonyms__content_type=symptom_type,
    )
    return canonical_matches.get()
def clean_existing_duplicates():
    """Re-point references from synonym-named symptoms to their root symptom.

    A Symptom row whose name is also registered as a Symptom synonym is
    treated as a duplicate. For every WalkinSymptom and PresentingComplaint
    that references such a row, the row is removed from ``symptoms`` and the
    Symptom that owns the synonym is added if it is not already present.
    Each duplicate row is then printed and deleted. Database writes only
    happen when ``COMMIT`` is truthy.
    """
    symptom_type = ContentType.objects.get_for_model(Symptom)
    alias_names = Synonym.objects.filter(
        content_type=symptom_type
    ).values_list("name", flat=True)
    duplicates = Symptom.objects.filter(name__in=alias_names)

    for owner_model in (WalkinSymptom, PresentingComplaint):
        owners = owner_model.objects.filter(symptoms__in=duplicates)
        for owner in owners:
            for linked in owner.symptoms.all():
                if linked not in duplicates:
                    continue
                # Resolved even on a dry run, so a missing or ambiguous
                # root symptom still surfaces as an error.
                canonical = get_root_symptom(linked)
                if not COMMIT:
                    continue
                owner.symptoms.remove(linked)
                if canonical not in owner.symptoms.all():
                    owner.symptoms.add(canonical)

    print("we are now free to delete")
    for duplicate in duplicates:
        print(duplicate)
        if COMMIT:
            duplicate.delete()
def setUpTest():
    """Smoke-test ``clean_existing_duplicates`` against existing data.

    Creates an "Abdo Pain" symptom, attaches it to the last
    PresentingComplaint and the last WalkinSymptom, runs the clean-up and
    asserts that both records now reference "Abdominal Pain" instead.

    NOTE(review): presumably relies on "Abdo Pain" already being registered
    as a synonym of "Abdominal Pain", and on ``COMMIT`` being truthy --
    confirm before running.
    """
    abdo = Symptom.objects.create(name="Abdo Pain")
    # .get() rather than .filter(): the assertions below need a model
    # instance with an ``id``; a QuerySet has no such attribute.
    proper_abdo = Symptom.objects.get(name="Abdominal Pain")

    pc = PresentingComplaint.objects.last()
    pc.symptoms.add(abdo)
    ws = WalkinSymptom.objects.last()
    ws.symptoms.add(abdo)

    clean_existing_duplicates()

    # The duplicate must be gone from both records ...
    assert not pc.symptoms.filter(id=abdo.id).exists()
    assert not ws.symptoms.filter(id=abdo.id).exists()
    # ... and replaced by the canonical symptom.
    assert pc.symptoms.filter(id=proper_abdo.id).exists()
    assert ws.symptoms.filter(id=proper_abdo.id).exists()
def cleanup_lookup_lists():
    """Delete lookup-list entries whose name is also one of that list's synonyms.

    For every LookupList subclass, finds rows whose ``name`` matches a
    Synonym registered for that subclass's content type and prints each one.
    The row is deleted only when ``COMMIT`` is truthy, matching the other
    write paths in this module, so a dry run just reports what it found.
    """
    for lookup_list in _itersubclasses(LookupList):
        ct = ContentType.objects.get_for_model(lookup_list)
        synonym_names = Synonym.objects.filter(content_type=ct).values_list(
            "name", flat=True
        )
        for duplicate in lookup_list.objects.filter(name__in=synonym_names):
            print("found duplicate {}".format(duplicate.name))
            if COMMIT:
                duplicate.delete()
def spread_by_time(episodes):
    """Count episodes per end month.

    Episodes with a falsy ``end`` are ignored. The result is a
    ``defaultdict(int)`` keyed by ``"<year> - <month>"`` (month not
    zero-padded) whose values are the number of episodes ending in that month.
    """
    counts = defaultdict(int)
    for episode in episodes:
        ended = episode.end
        if not ended:
            continue
        bucket = "{0} - {1}".format(ended.year, ended.month)
        counts[bucket] += 1
    return counts
@transaction.atomic()
def figure_out_flawed_lists():
    """Find and repair episodes whose archived parent tag has a live subtag.

    Get patients that have an archived = True parent tag; this should never
    be correct if there's a subtag.

    For each archived Tagging on a team that is some other team's parent,
    every unarchived Tagging on the same episode whose team is a child of
    that parent is archived, and the parent Tagging is un-archived. Changes
    are only saved when ``COMMIT`` is truthy.

    Returns:
        A tuple ``(all_episodes, problem_tag, locations)``: the set of
        affected episodes, a count of offending subtags keyed by team name,
        and a count keyed by ``episode.location.category``.
    """
    # NOTE(review): block nesting below was reconstructed from a copy whose
    # indentation had been lost -- confirm against the original script.
    all_tags = Tagging.objects.all()
    parent_teams = Team.objects.values_list("parent_id", flat=True).distinct()
    parent_teams = Team.objects.filter(id__in=parent_teams)
    all_tags = all_tags.filter(team__in=parent_teams)
    all_tags = all_tags.filter(archived=True)

    all_episodes = set()
    locations = defaultdict(int)
    problem_tag = defaultdict(int)

    for all_tag in all_tags:
        episode = all_tag.episode

        for tag in episode.tagging_set.all():
            if tag.team and tag.team.parent == all_tag.team:
                if not tag.archived:
                    tag.archived = True

                    if COMMIT:
                        tag.save()

                    all_episodes.add(episode)
                    problem_tag[tag.team.name] += 1
                    locations[episode.location.category] += 1

        # Membership in the set (not a per-iteration flag) means an episode
        # flagged via an earlier parent tag also triggers this branch.
        if episode in all_episodes:
            all_tag.archived = False

            if COMMIT:
                all_tag.save()

    # Was ``location`` (undefined), which raised NameError on return.
    return all_episodes, problem_tag, locations
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment