Last active
December 24, 2015 22:59
-
-
Save darkblue-b/6876841 to your computer and use it in GitHub Desktop.
imports ACS 2012 1yr extension - rev8: first cut at geodata; imports multiple states as previously; needs more QA
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
##=========================================================== | |
## Import ACS 2012 1 year extension | |
## GPLv3 Oct 2013 -dbb | |
## | |
## Usage: python import_master_20121yr_8.py Sequence_Number_and_Table_Number_Lookup.txt work_dir
## assumes an existing database 'census_acs_2012' ; currently writes to public schema | |
## | |
## Note: results in ~1480 tables plus 1478 more per state, so | |
## max_locks_per_transaction must be manually increased | |
## for example | |
## max_locks_per_transaction = 10000 | |
## max_connections = 40 | |
## shared_mem = 2400MB | |
## | |
## the test is to pg_dump the table when completed | |
## | |
# change search_path to destination Postgres schema.
# NOTE(review): this statement is defined but never executed anywhere in
# this file -- presumably meant to be run via curs.execute(schema) before
# loading; confirm, since all DDL below currently lands in 'public'.
schema = '''
SET search_path TO census_acs_20121;
'''
import psycopg2 | |
import sys,os, re | |
import glob | |
##-----------------------------------------------------------
## SQL Strings misc
## parameterized INSERT of one metadata row per generated ACS table
## (columns defined by tTablesKey_setup below)
tTableMetaDataSQL = '''
INSERT into acs20121_tables_list
VALUES ( %s,%s,%s,%s,%s,%s );
'''
## CREATE TABLE preamble; {0} is the generated table name and the
## column list is appended at runtime by do_emit_table()
tCreate_ACSmtable_SQL_pre = '''
drop table if exists {0} cascade;
create table {0} (
'''
## closes the column list; every generated table inherits the shared
## pkey / seq_metadata_key columns from acs20121_base
tCreate_ACSmtable_SQL_post = '''
) inherits (acs20121_base);
'''
## NOTE(review): hard-codes state 'ca' and appears unused in this
## file -- confirm before relying on or removing it
tInsStr = '''
COPY {0} from e20121ca{1}000.txt with CSV delimiter E',';
'''
## parameterized INSERT of one bookkeeping row per generated column
tColsRecSQL = '''
INSERT into acs20121_table_cols_list VALUES ( %s,%s,%s );
'''
#--------------
## bookkeeping table: one row per generated ACS table, mapping the
## in-database name back to the Census table id / title / universe
tTablesKey_setup = '''
drop table if exists acs20121_tables_list cascade;
create table acs20121_tables_list (
table_indb_name text,
table_id text,
table_seq_num text,
table_title text,
table_subject_area text,
table_universe text,
pkey serial primary key );
'''
## bookkeeping table: one row per generated column, mapping the
## synthetic column name back to the original column title
tTableCols_setup = '''
drop table if exists acs20121_table_cols_list cascade;
create table acs20121_table_cols_list (
table_indb_name text,
table_col text,
table_col_orig text,
pkey serial primary key );
'''
##------------------------------------------------------
##-- data loading SQL
## parent table every generated ACS table inherits from
tBaseRaw_setup = '''
drop table if exists acs20121_base cascade;
CREATE TABLE acs20121_base (
pkey serial PRIMARY KEY,
seq_metadata_key integer
);
'''
## one row per input data line: source filename plus the six leading
## FILEID..LOGRECNO fields of each e*.txt record (Tech Doc sec. 2.5)
tSeqMetaData_setup = '''
drop table if exists acs20121_seq_metadata cascade;
CREATE TABLE acs20121_seq_metadata (
pkey serial PRIMARY KEY,
e_filename text,
e_file_id text,
e_file_type text,
e_stusab text,
e_chariter text,
e_sequence text,
e_logrecno text );
'''
## list all generated master tables (acs_mtable_*_raw) in public
tGet_mtables = '''
select table_name from information_schema.tables
where table_name ~ '^acs_mtable_' and table_name ~ '_raw$' and table_schema = 'public';
'''
## per-state child data table: {0} parent table name, {1} state abbrev
tMakeChildTableSQL = '''
create table {0}_{1} ( state_name char(2) )
inherits ({0});
'''
## per-state child of the geo definitions table; {0} state abbrev
tMakeChildGeoTableSQL = '''
create table acs20121_geo_defs_{0} ( state_name char(2) )
inherits (acs20121_geo_defs);
'''
## look up the generated tables belonging to sequence {0}
tGetInDB_TablesSQL = '''
SELECT table_indb_name, table_id
FROM acs20121_tables_list
WHERE table_seq_num = '{0}';
'''
## list the synthetic data columns (mtbl_*) of generated table {0}
tGetColumnsForTableSQL = '''
SELECT column_name FROM information_schema.columns
WHERE table_name = '{0}' and column_name ~ '^mtbl_' and table_schema='public';
'''
## generic INSERT: {0} table name, {1} comma-separated values/placeholders
tInsDataSQL = '''
INSERT into {0} VALUES ( {1} );
'''
##-- note the following column defs are lowercased,
##-- and name -> geo_name
## geography header table: one text column per field of the g20121*.csv
## geo files; logrecno (the primary key) is the join key to the data rows
tGeoTable_setup = '''
drop table if exists acs20121_geo_defs cascade;
create table acs20121_geo_defs (
fileid text, stusab text, sumlevel text, component text,
logrecno text, us text, region text, division text, statece text,
state text, county text, cousub text, place text, tract text, blkgrp text,
concit text, aianhh text, aianhhfp text, aihhtli text, aitsce text,
aits text, anrc text, cbsa text, csa text, metdiv text, macc text,
memi text, necta text, cnecta text, nectadiv text, ua text, blank_0 text,
cdcurr text, sldu text, sldl text, blank_1 text, blank_2 text,
zcta5 text, submcd text, sdelm text, sdsec text, sduni text, ur text,
pci text, blank_3 text, blank_4 text, puma5 text, blank_5 text,
geoid text,
geo_name text, bttr text, btbg text, blank_6 text
);
alter table acs20121_geo_defs add PRIMARY KEY (logrecno);
'''
## sample queries showing how to explore the bookkeeping tables;
## never executed by this script
tExampleSQLStr = '''
SELECT
table_indb_name,
table_id, table_seq_num,
table_title, table_subject_area,
table_universe
FROM acs20121_tables_list
WHERE
table_indb_name = 'acs_mtable_2_raw';
SELECT table_col, table_col_orig
FROM acs20121_table_cols_list
where
table_indb_name = 'acs_mtable_2_raw'
order by pkey;
-- will show 49 named columns for table_id B01001
-- a population table on age
--
-- adding the contents of col_1 and col_25 should equal col_0
-- in an example row 59368 + 61909 = 121277 check
'''
##--------------------------------------------------------
## some globals
## monotonically increasing suffix for generated acs_mtable_<N>_raw names
gTableNum = 0
##--------------------------------------------------------
def do_process_lkup_defs( inFileName ):
    ''' build the primary table definitions from file

    Reads the Sequence_Number_and_Table_Number_Lookup file line by
    line, grouping rows into per-table descriptions; each completed
    description is handed to do_emit_table().  All fixed bookkeeping
    tables are (re)created first.

    Uses the module-level conn/curs psycopg2 handles; exits the
    process on any database or file-open error.
    '''
    ## (re)create the fixed bookkeeping/metadata tables
    try:
        curs.execute( tBaseRaw_setup )
        curs.execute( tTablesKey_setup )
        curs.execute( tTableCols_setup )
        curs.execute( tGeoTable_setup )
        curs.execute( tSeqMetaData_setup )
    except Exception, E:
        print str(E)
        exit(0)
    #inFileBase = inFileName[0:-4]
    try:
        tFile = open(inFileName,'r')
    except Exception, E:
        print str(E)
        exit(0)
    ## read header line
    tLine = tFile.readline()
    tA = tLine.split(',')
    tFirstRec = True
    tColTitleaA = []
    tCommitCnt = 0 ## slight speedup, dont commit each loop
    for tLine in tFile:
        tLine = tLine.rstrip()
        ## unfortunately, quoted strings w/ commas means no simple split()
        tA = do_get_acs_line( tLine )
        #tA = tLine.split(',')
        ## first time in table desc
        ## a blank field 3 marks a table-level (not column-level) row
        if tA[3] == ' ' and tFirstRec == True:
            tTableID = tA[1]
            tTableSeqID = tA[2]
            tTableTitle = tA[7]
            tSubjArea = tA[8]
            tFirstRec = False
            continue
        if tA[3] == ' ' and tFirstRec == False:
            ## a blank field 4 is the universe row of the CURRENT
            ## table; otherwise this row starts the NEXT table
            if tA[4] == ' ':
                tUniverseStr = do_clean_univ_str( tA[7])
                continue
            else:
                ## new record, wrap up the old record
                ## tUniverseStr,'|',tSubjArea,'|',tTableTitle tColTitleaA
                do_emit_table( tTableID, tTableSeqID, tUniverseStr, tSubjArea, tTableTitle, tColTitleaA )
                if (tCommitCnt % 12) == 0 :
                    conn.commit()
                tCommitCnt += 1
                tColTitleaA = []
                tTableID = tA[1]
                tTableSeqID = tA[2]
                tTableTitle = tA[7]
                tSubjArea = tA[8]
                continue
        ## any other row is a column title of the current table
        tColTitleaA.append( tA[7] )
    ## NOTE(review): the loop only flushes a table when the NEXT one
    ## starts, so the final table description in the file is never
    ## emitted -- confirm the lookup file ends with a sentinel row
    conn.commit()
    return
##--------------------------------------------------------
def do_emit_table( inTableID, inSeqID, inUniv, inSubj, inTableTitle, inColA ):
    ''' after a table desc is parsed, func to act on the desc

    Registers the table in acs20121_tables_list, registers every column
    title in acs20121_table_cols_list, then creates the table itself as
    acs_mtable_<gTableNum>_raw (inheriting acs20121_base) with one text
    column per entry of inColA.  Increments the module-level gTableNum.
    Exits the process on any database error.
    '''
    global gTableNum
    tTableNameStr = 'acs_mtable_' + str( gTableNum ) + '_raw'
    try:
        curs.execute( tTableMetaDataSQL, [ tTableNameStr,inTableID,inSeqID,inTableTitle,inSubj,inUniv ] )
    except Exception, E:
        print str(E)
        exit(1)
    tCrStr = tCreate_ACSmtable_SQL_pre
    tColNum = 0
    for tCol in inColA:
        ## synthetic column name: mtbl_<tablenum>_col_<colnum>
        tColName = 'mtbl_' + str(gTableNum) + '_col_' + str(tColNum)
        tColNum += 1
        curs.execute( tColsRecSQL, [ tTableNameStr, tColName, tCol ] )
        tCrStr += tColName + ' text,\n'
    ## drop the trailing comma before closing the column list
    tCrStr = tCrStr.rstrip( ',\n' )
    tCrStr = tCrStr + tCreate_ACSmtable_SQL_post
    try:
        curs.execute( tCrStr.format( tTableNameStr ) )
    except Exception, E:
        print str(E)
        exit(1)
    gTableNum += 1
    return
##------------------------------------------------------
def do_clean_univ_str( inStr ):
    ''' Placeholder for universe-string cleanup; returns inStr unchanged. '''
    ## TODO: strip the redundant universe prefix, watching for anomalies;
    ## there also appears to be an implied major/minor universe to handle
    return inStr
def do_get_acs_line( inStr ):
    ''' Split one line of an e*.txt sequence file into fields.

    Commas inside double-quoted runs are kept as data; the quote
    characters themselves are dropped.  Returns the list of fields
    (always at least one element, even for an empty line).
    '''
    fields = []
    buf = []
    in_quotes = False
    for ch in inStr:
        if ch == '"':
            ## quotes only toggle state; they never reach the output
            in_quotes = not in_quotes
        elif ch == ',' and not in_quotes:
            ## unquoted comma terminates the current field
            fields.append( ''.join( buf ) )
            buf = []
        else:
            buf.append( ch )
    ## flush the final (possibly empty) field
    fields.append( ''.join( buf ) )
    return fields
##===================================================================
## begin per-file sequence reading and loading
##------------------------------------------------------
## skip the multi-sequence
## see TechDoc Appendix C.7 for multi-sequence tables
## sequence numbers whose tables span multiple sequence files;
## do_process_dir() skips any e*.txt file carrying one of these
cNotYetSequences = [
    '0116','0117',
    '0122','0123',
    '0133','0134','0135',
    '0136','0137','0138',
    '0139','0140','0141',
    '0142','0143','0144',
    '0145','0146','0147',
    '0148','0149','0150'
    ]
#---
def do_process_dir( inDirName ):
    ''' assume all states are unzipped in this directory..

    Scans inDirName for e*.txt sequence files.  On the first file seen
    for a state it creates that state's geo and data child tables, then
    loads each (non-skipped) sequence file via do_process_input_file().
    Exits the process on a bad directory or unexpected filename.
    '''
    try:
        os.chdir( inDirName )
    except Exception, E:
        print 'ERR working data directory ' + inDirName
        print str(E)
        exit(0)
    ## basic defs of import file names for this release
    ## see ACS_2012_SF_Tech_Doc.pdf page 12
    ## filename layout: e20121<ss><seqn><iter>.txt
    ##   [0:6] prefix, [6:8] state, [8:12] sequence, [12:15] chariter
    tPrefixStr = 'e20121'
    tStateStr = '00'
    tSeqStr = '0086'
    tCIter = '000'
    tSuffixStr = '.txt'
    ## TMP array of states in this directory - CAN CHANGE
    tStatesA = []
    for fileStr in glob.glob("e*.txt"):
        if not (
            fileStr[0:6] == tPrefixStr and
            fileStr[12:15] == tCIter and
            fileStr[15:19] == tSuffixStr
            ):
            print "ERROR: filename not in format in dir {0}".format(inDirName)
            exit(0)
        tStateStr = fileStr[6:8]
        tSeqStr = fileStr[8:12]
        ## multi-sequence tables are not handled yet (Appendix C.7)
        if tSeqStr in cNotYetSequences:
            print 'skipped file ' + fileStr
            continue
        ## first file for this state: create its child tables once
        if not (tStateStr in tStatesA):
            #print 'tStatesA = ' + str(tStatesA) + ' ;fileStr = ' + fileStr
            do_make_geo_child_table( tStateStr )
            do_make_states_child_tables( tStateStr )
            tStatesA.append( tStateStr )
        do_process_input_file( fileStr, tStateStr, tSeqStr )
    return
##-------------------------------------------------------------------
def do_make_geo_child_table( inStateStr ):
    ''' Create acs20121_geo_defs_<state> and COPY the state's
    g20121<state>.csv geography file into it.

    Relies on the module-level curs and inDataDir; exits the process
    on any database error.  NOTE(review): COPY FROM a file path runs
    on the Postgres server, so the server process must be able to read
    inDataDir -- confirm permissions/locality.
    '''
    try:
        curs.execute( tMakeChildGeoTableSQL.format( inStateStr ) )
    except Exception, E:
        print str(E)
        exit(0)
    ## server-side COPY of the geography csv; {0} state, {1} data dir
    tInsGeoSQL = '''
COPY acs20121_geo_defs_{0} (
fileid, stusab, sumlevel, component,
logrecno, us, region, division, statece,
state, county, cousub, place, tract, blkgrp,
concit, aianhh, aianhhfp, aihhtli, aitsce,
aits, anrc, cbsa, csa, metdiv, macc,
memi, necta, cnecta, nectadiv, ua, blank_0,
cdcurr, sldu, sldl, blank_1, blank_2,
zcta5, submcd, sdelm, sdsec, sduni, ur,
pci, blank_3, blank_4, puma5, blank_5,
geoid, geo_name, bttr, btbg, blank_6) from
'{1}/g20121{0}.csv' with CSV delimiter E',';
'''
    try:
        curs.execute( tInsGeoSQL.format( inStateStr, inDataDir) )
    except Exception, E:
        print str(E)
        exit(0)
    return
##------------------------------------------------------------------- | |
def do_make_states_child_tables( inStateStr ): | |
try: | |
curs.execute( tGet_mtables ) | |
except Exception, E: | |
print str(E) | |
exit(0) | |
if len(inStateStr) > 2: | |
print 'ERR: do_make_states_child_tables() inStateStr = ' + inStateStr | |
return | |
## array of tables for this sequence, per line | |
tResTablesTA = curs.fetchall() | |
tIndCnt = 0 | |
for tTableT in tResTablesTA: | |
tIndCnt += 1 | |
try: | |
curs.execute( tMakeChildTableSQL.format( tTableT[0], inStateStr ) ) | |
except Exception, E: | |
print str(E) | |
exit(0) | |
if (tIndCnt % 100) == 0: | |
conn.commit() | |
conn.commit() | |
return | |
##-------------------------------------------------------------------
## accumulates [filename, table, line#] triples for data slices that
## failed to insert; printable at end of run for QA
gErrTablesInfoA = []
def do_process_input_file( inFileStr, inStateStr, inSeqStr ):
    ''' create all tables and populate, for one e*txt sequence file

    For each data line: stores the six leading metadata fields in
    acs20121_seq_metadata, then slices the remaining comma-separated
    fields across every table registered for this sequence, inserting
    each slice into the corresponding per-state child table.  Failed
    slices are recorded in gErrTablesInfoA and skipped.
    '''
    try:
        tDF = open( inFileStr, 'r' )
    except Exception, E:
        print 'ERR in do_process_input_file: file={0} state={1} seq={2}'.format( inFileStr, inStateStr, inSeqStr)
        print str(E)
        return
    ## --
    global gErrTablesInfoA
    try:
        curs.execute( tGetInDB_TablesSQL.format( inSeqStr ))
    except Exception, E:
        print str(E)
        exit(0)
    ## array of tables for this sequence, per line
    tResTablesA = curs.fetchall()
    tProcessedLineCnt = 0
    for tLine in tDF:
        ##TODO COPY a line into the right table
        ##NOTE: I really hope a simple split by comma will do
        tProcessedLineCnt += 1
        tDataA = tLine.split(',')
        #print ' ' + str(len(tA)) + ' flds for ' + str(len(tLine)) + ' chars'
        ## record the meta-data for this sequence,
        ## see ACS_2012_SF_Tech_Doc.pdf sec. 2.5
        ## FILEID,FILETYPE,STUSAB,CHARITER,SEQUENCE,LOGRECNO
        tSeqMetaDataA = []
        tSeqMetaDataA.append( inFileStr )
        tSeqMetaDataA.append( tDataA[0] )
        tSeqMetaDataA.append( tDataA[1] )
        tSeqMetaDataA.append( tDataA[2] )
        tSeqMetaDataA.append( tDataA[3] )
        tSeqMetaDataA.append( tDataA[4] )
        tSeqMetaDataA.append( tDataA[5] )
        tSQL = '''
INSERT into acs20121_seq_metadata (e_filename,e_file_id,e_file_type,e_stusab,e_chariter,e_sequence,e_logrecno)
VALUES (%s,%s,%s,%s,%s,%s,%s) returning pkey;
'''
        try:
            curs.execute( tSQL, tSeqMetaDataA )
            #tFStr = '%s,%s,%s,%s,%s,%s,%s'
            #curs.execute( tInsDataSQL.format( 'acs20121_seq_metadata', tFStr ), tSeqMetaDataA )
        except Exception, E:
            print str(E)
            exit(0)
        ## pkey of the metadata row links every data row back to it
        tResSeqMD_pkey = curs.fetchone()[0]
        ## LOOP each dst table for this sequence line
        ## data starts after the six metadata fields; the offset keeps
        ## advancing across ALL tables of the sequence on this line
        tInLineOffset = 0 + 6
        for tElem in tResTablesA:
            #print tElem
            tCurTable = tElem[0]
            ## note that this could be done once, TODO NEXT
            try:
                curs.execute( tGetColumnsForTableSQL.format( tCurTable))
            except Exception, E:
                print str(E)
                exit(0)
            tResColsA = curs.fetchall()
            ##-- get the len of the array
            tColsToCopyCnt = len(tResColsA)
            #print tCurTable + ' -- ' + str(tColsToCopyCnt)
            ## copy the slice to the right table
            ## starting at col[6], grab data, make the format string, INSERT, repeat
            tColNum = 0
            tColsA = []
            ## preflight, may need more attention
            try:
                tInsA = []
                tDstTable = tCurTable + '_' + inStateStr
                tInsStrPre = 'INSERT into ' + tDstTable + '(seq_metadata_key,'
                tInsStrMid = ' VALUES (' + str( tResSeqMD_pkey) +','
                tIFrmtStr_0 = ''
                tIFrmtStr_1 = ''
                for tInd in xrange( tColsToCopyCnt):
                    tIFrmtStr_0 += tResColsA[tInd][0] + ','
                    tIFrmtStr_1 += '%s,'
                    tInsA.append( tDataA[ tInLineOffset ] )
                    tInLineOffset += 1
                #tInsStr = tInsStr.rstrip(',')
                ## trailing column records the state abbreviation
                tInsA.append( inStateStr )
                tIFrmtStr_0 += 'state_name)'
                tIFrmtStr_1 += '%s'
                tIFrmtStr_1 += ');'
                try:
                    tInsStr = tInsStrPre + tIFrmtStr_0 + tInsStrMid + tIFrmtStr_1
                    curs.execute( tInsStr, tInsA )
                    conn.commit()
                except Exception, E:
                    print str(E)
                    exit(0)
            except Exception, E:
                ## most likely tDataA ran short of fields for this
                ## table's slice; record the failure and keep going
                #print 'ERR: '+tCurTable+' '+' line '+str(tProcessedLineCnt)
                tErrA = []
                tErrA.append( inFileStr)
                tErrA.append( tCurTable)
                tErrA.append( tProcessedLineCnt)
                gErrTablesInfoA.append( tErrA)
                continue
            #print tCurTable + '--' + tColsA
            #print '--'
    conn.commit()
    tDF.close()
    return
##===========================================================================
## main - just do it
try:
    # change dbname to destination Postgres database.
    conn = psycopg2.connect( "dbname=census_acs_2012")
    ## force client encoding to LATIN1 -- presumably the ACS source
    ## files are latin-1, not UTF-8; confirm against the tech doc
    conn.set_client_encoding( 'LATIN1' )
    curs = conn.cursor()
except Exception, E:
    print str(E)
    exit(0)
##-----------------------
## argv[1] = lookup definitions file, argv[2] = data directory;
## both fall back to defaults when omitted
inLkupDefsFileName = None
inDataDir = None
if len( sys.argv ) > 1:
    inLkupDefsFileName = sys.argv[1]
else:
    #inFileName = 'Sequence_Number_and_Table_Number_Lookup_m.txt'
    inLkupDefsFileName = 'Sequence_Number_and_Table_Number_Lookup.txt'
if len(sys.argv ) > 2:
    inDataDir = sys.argv[2]
else:
    inDataDir = 'acs2012_1yr_work/work_dir'
## TODO more robust args checking
if inLkupDefsFileName is None or inDataDir is None:
    print 'USAGE: {0} Sequence_Number_and_Table_Number_Lookup.txt data_dir'.format(sys.argv[0])
    exit(0)
##-------------------------------------------
do_process_lkup_defs( inLkupDefsFileName )
do_process_dir( inDataDir )
#print str(gErrTablesInfoA)
##-------------------------------------------
#curs.execute( 'VACUUM ANALYZE;')
conn.close()
exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment