Last active
November 1, 2019 18:40
-
-
Save gm3dmo/cf078698d006bc9a995fd10dfe95051d to your computer and use it in GitHub Desktop.
Anonymizing data in CSV Files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
""" | |
__author__ = "David Morris" | |
__version__ = "0.1.0" | |
__license__ = "MIT" | |
import os | |
import sys | |
import csv | |
import logzero | |
import logging | |
import pprint | |
import datetime | |
import uuid | |
import dmninolib as nm | |
import glob | |
from logzero import logger | |
from faker import Faker | |
import sdp | |
def replaceNamesCached(variant, row, cache, ln='Trout', type='surname'): | |
"""Cache the names so that structure is preserved across csv's but not accross different runs""" | |
if variant in row.keys(): | |
if row[variant] == "": | |
logger.warn(f"""{variant} is blank that could be tasty""") | |
pass | |
else: | |
oldname = row[variant] | |
if oldname in cache.keys(): | |
logger.debug(f"""cache hit on {variant} (<redacted>) for (<redacted>)""") | |
row[variant] = cache[oldname] | |
else: | |
fake = Faker() | |
ln = fake.last_name() | |
fn = fake.first_name() | |
newname = ln.upper() | |
row[variant] = newname | |
cache[oldname] = newname | |
logger.debug(f"""swapping {variant} (<redacted>) for {newname}""") | |
return (row, cache) | |
def replaceNINOCached(variant, row, cache, type='9digit'): | |
if variant in row.keys(): | |
if row[variant] in cache: | |
nino = cache[row[variant]] | |
nino_suffix_text = nino[-1:] | |
logger.debug(f'seen before and swapping: (<redacted>) switching to: {nino[:8]}') | |
#row[c] = nino[:8] | |
row[variant] = nino | |
if 'NINO_SUFFIX_TEXT' in row: | |
row['NINO_SUFFIX_TEXT'] = nino_suffix_text | |
elif 'NINOSUFFIX_TEXT' in row: | |
row['NINOSUFFIX_TEXT'] = nino_suffix_text | |
else: | |
newnino = nm.doMeANino() | |
newnino_suffix_text = newnino[-1:] | |
cache[row[variant]] = newnino | |
logger.debug(f'swapping: (<redacted>) to {newnino}') | |
#row[c] = newnino[:8] | |
row[variant] = newnino | |
if 'NINO_SUFFIX_TEXT' in row: | |
row['NINO_SUFFIX_TEXT'] = newnino_suffix_text | |
elif 'NINOSUFFIX_TEXT' in row: | |
row['NINOSUFFIX_TEXT'] = nino_suffix_text | |
return (row, cache) | |
def replaceNames(variant, row, ln='Trout', type='surname'): | |
fake = Faker() | |
ln = fake.last_name() | |
fn = fake.first_name() | |
if variant in row.keys(): | |
if row[variant] == "": | |
logger.warn(f"""{variant} is blank that could be tasty""") | |
pass | |
else: | |
logger.debug(f"""swapping {variant} (<redacted>) for {ln.upper()}""") | |
row[variant] = ln.upper() | |
return row | |
def replaceKnownAs(variant, row, newvalue="NULL"): | |
if variant in row.keys(): | |
if row[variant] == "": | |
logger.warn(f"""{variant} is blank that could be tasty""") | |
pass | |
else: | |
logger.debug(f"""swapping {variant} (<redacted>) for {newvalue.upper()}""") | |
row[variant] = newvalue.upper() | |
return row | |
def replacePostcode(variant, row, postcode="LS1 1AA"): | |
if variant in row.keys(): | |
if row[variant] == "": | |
logger.warn(f"""{variant} is blank that could be tasty""") | |
pass | |
else: | |
logger.debug(f"""swapping {variant} (<redacted>) for {postcode.upper()}""") | |
row[variant] = postcode.upper() | |
return row | |
def replaceGUID(variant, row): | |
g = str(uuid.uuid4()) | |
if variant in row.keys(): | |
if row[variant] == "": | |
logger.warn(f"""{variant} is blank that could be tasty""") | |
pass | |
else: | |
logger.debug(f"""swapping {variant} (<redacted>) for {g}""") | |
row[variant] = g | |
return row | |
def replaceDob(row, variant): | |
"""Process a row of csv looking for variants of date of birth and replacing them with the date defined in newDob. | |
23Mar1974 | |
""" | |
if variant in row.keys(): | |
if row[variant] == "": | |
logger.warn(f"""{variant} is blank that could be tasty""") | |
pass | |
else: | |
year_of_birth = row[variant][-4:] | |
newDob = f"01Jan{year_of_birth}" | |
logger.debug(f"""swapping {variant} (<redacted>) for {newDob} {year_of_birth}""") | |
row[variant] = newDob | |
return row | |
def main(): | |
"""Process csv files and replace real ninos with made up ninos. | |
It is desirable to preserve the structure of the ninos in the | |
file, replace an already seen nino with it's cached value stored | |
in *nino_cache*. This happens only across a single run of this | |
program, each subseqent run will cause a different nino to be | |
generated. | |
Surname type columns are cached only per row, so *SMITH* may | |
be replaced with *JONES* in row 1 but with *KNOWLES* in a | |
subsequent run. | |
the lists of columns like: | |
nino_columns = [ 'NINO', 'NINO1' ] | |
are the mechanism by which the column names which should be cleansed are chosen. | |
""" | |
#logzero.loglevel(logging.DEBUG) | |
logzero.loglevel(logging.INFO) | |
config = sdp.ConfigProduction | |
nino_cache = {} | |
surname_cache = {} | |
forename_cache = {} | |
filename = sys.argv[1] | |
p, f = os.path.split(filename) | |
of = '/tmp/test.csv' | |
logger.info(f'filename: {filename} output_file: {of}') | |
#csvin = csv.DictReader(open(filename, mode='r', encoding='utf-8-sig'), dialect=csv.excel) | |
csvin = csv.DictReader(open(filename, mode='r', encoding='cp1252'), dialect=csv.excel) | |
#csvin = csv.DictReader(open(filename, mode='r'), dialect=csv.excel) | |
i = csvin.fieldnames | |
csvout = csv.DictWriter(open(of, 'w'), fieldnames=i) | |
csvout.writeheader() | |
logger.debug(f'csvin: {type(csvin)}') | |
logger.debug(f'headers: {i}') | |
# Set the names of the columns in the that you wan to be anonymised: | |
guid_columns = ['CID'] | |
nino_columns = ['NINO', 'NINO1', 'NC_NINO', 'CCNINO', 'ClaimantNINO'] | |
forename_columns = [ 'FORENAME_1', 'NC_FORENAME_1', 'FORENAME_11', 'CCFORENAME', 'FIRSTNAME', 'FirstName' ] | |
knownAsName_columns = [ 'KnownAsName' ] | |
surname_columns = ['SURNAME', 'SURNAME1', 'NC_SURNAME', 'CCSURNAME', 'LASTNAME', 'LastName', 'MiddleName' ] | |
dob_columns = ['DATE_OF_BIRTH', 'NC_DATE_OF_BIRTH', 'DATE_OF_BIRTH1'] | |
postcode_columns = ['POSTCODE', 'RESIDENTIAL_POSTCODE', 'ClaimantPostCode'] | |
for row in csvin: | |
logger.debug(row) | |
for variant in nino_columns: | |
row, nino_cache = replaceNINOCached(variant, row, nino_cache) | |
for variant in surname_columns: | |
row, surname_cache = replaceNamesCached(variant, row, surname_cache) | |
for variant in forename_columns: | |
row, forename_cache = replaceNamesCached(variant, row, forename_cache) | |
for variant in knownAsName_columns: | |
row = replaceKnownAs(variant, row) | |
# guids | |
for variant in guid_columns: | |
row = replaceGUID(variant, row) | |
# postcode | |
for variant in postcode_columns: | |
row = replacePostcode(variant, row) | |
# DOB | |
for variant in dob_columns: | |
row = replaceDob(row, variant) | |
csvout.writerow(row) | |
logger.info('') | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
""" | |
import exrex | |
def doMeANino(realnino=False): | |
"""""" | |
if realnino == True: | |
r = '^[A-CEGHJ-PR-TW-Z]{1}[A-CEGHJ-NPR-TW-Z]{1}[0-9]{6}[A-D]{1}$' | |
else: | |
r = '[Q]{1}[A-CEGHJ-NPR-TW-Z]{1}[0-9]{6}[A-D]{1}$' | |
n = exrex.getone(r) | |
return n | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment