Last active
August 20, 2020 13:14
-
-
Save fredkingham/2f6e45bbce34ba7ab8db4124c1e4300b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
from django.utils import timezone | |
from opal.models import Patient, Episode | |
from opal.core.serialization import _temporal_thing_to_string | |
from plugins.imaging.models import Imaging | |
from plugins.labtests.models import Observation | |
from plugins.covid.episode_categories import CovidEpisode | |
from plugins.covid.models import CovidFollowUpCall | |
from plugins.covid.lab import COVID_19_TESTS | |
import datetime | |
# Destinations of the three CSV exports, relative to the current
# working directory of whatever process runs the export.
PATIENT_CSV = "./patients.csv"
IMAGING_CSV = "./imaging.csv"
LAB_TEST_CSV = "./lab_test.csv"
class EmptyModel:
    """
    Null-object stand-in for a subrecord that does not exist.

    Admissions, follow ups and similar records are optional. When one
    is missing, an instance of this class is used in its place: reading
    any attribute yields None and the instance itself is falsy.
    """

    def __bool__(self):
        # Lets callers write `if not record:` / `bool(record)`.
        return False

    def __getattribute__(self, field):
        # Unconditional: every attribute lookup, whatever the name, is None.
        return None
def patient_to_row(patient):
    """
    Flatten a patient and their covid subrecords into one CSV row.

    Looks up the patient's demographics, their covid episode (if any)
    with its comorbidities, admission and follow up call, and their
    covid patient record (if any). Any of the optional records that is
    missing is replaced by an EmptyModel, so its columns come out as
    None.

    Returns a dict keyed by column name. Values that are booleans are
    converted to 0/1.

    Raises IndexError if the patient has no demographics row, or has a
    covid episode without a comorbidities row.
    """
    demographics = patient.demographics_set.all()[0]
    episode = patient.episode_set.filter(
        category_name=CovidEpisode.display_name
    ).first()

    covid_comorbidities = EmptyModel()
    admission = EmptyModel()
    follow_up = EmptyModel()
    covid_patient = EmptyModel()

    if episode:
        covid_comorbidities = episode.covidcomorbidities_set.all()[0]

        # An episode can carry several admissions; just take whichever
        # one first() yields. first() returns None when there are none,
        # so a single query covers both the lookup and the empty case.
        admission = episode.covidadmission_set.first() or EmptyModel()

        # People seem to have multiple follow ups, this probably should
        # not happen. Prefer the first follow up that they were able to
        # complete, otherwise fall back to the last one recorded.
        completed_follow_up = episode.covidfollowupcall_set.exclude(
            follow_up_outcome__in=[
                CovidFollowUpCall.UNREACHABLE,
                CovidFollowUpCall.UNABLE_TO_COMPLETE,
            ]
        ).first()
        follow_up = (
            completed_follow_up
            or episode.covidfollowupcall_set.last()
            or EmptyModel()
        )

    if patient.covid_patient.exists():
        covid_patient = patient.covid_patient.get()

    # NOTE(review): a "postcode" column (from the patient's contact
    # information) was commented out of this export in the original;
    # presumably deliberate - confirm before reinstating it.
    episode_data = {
        "elcid_patient_id": patient.id,
        "date_of_birth": _temporal_thing_to_string(demographics.date_of_birth),
        "cerner_ethnicity": demographics.ethnicity,
        "follow_up_ethnicity": follow_up.ethnicity,
        "follow_up_ethnicity_code": follow_up.ethnicity_code,
        "weight": follow_up.weight,
        "height": follow_up.height,
        "duration_of_symptoms": admission.duration_of_symptoms,
        "admission": bool(admission.date_of_admission),
        "date_of_admission": _temporal_thing_to_string(admission.date_of_admission),
        "site": get_site(get_encounter(patient)),
        "admission_clinical_frailty": admission.clinical_frailty,
        "hypertension": covid_comorbidities.hypertension,
        "ace_inhibitor": covid_comorbidities.ace_inhibitor,
        "angiotension_blocker": covid_comorbidities.angiotension_blocker,
        "nsaid": covid_comorbidities.nsaid,
        "ihd": covid_comorbidities.ihd,
        "heart_failure": covid_comorbidities.heart_failure,
        "arrhythmia": covid_comorbidities.arrhythmia,
        "cerebrovascular_disease": covid_comorbidities.cerebrovascular_disease,
        "type_1_diabetes": covid_comorbidities.type_1_diabetes,
        "type_2_diabetes": covid_comorbidities.type_2_diabetes,
        "copd": covid_comorbidities.copd,
        "asthma": covid_comorbidities.asthma,
        "ild": covid_comorbidities.ild,
        "other_lung_condition": covid_comorbidities.other_lung_condition,
        "ckd": covid_comorbidities.ckd,
        "active_malignancy": covid_comorbidities.active_malignancy,
        "active_malignancy_on_immunosuppression": covid_comorbidities.active_malignancy_on_immunosuppression,
        "hiv": covid_comorbidities.hiv,
        "autoimmunne_with_immunosuppression": covid_comorbidities.autoimmunne_with_immunosuppression,
        "autoimmunne_without_immunosuppression": covid_comorbidities.autoimmunne_without_immunosuppression,
        "gord": covid_comorbidities.gord,
        "depression": covid_comorbidities.depression,
        "anxiety": covid_comorbidities.anxiety,
        "other_mental_health": covid_comorbidities.other_mental_health,
        "obesity": covid_comorbidities.obesity,
        "dyslipidaemia": covid_comorbidities.dyslipidaemia,
        "anaemia": covid_comorbidities.anaemia,
        "covid_pos_neg": bool(covid_patient),
        "covid_date_first_post": _temporal_thing_to_string(covid_patient.date_first_positive),
        "systolic_blood_pressure": admission.systolic_bp,
        "diastolic_blood_pressure": admission.diastolic_bp,
        "heart_rate": admission.heart_rate,
        "respiratory_rate": admission.respiratory_rate,
        "admission_sao2": admission.sao2,
        "admission_fio2": admission.fi02,
        "final_spo2": admission.final_spo2,
        "final_fio2": admission.final_fio2,
        "max_resp_support": admission.maximum_resp_support,
        "max_fio2_non_nc": admission.max_fio2_non_nc,
        "max_fio2_nc": admission.max_fio2_nc,
        "days_on_cpap": admission.days_on_cpap,
        "days_on_niv": admission.days_on_niv,
        "days_on_iv": admission.days_on_iv,
        "days_on_ippv": admission.days_on_ippv,
        "days_on_oxygen": admission.days_on_oxygen,
        "temperature": admission.temperature,
        "news2": admission.news2,
        "dead": demographics.death_indicator,
        "date_of_discharge": _temporal_thing_to_string(admission.date_of_discharge),
        "date_of_death": _temporal_thing_to_string(demographics.date_of_death),
    }

    # Booleans are exported as 0/1. Build a new dict (same key order)
    # instead of assigning into the dict that is being iterated.
    return {
        column: int(value) if isinstance(value, bool) else value
        for column, value in episode_data.items()
    }
def observation_to_row(observation):
    """
    Turn a lab test observation into a CSV row.

    Returns a dict combining the observation's own fields with the
    patient id, test name and order time of the test it belongs to.
    """
    lab_test = observation.test
    ordered = _temporal_thing_to_string(lab_test.datetime_ordered)
    return {
        "elcid_observation_id": observation.id,
        "elcid_patient_id": lab_test.patient_id,
        "test_name": lab_test.test_name,
        "datetime_ordered": ordered,
        "observation_name": observation.observation_name,
        "reference_range": observation.reference_range,
        "observation_value": observation.observation_value,
        "observation_unit": observation.units,
    }
def imaging_to_row(imaging):
    """
    Turn an imaging record into a CSV row.

    The covid code comes from the first covid report code attached to
    the imaging record; when there is none the column is None.
    """
    report_codes = imaging.covidreportcode_set.all()
    covid_report = report_codes[0] if report_codes else EmptyModel()
    reported = _temporal_thing_to_string(imaging.date_reported)
    return {
        "elcid_imaging_id": imaging.id,
        "elcid_patient_id": imaging.patient_id,
        "date_reported": reported,
        "name": imaging.result_description,
        "report": imaging.obx_result,
        "covid_code": covid_report.covid_code,
    }
def get_imaging_qs(patient_qs):
    """
    Return the imaging records of the given patients, with their
    covid report codes prefetched.
    """
    for_patients = Imaging.objects.filter(patient__in=patient_qs)
    return for_patients.prefetch_related('covidreportcode_set')
def get_site(encounter):
    """
    Return the name of the site an encounter took place at.

    Known building codes are translated to a site name; for any other
    building the encounter's hospital field is returned as is.
    Returns None when there is no encounter.
    """
    if encounter:
        building_to_site = {
            "RFH": "Royal Free Hospital",
            "BH": "Barnett Hospital",
            "CFH": "Chase farm hospital",
            "New CFH": "Chase farm hospital",
            "ED": "Edgware Community Hospital",
            "FM": "Finchley Memorial Hospital",
            "WH": "Whittington Hospital",
        }
        building = encounter.pv1_3_building
        fallback = encounter.pv1_3_hospital
        return building_to_site.get(building, fallback)
    return None
def date_to_datetime(date, hours=0, minutes=0):
    """
    Return a timezone aware datetime on the given date at
    hours:minutes (midnight by default).
    """
    naive = datetime.datetime(
        date.year, date.month, date.day, hours, minutes
    )
    return timezone.make_aware(naive)
def get_encounter(patient):
    """
    Return the encounter to report for a patient.

    An encounter matched on the covid positive date wins; otherwise
    the one matched on the manually entered admission date is used
    (which may itself be None).
    """
    return (
        get_encounter_from_covid_positive(patient)
        or get_encounter_from_admission_date(patient)
    )
def get_encounter_from_covid_positive(patient):
    """
    Find the encounter during which the patient first tested positive.

    An inpatient encounter whose admit/discharge dates span the date
    of the first positive is returned in preference. Failing that, an
    emergency encounter admitted on that same date is returned.

    Returns None if the patient has no covid patient record or no
    encounter matches.
    """
    covid_patient = patient.covid_patient.first()
    if not covid_patient:
        return
    first_positive = covid_patient.date_first_positive

    inpatient_stays = patient.encounters.filter(
        pv1_18_visit_patient_type_product="INPATIENT"
    )
    for stay in inpatient_stays:
        admitted = stay.pv1_44_admit_date_time
        discharged = stay.pv1_45_discharge_date_time
        # Stays missing either end of the date range cannot be matched.
        if not (admitted and discharged):
            continue
        admit_date = admitted.date()
        discharge_date = discharged.date()
        if admit_date <= first_positive <= discharge_date:
            return stay

    emergency_visits = patient.encounters.filter(
        pv1_18_visit_patient_type_product="EMERGENCY"
    )
    for visit in emergency_visits:
        arrived = visit.pv1_44_admit_date_time
        if arrived and arrived.date() == first_positive:
            return visit
def get_encounter_from_admission_date(patient):
    """
    Find the encounter closest to the manually entered admission date.

    The users manually enter a date of admission, this is often a bit
    out, so return the inpatient/emergency encounter that we hold that
    is nearest to the one the users entered.

    A patient often has lots of admissions. We do as best we can.

    The search runs in up to 5 steps, widening the date range around
    the entered date at each step. Within a step we take the earliest
    encounter admitted on or after the entered date; failing that we
    take the latest encounter admitted on or before it.

    Returns None if the patient has no covid episode, no admission
    with a date of admission, no inpatient/emergency encounters, or
    nothing falls within the widest range.
    """
    episode = patient.episode_set.filter(
        category_name=CovidEpisode.display_name
    ).first()
    if not episode:
        return
    covid_admission = episode.covidadmission_set.first()
    if not covid_admission or not covid_admission.date_of_admission:
        return

    encounters = patient.encounters.filter(
        pv1_18_visit_patient_type_product__in=[
            "INPATIENT", "EMERGENCY"
        ]
    )
    # exists() asks the database a yes/no question instead of loading
    # every encounter just to find out whether there are any.
    if not encounters.exists():
        return

    # These do not change between steps, so compute them once.
    date_of_admission = covid_admission.date_of_admission
    datetime_of_admission = date_to_datetime(date_of_admission, 0, 0)
    by_admit_time = encounters.order_by('pv1_44_admit_date_time')

    for i in range(5):
        # At step i the range runs from midnight i days before the
        # entered date to 23:58 i + 1 days after it.
        # NOTE(review): the forward side reaches one day further than
        # the backward side at every step - confirm this is intended.
        min_date = date_of_admission - datetime.timedelta(i)
        min_datetime = date_to_datetime(min_date, 0, 0)
        max_date = date_of_admission + datetime.timedelta(i + 1)
        max_datetime = date_to_datetime(max_date, 23, 58)

        # first()/last() return None on an empty queryset, so there is
        # no need for a separate exists() query beforehand.
        first_after = by_admit_time.filter(
            pv1_44_admit_date_time__gte=datetime_of_admission,
            pv1_44_admit_date_time__lte=max_datetime,
        ).first()
        if first_after:
            return first_after

        last_before = by_admit_time.filter(
            pv1_44_admit_date_time__gte=min_datetime,
            pv1_44_admit_date_time__lte=datetime_of_admission,
        ).last()
        if last_before:
            return last_before
    return
def write_observations(patient_qs):
    """
    Write the lab test observations of the given patients to
    LAB_TEST_CSV.

    Only a fixed list of (test name, observation name) pairs is
    exported, plus one pair for each entry in COVID_19_TESTS. One row
    is written per matching observation.

    If the database holds no observations at all nothing is written,
    because the column headers are derived from an existing
    observation.
    """
    test_to_observations = [
        ("FULL BLOOD COUNT", ["Lymphocytes", "Neutrophils", "Platelets"],),
        ("C REACTIVE PROTEIN", ["C Reactive Protein"]),
        ("IRON STUDIES (FER)", ["Ferritin"]),
        ("IRON STUDIES", ["Ferritin"]),
        ("D-DIMER", ["D-DIMER"]),
        ("CARDIAC TROPONIN T", ["Cardiac Troponin T"]),
        ("NT PRO-BNP", ["NT Pro-BNP"]),
        ("LIVER PROFILE", ["ALT", "AST", "Alkaline Phosphatase"]),
        ("UREA AND ELECTROLYTES", ["Creatinine"]),
        ("LACTATE DEHYDROGENASE", ["LD"]),
        ("LACTATE [WHOLE BLOOD]", ["Lactate [Whole Blood]"]),
    ]
    test_to_observations.extend(
        (test.TEST_NAME, [test.OBSERVATION_NAME],)
        for test in COVID_19_TESTS
    )

    # first() is None on an empty table; bail out rather than crash
    # inside observation_to_row.
    sample_observation = Observation.objects.first()
    if sample_observation is None:
        return
    headers = list(observation_to_row(sample_observation).keys())

    # newline="" is what the csv module asks for: the writer emits its
    # own line terminators and they must not be translated again.
    with open(LAB_TEST_CSV, "w", newline="") as lab_test_file:
        lab_test_csv = csv.DictWriter(lab_test_file, headers)
        lab_test_csv.writeheader()
        for test_name, observation_names in test_to_observations:
            for observation_name in observation_names:
                observation_qs = Observation.objects.filter(
                    test__test_name=test_name
                ).filter(
                    observation_name=observation_name
                ).filter(
                    test__patient__in=patient_qs
                ).select_related('test')
                for observation in observation_qs:
                    lab_test_csv.writerow(
                        observation_to_row(observation)
                    )
def make_dataset(patient_qs):
    """
    Write the patient, imaging and lab test CSVs for the given
    patients.

    The patient and imaging files are only written when there is at
    least one row for them; their column headers are taken from the
    first row.
    """
    # newline="" is what the csv module asks for: the writer emits its
    # own line terminators and they must not be translated again.
    if patient_qs.exists():
        with open(PATIENT_CSV, "w", newline="") as patient_file:
            patient_csv = csv.DictWriter(
                patient_file,
                patient_to_row(patient_qs[0]).keys()
            )
            patient_csv.writeheader()
            for patient in patient_qs:
                patient_csv.writerow(
                    patient_to_row(patient)
                )

    imaging_qs = get_imaging_qs(patient_qs)
    if imaging_qs.exists():
        with open(IMAGING_CSV, "w", newline="") as imaging_file:
            imaging_csv = csv.DictWriter(
                imaging_file, imaging_to_row(imaging_qs[0]).keys()
            )
            imaging_csv.writeheader()
            for imaging in imaging_qs:
                imaging_csv.writerow(imaging_to_row(imaging))

    write_observations(patient_qs)
def get_qs():
    """
    Return the patients to export: everyone with a covid patient
    record together with everyone who has a covid admission.
    """
    ids_with_covid_record = Patient.objects.exclude(
        covid_patient=None
    ).values_list('id', flat=True)
    ids_with_admission = Patient.objects.exclude(
        episode__covidadmission=None
    ).values_list('id', flat=True)
    # Union of the two id lists, with duplicates collapsed.
    wanted_ids = set(ids_with_covid_record) | set(ids_with_admission)
    return Patient.objects.filter(id__in=wanted_ids)
def all_covid_dataset():
    """
    Entry point: export the dataset for every patient get_qs selects.
    """
    patients = get_qs()
    make_dataset(patients)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment