Skip to content

Instantly share code, notes, and snippets.

@fredkingham
Created May 20, 2022 14:27
Show Gist options
  • Save fredkingham/0ea23080ec5547c63572d4f912fce388 to your computer and use it in GitHub Desktop.
Save fredkingham/0ea23080ec5547c63572d4f912fce388 to your computer and use it in GitHub Desktop.
import csv
import datetime
from opal.core import subrecords
from django.db.models import Count, Max
from opal.models import Patient
def subrecords_to_extract(subrecords):
    """Return the subrecord classes that belong in the extract.

    A class is left out when its ``_exclude_from_extract`` attribute is
    truthy; classes without that attribute are kept. The input order is
    preserved and a new list is always returned.
    """
    return [
        candidate
        for candidate in subrecords
        if not getattr(candidate, "_exclude_from_extract", None)
    ]
def get_csv_field_name(subrecord_class, field_name, idx, prefix=None):
    """Build the CSV column heading for one field of one subrecord.

    The heading is ``<display name>_<field title>`` for classes whose
    ``_is_singleton`` attribute is truthy, and
    ``<display name>_<idx + 1>_<field title>`` otherwise. The display name
    and field title are lower-cased with spaces turned into underscores.
    A truthy ``prefix`` is prepended, followed by an underscore.
    """
    def as_slug(text):
        return text.lower().replace(" ", "_")

    display_slug = as_slug(subrecord_class.get_display_name())
    title_slug = as_slug(subrecord_class._get_field_title(field_name))

    # Only non-singleton subrecords carry a (1-based) position marker.
    if getattr(subrecord_class, '_is_singleton', False):
        position = ""
    else:
        position = f"{idx + 1}_"

    lead = f"{prefix}_" if prefix else ""
    return f"{lead}{display_slug}_{position}{title_slug}"
def row_for_subrecords(subrecords, prefix):
    """Flatten subrecords of a single class into a dict of CSV columns.

    Args:
        subrecords: a sequence of subrecord instances; the class of the
            first element is used to decide which fields are extracted.
        prefix: passed through to ``get_csv_field_name``; when truthy it is
            prepended to every column name.

    Returns:
        A dict mapping CSV column name to the attribute value read from the
        subrecord. An empty sequence yields an empty dict.
    """
    # Guard against empty input: indexing [0] below would raise IndexError.
    if not subrecords:
        return {}

    subrecord_class = subrecords[0].__class__

    # Bookkeeping columns that are not written to the extract.
    skipped = ('id', 'episode_id', 'patient_id', 'consistency_token')
    fields = [
        name
        for name in subrecord_class._get_fieldnames_to_extract()
        if name not in skipped
    ]

    result = {}
    for idx, subrecord in enumerate(subrecords):
        for field_name in fields:
            csv_field_name = get_csv_field_name(
                subrecord_class, field_name, idx, prefix
            )
            result[csv_field_name] = getattr(subrecord, field_name)
    return result
def get_data(patient_qs, episode_count):
    """Build one flat dict per patient containing every extracted column.

    Args:
        patient_qs: iterable of patients; each must expose ``id``,
            ``episode_set`` and a ``<subrecord name>_set`` manager for every
            patient subrecord class.
        episode_count: when greater than 1, episode subrecord columns are
            prefixed with ``episode_<n>`` (``n`` is the 0-based position of
            the episode within ``patient.episode_set.all()``). Otherwise no
            prefix is used, so columns from several episodes of the same
            patient share names and later episodes overwrite earlier ones.

    Returns:
        A list of dicts, one per patient, each starting with ``patient_id``.
    """
    # The class lists do not depend on the patient or episode, so resolve
    # them once. subrecords_to_extract already drops excluded classes, so
    # no further _exclude_from_extract check is needed inside the loops.
    patient_subrecord_classes = subrecords_to_extract(
        subrecords.patient_subrecords()
    )
    episode_subrecord_classes = subrecords_to_extract(
        subrecords.episode_subrecords()
    )

    rows = []
    for patient in patient_qs:
        row = {"patient_id": patient.id}

        for subrecord_cls in patient_subrecord_classes:
            sub_name = subrecord_cls.__name__.lower()
            patient_subrecords = list(
                getattr(patient, f"{sub_name}_set").all()
            )
            if patient_subrecords:
                row.update(
                    row_for_subrecords(patient_subrecords, prefix=None)
                )

        for idx, episode in enumerate(patient.episode_set.all()):
            # The prefix only depends on the episode, not on the subrecord
            # class, so compute it once per episode.
            prefix = f"episode_{idx}" if episode_count > 1 else ""
            for episode_subrecord_cls in episode_subrecord_classes:
                episode_sub_name = episode_subrecord_cls.__name__.lower()
                episode_subrecords = list(
                    getattr(episode, f"{episode_sub_name}_set").all()
                )
                if episode_subrecords:
                    row.update(
                        row_for_subrecords(episode_subrecords, prefix=prefix)
                    )

        rows.append(row)
    return rows
def field_ordering(x):
    """Return the sort key used to order CSV column names.

    ``patient_id`` sorts first, then first name, surname and date of birth
    from demographics (in that order), then the remaining ``demographics_``
    columns alphabetically, then everything else alphabetically.
    """
    # Columns pinned to a fixed position get a bare rank as their key.
    pinned = {
        'demographics_first_name': "1",
        'demographics_surname': "2",
        'demographics_date_of_birth': "3",
    }
    if x in pinned:
        return pinned[x]

    # Everything else is ranked by bucket, then by its own name.
    if x == "patient_id":
        bucket = "0"
    elif x.startswith('demographics_'):
        bucket = "4"
    else:
        bucket = "5"
    return f"{bucket}_{x}"
def get_csv_field_names(data):
field_names = set()
for row in data:
for key in row.keys():
field_names.add(key)
field_names = sorted(list(field_names), key=field_ordering)
return field_names
def write_csv(data):
    """Write ``data`` to ``extract_<day>_<month>_<year>.csv`` in the cwd.

    Args:
        data: a list of dicts (it is iterated twice). The header is the
            union of all keys as ordered by ``get_csv_field_names``; a row
            that lacks a column gets an empty string for it.

    Prints the name of the file once it has been written.
    """
    field_names = get_csv_field_names(data)
    today = datetime.date.today()
    file_name = f'extract_{today.day}_{today.month}_{today.year}.csv'
    # newline='' is required by the csv module so it controls line endings
    # itself (otherwise Windows output gets a blank line after every row).
    # The encoding is pinned so the output does not depend on the locale.
    with open(file_name, 'w', newline='', encoding='utf-8') as f:
        # restval fills in columns that a given row does not have.
        writer = csv.DictWriter(f, fieldnames=field_names, restval='')
        writer.writeheader()
        writer.writerows(data)
    print(f'written to {file_name}')
def write_flat_patient_extract(patient_qs):
    """Write a one-row-per-patient CSV extract for ``patient_qs``.

    The largest number of episodes held by any patient decides whether
    episode columns are prefixed with the episode position (see
    ``get_data``). Related subrecords are prefetched before the data is
    flattened and handed to ``write_csv``.

    Args:
        patient_qs: a queryset of patients (``prefetch_related`` is called
            on it).
    """
    # Use the computed maximum rather than a hard-coded 1: with a fixed 1,
    # columns from different episodes of the same patient share names and
    # overwrite each other. The aggregate is None when there are no
    # patients, so fall back to 0.
    episode_count = Patient.objects.annotate(
        count=Count('episode')
    ).aggregate(max_count=Max('count'))['max_count'] or 0

    # Prefetch everything get_data reads through related managers, so the
    # per-patient and per-episode ``.all()`` calls do not each hit the
    # database.
    lookups = ['episode_set']
    for subrecord_cls in subrecords_to_extract(subrecords.patient_subrecords()):
        sub_name = subrecord_cls.__name__.lower()
        lookups.append(f'{sub_name}_set')
    for subrecord_cls in subrecords_to_extract(subrecords.episode_subrecords()):
        sub_name = subrecord_cls.__name__.lower()
        lookups.append(f'episode_set__{sub_name}_set')
    patient_qs = patient_qs.prefetch_related(*lookups)

    data = get_data(patient_qs, episode_count)
    write_csv(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment