Last active
April 18, 2016 16:44
-
-
Save fredkingham/1da226d800c69fb652d9726c0035a8b4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
from datetime import datetime | |
fields = [] | |
primary_key = "" | |
csv_file_1 = "some_file_name.csv" | |
csv_file_2 = "some_file_name.csv" | |
class CsvEncoder(object): | |
def __init__(self, file_name, some_fun, field_names=None): | |
self.file_name = file_name | |
self.some_fun = some_fun | |
self.field_names = field_names | |
def get_dict_reader(self, some_csv): | |
if self.field_names: | |
return csv.DictReader(some_csv, fieldnames=self.field_names) | |
else: | |
return csv.DictReader(some_csv) | |
def set_from_function(self): | |
with open(self.file_name, "r") as some_csv: | |
reader = self.get_dict_reader(some_csv) | |
result = {self.some_fun(i) for i in reader} | |
return result | |
def rows_from_calculated(self, some_calculated): | |
result = list() | |
with open(self.file_name, "r") as some_csv: | |
reader = self.get_dict_reader(some_csv) | |
for i in reader: | |
calculated = self.some_fun(i) | |
if calculated in some_calculated: | |
result.append(i) | |
return result | |
def set_not_including(self, another_csv_encoder): | |
with open(self.file_name, "r") as some_csv: | |
reader = self.get_dict_reader(some_csv) | |
existing = another_csv_encoder.set_from_function() | |
result = set() | |
with open(self.file_name, "r") as some_csv: | |
reader = self.get_dict_reader(some_csv) | |
for i in reader: | |
calculated = self.some_fun(i) | |
if calculated not in existing: | |
result.add(calculated) | |
return result | |
def get_matches(): | |
# date_of_births | |
uch_nd_data = CsvEncoder( | |
"elcid.live.pmi.data.csv", | |
date_and_name_match, | |
field_names=["hospital_number", "date_of_birth", "name"] | |
) | |
elcid_nd_data = CsvEncoder( | |
"elcid.patients.csv", | |
date_and_name_match | |
) | |
missing_name_dob = elcid_nd_data.set_not_including(uch_nd_data) | |
missing_name_dob_rows = elcid_nd_data.rows_from_calculated(missing_name_dob) | |
for i in missing_name_dob_rows: | |
html = "we think {0} - {1}({2}) at {3} has the wrong name or date of birth" | |
html = html.format(i["hospital_number"], i["name"], to_formatted_date(i["date_of_birth"]), get_link(i["episode_id"])) | |
print html | |
class CsvDiff(object): | |
def __init__( | |
self, | |
file_a, | |
file_b, | |
file_a_field_names=None, | |
file_b_field_names=None | |
): | |
self.file_a = file_a | |
self.field_names_for_file = { | |
file_a: file_a_field_names, | |
file_b: file_b_field_names | |
} | |
self.file_a_field_names = file_a_field_names | |
self.file_b = file_b | |
self.file_b_field_names = get_matches | |
def by_fun(self, some_csv_file, some_fun): | |
with open(some_csv_file, "r") as some_csv: | |
field_names = self.field_names_for_file.get(some_csv_file) | |
reader = csv.DictReader(some_csv, fieldnames=field_names) | |
result = {some_fun(row) for row in reader} | |
return result | |
def get_from_funs(self, some_csv_file, some_rows, some_fun): | |
with open(some_csv_file, "r") as some_csv: | |
result = set() | |
field_names = self.field_names_for_file.get(some_csv_file) | |
reader = csv.DictReader(some_csv, fieldnames=field_names) | |
for row in reader: | |
r = some_fun(row) | |
if r in some_rows: | |
result.add(tuple(row.itervalues())) | |
return result | |
def in_one_not_in_the_other_for_fun(self, some_fun): | |
# having a lot of memory issues so we're going the slow | |
# way round to limit the amount we're putting in memory | |
# step 1. Create set of the computed function | |
# step 2. Create a set of the ones that are found | |
# step 3. the output of step 1 - step 2 are what are missing | |
# things to note, we don't just pop out from the set | |
# because the function is not necessarily unique | |
# e.g. name, dob which would give us fale positives | |
some_file = self.file_a | |
some_other_file = self.file_b | |
other = self.by_fun(some_other_file, some_fun) | |
field_names = self.field_names_for_file.get(some_file) | |
found = set() | |
with open(some_file) as some_csv: | |
reader = csv.DictReader(some_csv, fieldnames=field_names) | |
for our_row in reader: | |
calculated = some_fun(our_row) | |
if calculated in other: | |
found.add(tuple(our_row.itervalues())) | |
return other - found | |
def in_both(self, some_fun): | |
some_file = self.file_a | |
some_other_file = self.file_b | |
other = self.by_fun(some_other_file, some_fun) | |
field_names = self.field_names_for_file.get(some_file) | |
found = set() | |
with open(some_file) as some_csv: | |
reader = csv.DictReader(some_csv, fieldnames=field_names) | |
for our_row in reader: | |
calculated = some_fun(our_row) | |
if some_fun(our_row) in other: | |
found.add(calculated) | |
return found | |
def to_date(dob): | |
if dob: | |
dob = dob.strip() | |
if not dob == "NULL": | |
try: | |
return datetime.strptime(dob[:10], "%Y-%m-%d").date() | |
except: | |
pass | |
def date_and_name_match(row): | |
return (to_date(row["date_of_birth"]), row["name"].strip(),) | |
def hospital_number_match(row): | |
return row["hospital_number"].strip() | |
# def get_matches(): | |
# csv_diff_1 = CsvDiff( | |
# "elcid.live.pmi.data.csv", | |
# "elcid.patients.csv", | |
# file_a_field_names=["hospital_number", "date_of_birth", "name"] | |
# ) | |
# # gives us all the names and dobs that can't be found | |
# missing_name_dob = csv_diff_1.in_one_not_in_the_other_for_fun(date_and_name_match) | |
# | |
# # gives us all the hospital numbers that can be found | |
# hospital_number_matches = csv_diff_1.in_both(hospital_number_match) | |
# import ipdb; ipdb.set_trace() | |
# | |
# # gives us everything that has a correct hospital number but not a matching name dob | |
# missing = missing_name_dob - hospital_number_matches | |
# wrong = missing_name_dob.intersection(hospital_number_matches) | |
# | |
# return dict( | |
# wrong=wrong, missing=missing | |
# ) | |
# | |
# # csv_diff_2 = CsvDiff( | |
# # "elcid.live.pmi.data.csv", | |
# # "elcid.patients.csv", | |
# # hospital_number_match, | |
# # file_a_field_names=["hospital_number", "date_of_birth", "name"] | |
# # ) | |
# # return ( | |
# # csv_diff_1.in_one_not_in_the_other_for_fun(), | |
# # csv_diff_2.in_one_not_in_the_other_for_fun(), | |
# # ) | |
def combined_matches(): | |
missing_hospital_numbers, wrong_names = get_matches() | |
# print "missing hospital numbers" | |
# print missing_hospital_numbers | |
# print "wrong names" | |
# print wrong_names | |
# return missing_hospital_numbers, wrong_names | |
def look_for_duplicates(some_csv_file, some_fun, field_names): | |
from collections import defaultdict | |
hospital_numbers = set() | |
result = defaultdict(list) | |
with open(some_csv_file) as some_csv: | |
reader = csv.DictReader(some_csv) | |
for row in reader: | |
if row not in result[some_fun(row)]: | |
if row["hospital_number"] not in hospital_numbers: | |
result[some_fun(row)].append(row) | |
hospital_numbers.add(row["hospital_number"]) | |
return (v for v in result.itervalues() if len(v) > 1) | |
def get_link(episode_id): | |
return '<a href="http://elcidl/#/episode/{0}">{0}</a>'.format(episode_id.strip()) | |
def html_wrapper(some_fun): | |
def wrapper(*args, **kwargs): | |
return "<html><head></head><body>{}</body></html>".format(some_fun(*args, **kwargs)) | |
return wrapper | |
def to_formatted_date(dob): | |
d = to_date(dob) | |
if d: | |
return d.strftime("%d/%m/%Y") | |
else: | |
return "" | |
@html_wrapper | |
def our_duplicates(): | |
dups = look_for_duplicates( | |
"elcid.patients.csv", date_and_name_match, field_names=[ | |
"hospital_number", "date_of_birth", "name" | |
] | |
) | |
def to_html(duplicates): | |
return """ | |
<p>we think there are duplicates for {0}({1}), ie {2}</p> | |
""".format( | |
duplicates[0]["name"], | |
to_formatted_date(duplicates[0]["date_of_birth"]), | |
", ".join(get_link(i["episode_id"]) for i in duplicates) | |
) | |
return "\n".join(to_html(v) for v in dups) | |
def duplicates_html(): | |
print our_duplicates() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment