Created
May 20, 2022 14:27
-
-
Save fredkingham/0ea23080ec5547c63572d4f912fce388 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import datetime | |
from opal.core import subrecords | |
from django.db.models import Count, Max | |
from opal.models import Patient | |
def subrecords_to_extract(subrecords): | |
result = [] | |
for subrecord in subrecords: | |
if not getattr(subrecord, "_exclude_from_extract", None): | |
result.append(subrecord) | |
return result | |
def get_csv_field_name(subrecord_class, field_name, idx, prefix=None): | |
display_name = subrecord_class.get_display_name().lower().replace(" ", "_") | |
field_name = subrecord_class._get_field_title(field_name).lower().replace(" ", "_") | |
if getattr(subrecord_class, '_is_singleton', False): | |
csv_field_name = f"{display_name}_{field_name}" | |
else: | |
csv_field_name = f"{display_name}_{idx + 1}_{field_name}" | |
if prefix: | |
csv_field_name = f"{prefix}_{csv_field_name}" | |
return csv_field_name | |
def row_for_subrecords(subrecords, prefix): | |
subrecord_class = subrecords[0].__class__ | |
fields = subrecord_class._get_fieldnames_to_extract() | |
fields = [ | |
i for i in fields if i not in ( | |
'id', 'episode_id', 'patient_id', 'consistency_token' | |
) | |
] | |
result = {} | |
for idx, subrecord in enumerate(subrecords): | |
for field_name in fields: | |
csv_field_name = get_csv_field_name( | |
subrecord_class, field_name, idx, prefix | |
) | |
result[csv_field_name] = getattr(subrecord, field_name) | |
return result | |
def get_data(patient_qs, episode_count): | |
rows = [] | |
for patient in patient_qs: | |
row = {"patient_id": patient.id} | |
for subrecord_cls in subrecords_to_extract(subrecords.patient_subrecords()): | |
if getattr(subrecord_cls, "_exclude_from_extract", None): | |
continue | |
sub_name = subrecord_cls.__name__.lower() | |
patient_subrecords = list(getattr(patient, f"{sub_name}_set").all()) | |
if patient_subrecords: | |
row.update( | |
row_for_subrecords( | |
patient_subrecords, | |
prefix=None | |
) | |
) | |
for idx, episode in enumerate(patient.episode_set.all()): | |
for episode_subrecord_cls in subrecords_to_extract(subrecords.episode_subrecords()): | |
episode_sub_name = episode_subrecord_cls.__name__.lower() | |
if episode_count > 1: | |
prefix = f"episode_{idx}" | |
else: | |
prefix = "" | |
episode_subrecords = list(getattr(episode, f"{episode_sub_name}_set").all()) | |
if episode_subrecords: | |
row.update(row_for_subrecords(episode_subrecords, prefix=prefix)) | |
rows.append(row) | |
return rows | |
def field_ordering(x): | |
if x == "patient_id": | |
return f"0_{x}" | |
if x == 'demographics_first_name': | |
return "1" | |
if x == 'demographics_surname': | |
return "2" | |
if x == 'demographics_date_of_birth': | |
return "3" | |
if x.startswith('demographics_'): | |
return f"4_{x}" | |
return f"5_{x}" | |
def get_csv_field_names(data): | |
field_names = set() | |
for row in data: | |
for key in row.keys(): | |
field_names.add(key) | |
field_names = sorted(list(field_names), key=field_ordering) | |
return field_names | |
def write_csv(data): | |
field_names = get_csv_field_names(data) | |
today = datetime.date.today() | |
file_name = f'extract_{today.day}_{today.month}_{today.year}.csv' | |
with open(file_name, 'w') as f: | |
writer = csv.DictWriter(f, fieldnames=field_names) | |
writer.writeheader() | |
row = {} | |
for d in data: | |
for field_name in field_names: | |
row[field_name] = d.get(field_name, '') | |
writer.writerow(row) | |
print(f'written to {file_name}') | |
def write_flat_patient_extract(patient_qs): | |
episode_count = Patient.objects.annotate( | |
count=Count('episode') | |
).aggregate(max_count=Max('count'))['max_count'] | |
episode_count = 1 | |
for subrecord_cls in subrecords_to_extract(subrecords.patient_subrecords()): | |
sub_name = subrecord_cls.__name__.lower() | |
patient_qs = patient_qs.prefetch_related( | |
f'{sub_name}_set' | |
) | |
data = get_data(patient_qs, episode_count) | |
write_csv(data) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment