Skip to content

Instantly share code, notes, and snippets.

@fredkingham
Last active April 18, 2016 16:44
Show Gist options
  • Save fredkingham/1da226d800c69fb652d9726c0035a8b4 to your computer and use it in GitHub Desktop.
Save fredkingham/1da226d800c69fb652d9726c0035a8b4 to your computer and use it in GitHub Desktop.
import csv
from datetime import datetime
# Module-level placeholders. Nothing in the visible part of this file reads them.
fields = []
primary_key = ""
# NOTE(review): both names hold the same placeholder file name — confirm the
# intended values before relying on them.
csv_file_1 = "some_file_name.csv"
csv_file_2 = "some_file_name.csv"
class CsvEncoder(object):
    """Read one CSV file and reduce each row to a comparison key.

    Args:
        file_name: path of the CSV file; it is reopened on every method call
            so nothing is cached between calls.
        some_fun: callable taking a row dict (as produced by
            ``csv.DictReader``) and returning a hashable key.
        field_names: optional column names for a file without a header row.
            When empty or ``None`` the first row of the file is the header.
    """

    def __init__(self, file_name, some_fun, field_names=None):
        self.file_name = file_name
        self.some_fun = some_fun
        self.field_names = field_names

    def get_dict_reader(self, some_csv):
        """Return a ``csv.DictReader`` over the open file ``some_csv``."""
        # ``or None`` keeps an empty list meaning "use the header row",
        # which is what DictReader does when fieldnames is None.
        return csv.DictReader(some_csv, fieldnames=self.field_names or None)

    def _iter_keyed_rows(self):
        """Yield ``(key, row)`` for every row, streaming from the file."""
        with open(self.file_name, "r") as some_csv:
            for row in self.get_dict_reader(some_csv):
                yield self.some_fun(row), row

    def set_from_function(self):
        """Return the set of keys computed from every row of the file."""
        return {key for key, _ in self._iter_keyed_rows()}

    def rows_from_calculated(self, some_calculated):
        """Return, in file order, the rows whose key is in ``some_calculated``."""
        return [
            row for key, row in self._iter_keyed_rows()
            if key in some_calculated
        ]

    def set_not_including(self, another_csv_encoder):
        """Return this file's keys that ``another_csv_encoder`` does not produce.

        Only the other file's key set and the resulting difference are held
        in memory; this file is streamed once.
        """
        existing = another_csv_encoder.set_from_function()
        return {
            key for key, _ in self._iter_keyed_rows()
            if key not in existing
        }
def get_matches():
    """Print one line per row of ``elcid.patients.csv`` that has no match.

    A row "matches" when its ``date_and_name_match`` key also occurs in
    ``elcid.live.pmi.data.csv`` (read without a header, using the columns
    hospital_number, date_of_birth, name). Nothing is returned.
    """
    key_fun = date_and_name_match

    # date_of_births
    reference = CsvEncoder(
        "elcid.live.pmi.data.csv",
        key_fun,
        field_names=["hospital_number", "date_of_birth", "name"]
    )
    patients = CsvEncoder("elcid.patients.csv", key_fun)

    unmatched_keys = patients.set_not_including(reference)
    unmatched_rows = patients.rows_from_calculated(unmatched_keys)

    template = "we think {0} - {1}({2}) at {3} has the wrong name or date of birth"
    for row in unmatched_rows:
        line = template.format(
            row["hospital_number"],
            row["name"],
            to_formatted_date(row["date_of_birth"]),
            get_link(row["episode_id"]),
        )
        print(line)
class CsvDiff(object):
    """Compare two CSV files through a key function applied to each row.

    Args:
        file_a: path of the first CSV file.
        file_b: path of the second CSV file.
        file_a_field_names: optional column names for ``file_a`` when it has
            no header row; ``None`` means the first row is the header.
        file_b_field_names: same, for ``file_b``.
    """

    def __init__(
        self,
        file_a,
        file_b,
        file_a_field_names=None,
        file_b_field_names=None
    ):
        self.file_a = file_a
        self.file_b = file_b
        self.file_a_field_names = file_a_field_names
        self.file_b_field_names = file_b_field_names
        # Path -> column names lookup used by every reader below. If both
        # paths are the same string, the file_b entry wins.
        self.field_names_for_file = {
            file_a: file_a_field_names,
            file_b: file_b_field_names
        }

    def _rows(self, some_csv_file):
        """Yield each row of ``some_csv_file`` as a dict, one at a time."""
        field_names = self.field_names_for_file.get(some_csv_file)
        with open(some_csv_file, "r") as some_csv:
            for row in csv.DictReader(some_csv, fieldnames=field_names):
                yield row

    def by_fun(self, some_csv_file, some_fun):
        """Return the set of ``some_fun(row)`` for every row of the file."""
        return {some_fun(row) for row in self._rows(some_csv_file)}

    def get_from_funs(self, some_csv_file, some_rows, some_fun):
        """Return the rows whose key is in ``some_rows``.

        Each matching row is returned as a tuple of its values so it can be
        stored in a set.
        """
        return {
            tuple(row.values())
            for row in self._rows(some_csv_file)
            if some_fun(row) in some_rows
        }

    def in_one_not_in_the_other_for_fun(self, some_fun):
        """Return the keys of ``file_b`` that never occur in ``file_a``."""
        # having a lot of memory issues so we're going the slow
        # way round to limit the amount we're putting in memory
        # step 1. Create set of the computed function
        # step 2. Create a set of the ones that are found
        # step 3. the output of step 1 - step 2 are what are missing
        # things to note, we don't just pop out from the set
        # because the function is not necessarily unique
        # e.g. name, dob which would give us false positives
        other = self.by_fun(self.file_b, some_fun)
        found = set()
        for our_row in self._rows(self.file_a):
            calculated = some_fun(our_row)
            if calculated in other:
                # Collect the key itself: it must be comparable with the
                # members of ``other`` for the subtraction to remove it.
                found.add(calculated)
        return other - found

    def in_both(self, some_fun):
        """Return the keys that occur in both ``file_a`` and ``file_b``."""
        other = self.by_fun(self.file_b, some_fun)
        found = set()
        for our_row in self._rows(self.file_a):
            calculated = some_fun(our_row)
            if calculated in other:
                found.add(calculated)
        return found
def to_date(dob):
    """Parse the leading ``YYYY-MM-DD`` of ``dob`` into a ``datetime.date``.

    Surrounding whitespace is ignored and anything after the first ten
    characters (for example a time component) is dropped.

    Returns:
        The parsed date, or ``None`` when ``dob`` is empty, is the literal
        ``"NULL"``, or does not start with a valid date.
    """
    if not dob:
        return None
    dob = dob.strip()
    if dob == "NULL":
        return None
    try:
        return datetime.strptime(dob[:10], "%Y-%m-%d").date()
    except ValueError:
        # Unparseable values are treated as "no date" rather than as fatal;
        # only the parse failure is swallowed, not unrelated errors.
        return None
def date_and_name_match(row):
    """Key function: ``(parsed date of birth, name without outer whitespace)``."""
    birth_date = to_date(row["date_of_birth"])
    name = row["name"].strip()
    return (birth_date, name)
def hospital_number_match(row):
    """Key function: the row's ``hospital_number`` without outer whitespace."""
    hospital_number = row["hospital_number"]
    return hospital_number.strip()
# def get_matches():
# csv_diff_1 = CsvDiff(
# "elcid.live.pmi.data.csv",
# "elcid.patients.csv",
# file_a_field_names=["hospital_number", "date_of_birth", "name"]
# )
# # gives us all the names and dobs that can't be found
# missing_name_dob = csv_diff_1.in_one_not_in_the_other_for_fun(date_and_name_match)
#
# # gives us all the hospital numbers that can be found
# hospital_number_matches = csv_diff_1.in_both(hospital_number_match)
# import ipdb; ipdb.set_trace()
#
# # gives us everything that has a correct hospital number but not a matching name dob
# missing = missing_name_dob - hospital_number_matches
# wrong = missing_name_dob.intersection(hospital_number_matches)
#
# return dict(
# wrong=wrong, missing=missing
# )
#
# # csv_diff_2 = CsvDiff(
# # "elcid.live.pmi.data.csv",
# # "elcid.patients.csv",
# # hospital_number_match,
# # file_a_field_names=["hospital_number", "date_of_birth", "name"]
# # )
# # return (
# # csv_diff_1.in_one_not_in_the_other_for_fun(),
# # csv_diff_2.in_one_not_in_the_other_for_fun(),
# # )
def combined_matches():
    """Run ``get_matches()`` and hand back its result as a pair, if it has one.

    Returns:
        ``(missing_hospital_numbers, wrong_names)`` when ``get_matches()``
        returns a two-item result, otherwise ``None``.
    """
    matches = get_matches()
    if matches is None:
        # get_matches() as defined in this file only prints its report and
        # has no return value, so there is nothing to unpack.
        return None
    missing_hospital_numbers, wrong_names = matches
    return missing_hospital_numbers, wrong_names
def look_for_duplicates(some_csv_file, some_fun, field_names):
    """Group rows of a CSV file by key and yield the groups with several rows.

    Only the first row seen for each ``hospital_number`` is kept, so a group
    always consists of rows with distinct hospital numbers sharing one key.

    Args:
        some_csv_file: path of a CSV file whose first row is the header and
            which has a ``hospital_number`` column.
        some_fun: callable mapping a row dict to a hashable key.
        field_names: accepted for interface compatibility but not used; the
            column names always come from the file's header row.
            NOTE(review): confirm whether callers expect this to be applied.

    Returns:
        A generator of lists of row dicts, one list per key that has more
        than one row.
    """
    from collections import defaultdict

    seen_hospital_numbers = set()
    grouped = defaultdict(list)
    with open(some_csv_file) as some_csv:
        for row in csv.DictReader(some_csv):
            key = some_fun(row)
            hospital_number = row["hospital_number"]
            # A row already stored necessarily has its hospital number in the
            # seen-set, so this single check also filters repeated rows.
            if hospital_number in seen_hospital_numbers:
                continue
            seen_hospital_numbers.add(hospital_number)
            grouped[key].append(row)
    return (group for group in grouped.values() if len(group) > 1)
def get_link(episode_id):
    """Return an HTML anchor to the episode page for ``episode_id``.

    The id has its outer whitespace removed and is used both in the URL
    and as the link text.
    """
    cleaned_id = episode_id.strip()
    anchor = '<a href="http://elcidl/#/episode/{0}">{0}</a>'
    return anchor.format(cleaned_id)
def html_wrapper(some_fun):
    """Decorator that embeds the result of ``some_fun`` in a bare HTML page.

    The decorated function is called with the caller's arguments and its
    return value is placed inside ``<body>`` of an otherwise empty document.
    """
    # Imported here so the decorator does not depend on a module-level import.
    from functools import wraps

    @wraps(some_fun)  # keep the decorated function's name and docstring
    def wrapper(*args, **kwargs):
        return "<html><head></head><body>{}</body></html>".format(some_fun(*args, **kwargs))
    return wrapper
def to_formatted_date(dob):
    """Return ``dob`` formatted as ``DD/MM/YYYY``, or ``""`` if ``to_date`` yields nothing."""
    parsed = to_date(dob)
    return parsed.strftime("%d/%m/%Y") if parsed else ""
@html_wrapper
def our_duplicates():
    """Build one ``<p>`` per group of duplicate rows in ``elcid.patients.csv``.

    Rows are grouped with ``date_and_name_match``; each paragraph names the
    first row of the group and links every episode in it. The paragraphs are
    joined with newlines.
    """
    def to_html(duplicates):
        # The first row of the group supplies the displayed name and date.
        first = duplicates[0]
        name = first["name"]
        born = to_formatted_date(first["date_of_birth"])
        links = ", ".join(get_link(i["episode_id"]) for i in duplicates)
        paragraph = "\n<p>we think there are duplicates for {0}({1}), ie {2}</p>\n"
        return paragraph.format(name, born, links)

    columns = ["hospital_number", "date_of_birth", "name"]
    dups = look_for_duplicates(
        "elcid.patients.csv", date_and_name_match, field_names=columns
    )
    return "\n".join(to_html(v) for v in dups)
def duplicates_html():
    """Print the page produced by ``our_duplicates()`` to standard output."""
    page = our_duplicates()
    print(page)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment