Skip to content

Instantly share code, notes, and snippets.

@fredkingham
Created November 21, 2018 13:15
Show Gist options
  • Save fredkingham/62a7c39ddfff17fddeb644eb2ba72e0b to your computer and use it in GitHub Desktop.
Save fredkingham/62a7c39ddfff17fddeb644eb2ba72e0b to your computer and use it in GitHub Desktop.
import datetime
from lab import models
from opal.models import Patient
import csv
def get_demographics_dict(patient):
result = {}
demographics = patient.demographics_set.first()
result["name"] = "{} {}".format(demographics.first_name, demographics.surname)
result["hospital_number"] = demographics.hospital_number
dob = demographics.date_of_birth
if not dob:
result["date_of_birth"] = ""
else:
dtob = datetime.datetime.combine(dob, datetime.datetime.min.time())
result["date_of_birth"] = dtob.strftime('%d/%m/%Y')
result["age"] = (datetime.datetime.now() - dtob).days/365
return result
def get_lab_test_dict(patient):
result = {}
organisms = models.Organism.objects.filter(observation_type="Organism")
organisms = organisms.filter(
lab_test__patient=patient
).order_by("result").order_by("lab_test__datetime_ordered")
for i, organism in enumerate(organisms):
lab_test_number = "lab_test_{}_number".format(i)
lab_test_ordered = "lab_test_{}_ordered".format(i)
lab_test_organism = "lab_test_{}_organism".format(i)
lab_test = organism.lab_test
if "lab_number" in lab_test.extras and lab_test.extras["lab_number"]:
result[lab_test_number] = lab_test.extras["lab_number"]
else:
result[lab_test_number] = ""
if lab_test.datetime_ordered:
result[lab_test_ordered] = lab_test.datetime_ordered.strftime('%d/%m/%Y')
else:
result[lab_test_ordered] = ""
result[lab_test_organism] = organism.result
return result
observations = models.Observation.objects.filter(result__icontains="au").filter(observation_type="Organism")
def get_ward_dict(patient):
return dict(ward=patient.episode_set.last().location_set.first().ward)
def get_result_dicts(patients):
result = []
for patient in patients:
result_dict = {}
result_dict.update(get_demographics_dict(patient))
result_dict.update(get_ward_dict(patient))
result_dict.update(get_lab_test_dict(patient))
result.append(result_dict)
return result
def get_keys(results):
keys = list()
for result in results:
keys.extend(result.keys())
keys = list(set(keys))
keys = sorted(keys)
keys.remove("ward")
keys.remove("name")
keys.insert(2, "ward")
keys.insert(0, "name")
for k in keys:
if "10" in k:
keys.remove(k)
keys.append(k)
return keys
def write_file(file_name, results, keys):
with open(file_name, "w+") as e:
dict_writer = csv.DictWriter(e, fieldnames=keys)
dict_writer.writeheader()
for row in results:
dict_writer.writerow(row)
def get_patients(org_parital):
observations = observations = models.Observation.objects.filter(result__icontains=org_parital).filter(observation_type="Organism")
return Patient.objects.filter(labtest__observations__in=observations).distinct()
def get_csv(file_name, org_partial):
patients = get_patients(org_partial)
results_dicts = get_result_dicts(patients)
keys = get_keys(results_dicts)
write_file(file_name, results_dicts, keys)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment