Create models from a data key
"""
Download data from https://facdissem.census.gov/PublicDataDownloads.aspx
Then unzip the files and place them in data_distro/data_to_load/
Load them with: manage.py public_data_loader
"""
import re
import logging
import traceback
from os import walk

from dateutil import parser
from pandas import read_csv
from django.core.management.base import BaseCommand

from data_distro import models as mods
from data_distro.download_model_dictonaries import (
    model_title_transforms,
    field_transforms,
    table_mappings,
    field_mappings,
    boolen_fields,
)

logger = logging.getLogger(__name__)


def file_clean(all_file_names):
    """Grab just the files we want, no .DS_Store etc."""
    table_names = list(table_mappings.keys())
    file_names = []
    for f in all_file_names:
        # I am starting with loading from the FY 22 downloads
        name = f.replace("22.txt", "")
        if name in table_names:
            file_names.append(f)
    return file_names
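
# A quick sanity check of file_clean (assuming "gen" is a key in table_mappings):
# file_clean(["gen22.txt", ".DS_Store"]) -> ["gen22.txt"]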


def rename_headers(df):
    """Replaces headers with new field names that match the models"""
    headers_list = df.columns
    new_headers = []
    for header in headers_list:
        new_headers.append(field_mappings[header])
    df.columns = new_headers
    return df
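
# e.g. with a (hypothetical) field_mappings entry {"AUDITYEAR": "audit_year"},
# a column named AUDITYEAR comes back renamed to audit_year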


def data_transform(field_name, payload):
    """Some field data needs to be altered in order to load it"""
    if field_name in boolen_fields:
        boolean_conversion = {"Y": True, "N": False}
        payload = boolean_conversion.get(payload, None)
    # CfdaInfo
    if field_name == "cfda":
        payload = str(payload)
    # CfdaInfo, auditee_fax is General
    if (
        field_name
        in [
            "cluster_total",
            "cpa_fax",
            "auditee_fax",
            "cognizant_agency",
            "ein_subcode",
        ]
        and str(payload) == "nan"
    ):
        payload = None
    # Dates are only in the gen table
    if "date" in field_name:
        if str(payload) == "nan":
            payload = None
        else:
            payload = parser.parse(payload)
    # These should be integers, but pandas can think they are floats
    if field_name == "cognizant_agency":
        if type(payload) is float and payload.is_integer():
            payload = str(int(payload))
    # trying this out on just General
    if str(payload) == "nan":
        payload = None
    # # debug which column is triggering an int out of range error
    # if type(payload) == int and payload > 2147483647:
    #     print("PROBLEM int~~~~~~~~~~~~~~~", field_name, payload)
    # if type(payload) == float and payload > 2147483647:
    #     print("PROBLEM float~~~~~~~~~~~~~", field_name, payload)
    return payload
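
# Expected behavior, assuming "reportable_condition" is in boolen_fields and
# "fy_end_date" is a (hypothetical) date field in the gen table:
# data_transform("reportable_condition", "Y")  -> True
# data_transform("auditee_fax", float("nan"))  -> None
# data_transform("fy_end_date", "06/30/2022")  -> datetime(2022, 6, 30, 0, 0)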


def handle_exceptions(
    table, file_path, instance_dict, fac_model_name, error_trace, exceptions_list
):
    """Add detailed explanations to the logs and keep track of each type of error"""
    logger.warning(
        """
        ---------------------PROBLEM---------------------
        {table}, {file_path}
        ----
        {instance_dict}
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        {trace}
        -------------------------------------------------
        """.format(
            table=table,
            file_path=file_path,
            instance_dict=instance_dict,
            trace=error_trace,
        )
    )
    problem_text = "Error loading {file_path} into {fac_model_name}: \n{trace}".format(
        file_path=file_path,
        fac_model_name=fac_model_name,
        trace=error_trace,
    )
    if problem_text not in exceptions_list:
        exceptions_list.append(problem_text)


def load_files(load_file_names):
    """Load files into Django models"""
    exceptions_list = []
    exceptions_count = 0
    for file_name in load_file_names:
        file_path = "data_distro/data_to_load/{}".format(file_name)
        table = re.sub("22.txt", "", file_name)
        fac_model_name = table_mappings[table]
        fac_model = getattr(mods, fac_model_name)
        logger.warning(
            "Starting to load {0} into {1}...".format(file_name, fac_model_name)
        )
        for chunk in read_csv(file_path, chunksize=35000, sep="|", encoding="mac-roman"):
            chunk_with_headers = rename_headers(chunk)
            csv_dict = chunk_with_headers.to_dict(orient="records")
            for row in csv_dict:
                try:
                    instance_dict = {}
                    for field_name in row.keys():
                        instance_dict[field_name] = data_transform(
                            field_name, row[field_name]
                        )
                    p, created = fac_model.objects.get_or_create(**instance_dict)
                except Exception:
                    handle_exceptions(
                        table,
                        file_path,
                        instance_dict,
                        fac_model_name,
                        traceback.format_exc(),
                        exceptions_list,
                    )
                    exceptions_count += 1
            logger.warning("finished chunk")
        logger.warning("Finished {0}".format(file_name))
    return exceptions_list, exceptions_count


class Command(BaseCommand):
    help = """
    Loads data from public download files into Django models. It will automatically \
    crawl "/backend/data_distro/data_to_load". \
    If you just want one file, you can pass the name of the file with -f.
    """

    def add_arguments(self, parser):
        parser.add_argument("-f", "--file", type=str, help="file name")

    def handle(self, *args, **kwargs):
        """
        1) Find the files for upload
        2) Grab just the files we want
        3) Load data into Django models
        """
        if kwargs["file"] is not None:
            load_file_names = [kwargs["file"]]
        else:
            file_names = next(walk("data_distro/data_to_load"), (None, None, []))[2]
            load_file_names = file_clean(file_names)

        errors, exceptions_count = load_files(load_file_names)

        if exceptions_count > 0:
            message = """###############################
            {0} error types in {1} records:
            ###############################""".format(
                len(errors), exceptions_count
            )
            for err in errors:
                message += "\n{0}\n".format(err)
            logger.error(message)
        else:
            logger.warning("Successful upload")


# Doing this in the console for a one-time data grab and mapping using the data download key.
# Then, I can use the mappings to evolve the import script.
from pandas import read_csv

from data_distro.download_model_dictonaries import (
    table_mappings_from_key,
    field_mappings,
)
from data_distro.v1_crosswalk_for_docs import doc_metadata

"""getting docs"""
# detailed description generator
data_key = read_csv(
    "data_distro/data_to_load/DataDownloadKey.csv", encoding="mac-roman"
)
data_dict = data_key.to_dict(orient="records")

# The structure I want to create:
sample_data = {
    "new_name": {
        "models": ["Model1", "Model2"],
        # More than one original name can map to a new name.
        # The model order and the original name order should be the same.
        "original_name": ["ORIGINALNAME", "ORIGINALNAME2"],
        "descriptions": ["description1", "description2"],
        "description_model1": "description1",
        "description_model2": "description2",
        "forms": ["SAC 1997-200, line 32", "SF-SAC 2022, line 7"],
        "forms_model1": ["SAC 1997-200, line 32", "SF-SAC 2022, line 7"],
        "forms_model2": ["SAC 1997-200, line 32", "SF-SAC 2022, line 7"],
        "all_years": False,
    }
}

doc_metadata = {}
# I don't think this is working as-is, but I don't think I will need it again.
# I am keeping it a bit longer in case I am wrong about not needing it.
for row in data_dict:
    original_name = row["FIELD NAME"]
    # skip key rows whose field name has no new-name mapping
    if original_name not in field_mappings:
        continue
    new_name = field_mappings[original_name]
    new_model = table_mappings_from_key[row["TABLE"]]
    forms_per_model = "forms_{0}".format(new_model)
    if new_name not in doc_metadata:
        doc_metadata[new_name] = {}
        d = doc_metadata[new_name]
        d["models"] = [new_model]
        d["original_name"] = [original_name]
        # populate values below
        d["forms"] = []
        d[forms_per_model] = []
        d["descriptions"] = []
    else:
        d = doc_metadata[new_name]
        d["models"].append(new_model)
        d["original_name"].append(original_name)
        d[forms_per_model] = []
    for header in row.keys():
        value = row[header]
        # float catches the NaNs
        if header.startswith("SF-SAC") and type(value) != float:
            form_location = "{}: {}".format(header, value)
            d["forms"].append(form_location)
            d[forms_per_model].append(form_location)
    d["descriptions"].append(row["DESCRIPTION"])
    d["description_{0}".format(new_model)] = row["DESCRIPTION"]
    d["original_table_{0}".format(new_model)] = row["TABLE"]
    # If I need to check if something can be null later
    if len(d["descriptions"]) < 9:
        d["all_years"] = True
    if len(d["descriptions"]) == 9:
        d["all_years"] = False
"""Use doc metadata to generate doc string documentation"""
for field in doc_metadata.keys():
for model in doc_metadata[field]["models"]:
data = doc_metadata[field]
table_key = "original_table_{}".format(model)
table = (data[table_key],)
original_name = (data["original_name"],)
if len(data["models"]) > 1:
differentiator = "_{0}".format(model.lower())
else:
differentiator = ""
if len(data["forms_{}".format(model)]) > 0:
form_list = data["forms_{}".format(model)]
form_text = "; ".join(form_list)
sources = "Data sources: {0} ".format(form_text)
else:
sources = ""
docstring = '{field_name}{differentiator}_doc = "{sources}Census mapping: {table}, {original_name}"'.format(
field_name=field,
differentiator=differentiator,
sources=sources,
table=table[0],
original_name=original_name[0],
)
print(docstring)
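
# A hypothetical example of one printed line (names and form locations depend on the key):
# cpa_state_doc = "Data sources: SF-SAC 2022: Part 1 Census mapping: GEN, ['CPASTATE']"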
"""first pass at making models from the download key"""
field_transforms = []
model_transforms = []
model_dict = {}
def make_model_name(table_name):
"""make a key to transform names of tables to models"""
name = table_name.title()
name = name.replace(" ", "")
if {table_name.strip(): name} not in model_transforms:
model_transforms.append({table_name.strip(): name})
return name
def make_field_name(field_name):
"""make a key to transform names of column name to field names"""
name = field_name.replace(" ", "").lower()
if {field_name.strip(): name} not in field_transforms:
field_transforms.append({field_name.strip(): name})
return name
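
# e.g. make_model_name("CAP TEXT") -> "CapText" and make_field_name("AUDIT YEAR")
# -> "audityear" (hypothetical inputs; real names come from the download key)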

# `data` here is expected to hold the raw pipe-delimited download key text,
# one row per line: table|column|description|field type|validation
for d in data.split("\n"):
    line = d.split("|")
    model = make_model_name(line[0])
    if model not in model_dict:
        model_dict[model] = {}
    model_dict[model][make_field_name(line[1])] = [
        line[2].strip(),
        line[3].strip(),
        line[4],
    ]

# create model text, just printing to console
for model in model_dict:
    print("class {model}(models.Model):".format(model=model))
    for element_key in model_dict[model]:
        print(
            """    {field_name} = models.{field_type}("{description}", {validation}, help_text=docs.{field_name})""".format(
                field_name=element_key,
                field_type=model_dict[model][element_key][1],
                description=model_dict[model][element_key][0],
                validation=model_dict[model][element_key][2].strip(),
            )
        )
    print("\n")

# inspect the models to create a script that creates a key we can use to upload the data
from django.apps import apps

# these I made as temp scripts and then saved the outputs
from data_distro.download_model_dictonaries import (
    table_mappings,
    file_to_table_name_mapping,
)

# From a given table, look at the column name and know where to load the data
sample_upload_mapping_structure = {
    "table file name": {
        "column_name": ["model_name", "django_field_name"],
        "column_name2": ["model_name", "django_field_name2"],
    }
}


def make_table_structure():
    add_realtional = []
    blank_help = []
    leftovers = []

    # preload tables into a dict
    upload_mapping = {}
    for table_title in table_mappings.keys():
        upload_mapping[table_title] = {}

    new_fields = [
        # django generates
        "id",
        # I added
        "is_public",
        "modified_date",
        # relational links; these will change if you move fields around
        "general",
        "findings_text",
    ]

    distro_classes = apps.all_models["data_distro"]

    # this should be enough to make a key
    for model in distro_classes:
        mod_class = distro_classes[model]
        mod_name = mod_class.__name__
        fields = mod_class._meta.get_fields()
        for field in fields:
            f_name = field.name
            try:
                help_text = field.help_text
            except AttributeError:
                # reverse relations don't have help_text
                help_text = ""
                add_realtional.append([f_name, model])
                new_fields.append(f_name)
            if help_text != "":
                source = help_text.split("Census mapping: ", 1)[1]
                table_doc_name = source.split(", ", 1)[0]
                column_name = source.split(", ", 1)[1]
                table_file_name = file_to_table_name_mapping[
                    table_doc_name.upper().replace(" ", "")
                ]
                upload_mapping[table_file_name][column_name] = [mod_name, f_name]
            else:
                if f_name not in new_fields:
                    blank_help.append(f_name)
                else:
                    # just a check
                    leftovers.append(f_name)

    if len(blank_help) > 0:
        print("~ WARNING ~ Check blank fields: blank_help={0}".format(blank_help))
    # these should be the relational fields
    print(
        "Fields with no help text (relational and array fields that need some custom logic for loading): add_realtional={0}".format(
            add_realtional
        )
    )
    return upload_mapping, add_realtional
### Below is a partial sample of data output
# Need to create these links with upload
add_realtional = [
    ["general", "auditee"],
    ["general", "auditor"],
    ["general", "cfdainfo"],
    ["findings", "findingstext"],
    ["general", "findings"],
    ["general", "captext"],
    ["general", "notes"],
    ["general", "revisions"],
    ["general", "agencies"],
    ["general", "passthrough"],
]
# these should be relational fields and list fields
blank_help = [
    "duns_list",
    "uei_list",
    "auditor",
    "agency",
]
# just to confirm all data is accounted for as expected
leftovers = [
    "general",
    "id",
    "create_date",
    "data_source",
]
# mapping for upload
# I do a manual review to make sure that it is correct and to handle edge cases
upload_mapping = {
    "gen": {
        "AUDITEECERTIFYNAME": ["Auditee", "auditee_certify_name"],
        "CPASTATE": ["Auditor", "cpa_state"],
        "CPACITY": ["Auditor", "cpa_city"],
        "CPATITLE": ["Auditor", "cpa_title"],
        "CPAEMAIL": ["Auditor", "cpa_email"],
        "CPAFIRMNAME": ["Auditor", "cpa_firm_name"],
        "CPAEIN": ["Auditor", "cpa_ein"],
        # this needs to go 2 places; will handle manually
        # "DBKEY": ["Auditor", "dbkey"],
        "SEQNUM": ["Auditor", "seqnum"],
        "CPAFOREIGN": ["Auditor", "cpa_foreign"],
        # This is the same as the CFDA auditor EIN. Ignore here and load from the CFDA table.
        # "AUDITOR_EIN": ["CfdaInfo", "auditor_ein"],
        "COGAGENCY": ["General", "cognizant_agency"],
        "COG_OVER": ["General", "cognizant_agency_over"],
        "REPORTABLECONDITION": ["General", "reportable_condition"],
        "SIGNIFICANTDEFICIENCY": ["General", "significant_deficiency"],
        "CYFINDINGS": ["General", "current_or_former_findings"],
        "DATEFIREWALL": ["General", "date_firewall"],
        # this needs to go 2 places
        "DBKEY": ["General", "dbkey"],
        "DOLLARTHRESHOLD": ["General", "dollar_threshold"],
        "DUP_REPORTS": ["General", "dup_reports"],
        "TYPEREPORT_MP": ["General", "type_report_major_program"],
        "TYPEREPORT_SP_FRAMEWORK": ["General", "type_report_special_purpose_framework"],
    },
    "cfda": {
        "RD": ["CfdaInfo", "research_and_development"],
        "LOANS": ["CfdaInfo", "loans"],
        "ARRA": ["CfdaInfo", "arra"],
        "TYPEREPORT_MP": ["CfdaInfo", "type_report_major_program"],
        "FINDINGSCOUNT": ["CfdaInfo", "findings_count"],
        "ELECAUDITSID": ["CfdaInfo", "elec_audits_id"],
        "DBKEY": ["CfdaInfo", "dbkey"],
        "AUDITYEAR": ["CfdaInfo", "audit_year"],
        "QCOSTS2": ["CfdaInfo", "questioned_costs"],
        "FINDINGS": ["CfdaInfo", "findings"],
        "EIN": ["CfdaInfo", "auditor_ein"],
    },
    "findings": {
        "MODIFIEDOPINION": ["Findings", "modified_opinion"],
        "OTHERNONCOMPLIANCE": ["Findings", "other_non_compliance"],
        "MATERIALWEAKNESS": ["Findings", "material_weakness"],
        "PRIORFINDINGREFNUMS": ["Findings", "prior_finding_ref_nums"],
        "TYPEREQUIREMENT": ["Findings", "type_requirement"],
        "ELECAUDITSID": ["Findings", "elec_audits_id"],
        "ELECAUDITFINDINGSID": ["Findings", "elec_audit_findings_id"],
        "AUDITYEAR": ["Findings", "audit_year"],
        "DBKEY": ["Findings", "dbkey"],
    },
}
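
# A hypothetical lookup during load: route a raw column from gen22.txt to its model/field
# model_name, django_field = upload_mapping["gen"]["CPASTATE"]  # -> "Auditor", "cpa_state"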