Create models from a data key
""" | |
Download data from https://facdissem.census.gov/PublicDataDownloads.aspx | |
Then unzip the files and place the them in data_distro/data_to_load/ | |
Load them with: manage.py public_data_loader | |
""" | |
import logging
import re
import traceback
from os import walk

from dateutil import parser
from pandas import read_csv

from django.core.management.base import BaseCommand

from data_distro import models as mods
from data_distro.download_model_dictonaries import (
    table_mappings,
    field_mappings,
    boolen_fields,
)

logger = logging.getLogger(__name__)
def file_clean(all_file_names):
    """Grab just the files we want, no .DS_Store etc."""
    table_names = list(table_mappings.keys())
    file_names = []
    for f in all_file_names:
        # I am starting with loading from the FY 22 downloads
        name = f.replace("22.txt", "")
        if name in table_names:
            file_names.append(f)
    return file_names
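# For example, ["gen22.txt", ".DS_Store", "readme.md"] -> ["gen22.txt"]
# (assuming "gen" is one of the keys in table_mappings, per the FY 22 files).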
def rename_headers(df):
    """Replaces headers with new field names that match the models"""
    headers_list = df.columns
    new_headers = []
    for header in headers_list:
        new_headers.append(field_mappings[header])
    df.columns = new_headers
    return df
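# For example, a source column like AUDITYEAR becomes audit_year
# (the exact pairs live in field_mappings).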
def data_transform(field_name, payload):
    """Some field data needs to be altered in order to load it"""
    if field_name in boolen_fields:
        boolean_conversion = {"Y": True, "N": False}
        payload = boolean_conversion.get(payload, None)
    # CfdaInfo
    if field_name == "cfda":
        payload = str(payload)
    # CfdaInfo, auditee_fax is General
    if (
        field_name
        in [
            "cluster_total",
            "cpa_fax",
            "auditee_fax",
            "cognizant_agency",
            "ein_subcode",
        ]
        and str(payload) == "nan"
    ):
        payload = None
    # Dates are only in the gen table
    if "date" in field_name:
        if str(payload) == "nan":
            payload = None
        else:
            payload = parser.parse(payload)
    # These should be integers, but Pandas can think they are floats
    if field_name == "cognizant_agency":
        if type(payload) is float:
            if payload.is_integer():
                payload = str(int(payload))
    # trying this out on just General
    if str(payload) == "nan":
        payload = None
    # debug which column is triggering an int out of range error
    # if type(payload) == int:
    #     if payload > 2147483647:
    #         print("PROBLEM int~~~~~~~~~~~~~~~", field_name, payload)
    # if type(payload) == float:
    #     if payload > 2147483647:
    #         print("PROBLEM float~~~~~~~~~~~~~", field_name, payload)
    return payload
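# A few illustrative calls (the date field name is hypothetical):
#   data_transform(<a field in boolen_fields>, "Y")   -> True
#   data_transform("fac_accepted_date", "nan")        -> None
#   data_transform("cognizant_agency", 10.0)          -> "10"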
def handle_exceptions(
    table, file_path, instance_dict, fac_model_name, error_trace, exceptions_list
):
    """Add detailed explanations to the logs and keep track of each type of error"""
    logger.warning(
        """
        ---------------------PROBLEM---------------------
        {table}, {file_path}
        ----
        {instance_dict}
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        {trace}
        -------------------------------------------------
        """.format(
            table=table,
            file_path=file_path,
            instance_dict=instance_dict,
            trace=error_trace,
        )
    )
    problem_text = "Error loading {file_path} into {fac_model_name}: \n \
        {trace}".format(
        file_path=file_path,
        fac_model_name=fac_model_name,
        trace=error_trace,
    )
    if problem_text not in exceptions_list:
        exceptions_list.append(problem_text)
def load_files(load_file_names):
    """Load files into django models"""
    exceptions_list = []
    exceptions_count = 0
    for f in load_file_names:
        file_path = "data_distro/data_to_load/{}".format(f)
        file_name = file_path.replace("data_distro/data_to_load/", "")
        table = re.sub(r"22\.txt", "", file_name)
        fac_model_name = table_mappings[table]
        fac_model = getattr(mods, fac_model_name)
        logger.warning(
            "Starting to load {0} into {1}...".format(file_name, fac_model_name)
        )
        for i, chunk in enumerate(
            read_csv(file_path, chunksize=35000, sep="|", encoding="mac-roman")
        ):
            chunk_with_headers = rename_headers(chunk)
            csv_dict = chunk_with_headers.to_dict(orient="records")
            for row in csv_dict:
                try:
                    fields = list(row.keys())
                    instance_dict = {}
                    for field_name in fields:
                        instance_dict[field_name] = data_transform(
                            field_name, row[field_name]
                        )
                    p, created = fac_model.objects.get_or_create(**instance_dict)
                except Exception:
                    handle_exceptions(
                        table,
                        file_path,
                        instance_dict,
                        fac_model_name,
                        traceback.format_exc(),
                        exceptions_list,
                    )
                    exceptions_count += 1
            logger.warning("finished chunk")
        logger.warning("Finished {0}".format(file_name))
    return exceptions_list, exceptions_count
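# Note: using get_or_create makes re-running the loader roughly idempotent;
# rows whose transformed values already exist are fetched rather than duplicated.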
class Command(BaseCommand):
    help = """
    Loads data from public download files into Django models. It will automatically \
    crawl "/backend/data_distro/data_to_load". \
    If you just want one file, you can pass the name of the file with -f.
    """
    def add_arguments(self, parser):
        parser.add_argument("-f", "--file", type=str, help="file name")
    def handle(self, *args, **kwargs):
        """
        1) Find the files for upload
        2) Grab just the files we want
        3) Load data into Django models
        """
        if kwargs["file"] is not None:
            load_file_names = [kwargs["file"]]
        else:
            file_names = next(walk("data_distro/data_to_load"), (None, None, []))[2]
            load_file_names = file_clean(file_names)
        errors, exceptions_count = load_files(load_file_names)
        if exceptions_count > 0:
            message = """###############################
            {0} error types in {1} records:
            ###############################""".format(
                len(errors), exceptions_count
            )
            for err in errors:
                message += """
                {0}
                """.format(
                    err
                )
            logger.error(message)
        else:
            logger.warning("Successful upload")
# Doing this in console for a one-time data grab and mapping using the data download key.
# Then, I can use the mappings to evolve the import script.
from pandas import read_csv

from data_distro.download_model_dictonaries import (
    table_mappings_from_key,
    field_mappings,
)
from data_distro.v1_crosswalk_for_docs import doc_metadata
""" getting docs """ | |
# detailed description generator | |
data_key = read_csv( | |
"data_distro/data_to_load/DataDownloadKey.csv", encoding="mac-roman" | |
) | |
data_dict = data_key.to_dict(orient="records") | |
# The structure we want to create:
sample_data = {
    "new_name": {
        "models": ["Model1", "Model2"],
        # More than one original name can map to a new name.
        # The model order and the original name order should be the same.
        "original_name": ["ORIGINALNAME", "ORIGINALNAME2"],
        "descriptions": ["description1", "description2"],
        "description_model1": "description1",
        "description_model2": "description2",
        "forms": ["SAC 1997-200, line 32", "SF-SAC 2022, line 7"],
        "forms_model1": ["SAC 1997-200, line 32", "SF-SAC 2022, line 7"],
        "forms_model2": ["SAC 1997-200, line 32", "SF-SAC 2022, line 7"],
        "all_years": False,
    }
}
doc_metadata = {}

# This was a one-time pass to build doc_metadata; keeping it a bit longer
# in case it is needed again.
for row in data_dict:
    field_data = row["FIELD NAME"]
    # skip columns in the key that have no mapping to a new field name
    if field_data not in field_mappings:
        continue
    # look up the new field name for this row's original column
    new_name = field_mappings[field_data]
    new_model = table_mappings_from_key[row["TABLE"]]
    forms_per_model = "forms_{0}".format(new_model)
    if new_name not in doc_metadata:
        doc_metadata[new_name] = {}
        d = doc_metadata[new_name]
        d["models"] = [new_model]
        d["original_name"] = [row["FIELD NAME"]]
        # populate values below
        d["forms"] = []
        d[forms_per_model] = []
        d["descriptions"] = []
    else:
        d = doc_metadata[new_name]
        d["models"].append(new_model)
        d["original_name"].append(row["FIELD NAME"])
        d[forms_per_model] = []
    for header in row.keys():
        value = row[header]
        # float catches the nans
        if header.startswith("SF-SAC") and type(value) != float:
            form_location = "{}: {}".format(header, value)
            d["forms"].append(form_location)
            d[forms_per_model].append(form_location)
    d["descriptions"].append(row["DESCRIPTION"])
    d["description_{0}".format(new_model)] = row["DESCRIPTION"]
    d["original_table_{0}".format(new_model)] = row["TABLE"]
    # Track whether the field shows up in all years, in case I need to check
    # if something can be null later
    if len(d["descriptions"]) < 9:
        d["all_years"] = True
    if len(d["descriptions"]) == 9:
        d["all_years"] = False
"""Use doc metadata to generate doc string documentation""" | |
for field in doc_metadata.keys(): | |
for model in doc_metadata[field]["models"]: | |
data = doc_metadata[field] | |
table_key = "original_table_{}".format(model) | |
table = (data[table_key],) | |
original_name = (data["original_name"],) | |
if len(data["models"]) > 1: | |
differentiator = "_{0}".format(model.lower()) | |
else: | |
differentiator = "" | |
if len(data["forms_{}".format(model)]) > 0: | |
form_list = data["forms_{}".format(model)] | |
form_text = "; ".join(form_list) | |
sources = "Data sources: {0} ".format(form_text) | |
else: | |
sources = "" | |
docstring = '{field_name}{differentiator}_doc = "{sources}Census mapping: {table}, {original_name}"'.format( | |
field_name=field, | |
differentiator=differentiator, | |
sources=sources, | |
table=table[0], | |
original_name=original_name[0], | |
) | |
print(docstring) | |
"""first pass at making models from the download key""" | |
field_transforms = [] | |
model_transforms = [] | |
model_dict = {} | |
def make_model_name(table_name): | |
"""make a key to transform names of tables to models""" | |
name = table_name.title() | |
name = name.replace(" ", "") | |
if {table_name.strip(): name} not in model_transforms: | |
model_transforms.append({table_name.strip(): name}) | |
return name | |
def make_field_name(field_name): | |
"""make a key to transform names of column name to field names""" | |
name = field_name.replace(" ", "").lower() | |
if {field_name.strip(): name} not in field_transforms: | |
field_transforms.append({field_name.strip(): name}) | |
return name | |
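# For example (a hypothetical table and column from the key):
#   make_model_name("cap text")   -> "CapText"
#   make_field_name("AUDITYEAR")  -> "audityear"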
# `data` is assumed to hold the raw pipe-delimited text of the download key
# (pasted into the console); it is not defined in this snippet.
for d in data.split("\n"):
    line = d.split("|")
    model = make_model_name(line[0])
    if model not in model_dict:
        model_dict[model] = {}
    model_dict[model][make_field_name(line[1])] = [
        line[2].strip(),
        line[3].strip(),
        line[4],
    ]
# create model text, just printing to console
for model in model_dict:
    print("class {model}(models.Model):".format(model=model))
    for element_key in model_dict[model]:
        print(
            """    {field_name} = models.{field_type}("{description}", {validation}, help_text=docs.{field_name})""".format(
                field_name=element_key,
                field_type=model_dict[model][element_key][1],
                description=model_dict[model][element_key][0],
                validation=model_dict[model][element_key][2].strip(),
            )
        )
    print("\n")
# inspect the models to create a script that creates a key we can use to upload the data
from django.apps import apps

# these I made as temp scripts and then saved outputs
from data_distro.download_model_dictonaries import (
    table_mappings,
    file_to_table_name_mapping,
)
# From a given table, look at the column name and know where to load the data
sample_upload_mapping_structure = {
    "table file name": {
        "column_name": ["model_name", "django_field_name"],
        "column_name2": ["model_name", "django_field_name2"],
    }
}
def make_table_structure():
    add_realtional = []
    blank_help = []
    leftovers = []
    # preload tables into a dict
    upload_mapping = {}
    for table_title in table_mappings.keys():
        upload_mapping[table_title] = {}
    new_fields = [
        # django generates
        "id",
        # I added
        "is_public",
        "modified_date",
        # relational links; these will change if you move fields around
        "general",
        "findings_text",
    ]
    distro_classes = apps.all_models["data_distro"]
    # this should be enough to make a key
    for model in distro_classes:
        mod_class = distro_classes[model]
        mod_name = mod_class.__name__
        fields = mod_class._meta.get_fields()
        for field in fields:
            f_name = field.name
            try:
                help_text = field.help_text
            except AttributeError:
                help_text = ""
            if help_text != "":
                source = help_text.split("Census mapping: ", 1)[1]
                table_doc_name = source.split(", ", 1)[0]
                column_name = source.split(", ", 1)[1]
                table_file_name = file_to_table_name_mapping[
                    table_doc_name.upper().replace(" ", "")
                ]
                upload_mapping[table_file_name][column_name] = [mod_name, f_name]
            else:
                add_realtional.append([f_name, model])
                if f_name not in new_fields:
                    blank_help.append(f_name)
                else:
                    # just a check
                    leftovers.append(f_name)
    if len(blank_help) > 0:
        print("~ WARNING ~ Check blank fields: blank_help={0}".format(blank_help))
    # these should be relational fields
    print(
        "Fields with no help (relational and array models that need some custom logic for loading): add_realtional={0}".format(
            add_realtional
        )
    )
    return upload_mapping, add_realtional
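# Usage, run from a Django shell (python manage.py shell):
# upload_mapping, add_realtional = make_table_structure()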
### Below is a partial sample of data output

# Need to create these links with upload
add_realtional = [
    ["general", "auditee"],
    ["general", "auditor"],
    ["general", "cfdainfo"],
    ["findings", "findingstext"],
    ["general", "findings"],
    ["general", "captext"],
    ["general", "notes"],
    ["general", "revisions"],
    ["general", "agencies"],
    ["general", "passthrough"],
]
# these should be relational fields and list fields
blank_help = [
    "duns_list",
    "uei_list",
    "auditor",
    "agency",
]

# just to confirm all data is accounted for as expected
leftovers = [
    "general",
    "id",
    "create_date",
    "data_source",
]

# mapping for upload
# I do a manual review to make sure that it is correct and to handle edge cases
upload_mapping = {
    "gen": {
        "AUDITEECERTIFYNAME": ["Auditee", "auditee_certify_name"],
        "CPASTATE": ["Auditor", "cpa_state"],
        "CPACITY": ["Auditor", "cpa_city"],
        "CPATITLE": ["Auditor", "cpa_title"],
        "CPAEMAIL": ["Auditor", "cpa_email"],
        "CPAFIRMNAME": ["Auditor", "cpa_firm_name"],
        "CPAEIN": ["Auditor", "cpa_ein"],
        # this needs to go 2 places; will handle manually
        # "DBKEY": ["Auditor", "dbkey"],
        "SEQNUM": ["Auditor", "seqnum"],
        "CPAFOREIGN": ["Auditor", "cpa_foreign"],
        # This is the same as the CFDA auditor EIN. Ignore here and load from the CFDA table
        # "AUDITOR_EIN": ["CfdaInfo", "auditor_ein"],
        "COGAGENCY": ["General", "cognizant_agency"],
        "COG_OVER": ["General", "cognizant_agency_over"],
        "REPORTABLECONDITION": ["General", "reportable_condition"],
        "SIGNIFICANTDEFICIENCY": ["General", "significant_deficiency"],
        "CYFINDINGS": ["General", "current_or_former_findings"],
        "DATEFIREWALL": ["General", "date_firewall"],
        # this needs to go 2 places
        "DBKEY": ["General", "dbkey"],
        "DOLLARTHRESHOLD": ["General", "dollar_threshold"],
        "DUP_REPORTS": ["General", "dup_reports"],
        "TYPEREPORT_MP": ["General", "type_report_major_program"],
        "TYPEREPORT_SP_FRAMEWORK": ["General", "type_report_special_purpose_framework"],
    },
"cfda": { | |
"RD": ["CfdaInfo", "research_and_development"], | |
"LOANS": ["CfdaInfo", "loans"], | |
"ARRA": ["CfdaInfo", "arra"], | |
"TYPEREPORT_MP": ["CfdaInfo", "type_report_major_program"], | |
"FINDINGSCOUNT": ["CfdaInfo", "findings_count"], | |
"ELECAUDITSID": ["CfdaInfo", "elec_audits_id"], | |
"DBKEY": ["CfdaInfo", "dbkey"], | |
"AUDITYEAR": ["CfdaInfo", "audit_year"], | |
"QCOSTS2": ["CfdaInfo", "questioned_costs"], | |
"FINDINGS": ["CfdaInfo", "findings"], | |
"EIN": ["CfdaInfo", "auditor_ein"], | |
}, | |
"findings": { | |
"MODIFIEDOPINION": ["Findings", "modified_opinion"], | |
"OTHERNONCOMPLIANCE": ["Findings", "other_non_compliance"], | |
"MATERIALWEAKNESS": ["Findings", "material_weakness"], | |
"PRIORFINDINGREFNUMS": ["Findings", "prior_finding_ref_nums"], | |
"TYPEREQUIREMENT": ["Findings", "type_requirement"], | |
"ELECAUDITSID": ["Findings", "elec_audits_id"], | |
"ELECAUDITFINDINGSID": ["Findings", "elec_audit_findings_id"], | |
"AUDITYEAR": ["Findings", "audit_year"], | |
"DBKEY": ["Findings", "dbkey"], | |
}, | |
} | |