python-redshift-pandas-statistics
import logging
import configparser  #Python 3 replacement for the old ConfigParser module
import argparse
import statistics  #statistics.correlation requires Python 3.10+
import psycopg2
import pandas as pd
import pandas.io.sql as sqlio
from pandas import pivot_table
from datetime import datetime
#-------------------------------------------------------------------------------------
#logging wrapper:
log_name = datetime.now().strftime("unload-%Y%m%d-%H%M%S.log")
logging.basicConfig(filename=log_name, level=logging.DEBUG, format='%(asctime)s %(message)s', filemode='w')
def log(message):
    logging.info(message)
    print(message)
#-------------------------------------------------------------------------------------
#config and args
log('start:' + str(datetime.now()))
conf = configparser.ConfigParser()
conf.read('unload_by_network.cfg')
db_name = conf.get('redshift', 'db_name')
user = conf.get('redshift', 'user')
host = conf.get('redshift', 'host')
password = conf.get('redshift', 'password')
table_name = conf.get('unloader', 'table_name')
columns = conf.get('correlation', 'columns')
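# For reference, a minimal sketch of the expected unload_by_network.cfg -
# the section and key names come from the conf.get() calls above; the
# values shown are placeholders, not real settings:
#
#   [redshift]
#   db_name = analytics
#   user = rs_user
#   host = example-cluster.us-east-1.redshift.amazonaws.com
#   password = secret
#
#   [unloader]
#   table_name = exposure_events
#
#   [correlation]
#   columns = total_exposed_viewed_program duration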
#sys arg for network:
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("-h", "--help", action='help', help='Command line utility for computing correlations. Common usage: python correlate_cl.py -n MTV BET -o /data1')
parser.add_argument('-n', '--networks', help="""enter "ALL" or a specific network list delimited by spaces, e.g. "MTV MTV2" """, default=['all'], nargs='+')  #default must be a list, since the value is iterated below
parser.add_argument('-c', '--columns', help="""enter two columns delimited by a space, e.g. "total_exposed_viewed_program duration" """, default=columns.split(' '), nargs='+')
parser.add_argument('-p', '--prefix', help='enter a user-defined prefix - what makes this run special?', default='')
parser.add_argument('-o', '--out', help="""output folder, e.g. "/data1" """, default='/data1')
args = parser.parse_args()
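# Example invocations (the first comes from the help text above; the second
# is an illustrative variant, assuming the file is saved as correlate_cl.py):
#   python correlate_cl.py -n MTV BET -o /data1
#   python correlate_cl.py -n ALL -c total_exposed_viewed_program duration -p test_run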
#compute vars
out_path = args.out
prefix = args.prefix
if len(args.prefix) >= 1:
    prefix = args.prefix + '_'
v1 = args.columns[0] #variable 1 used for correlation
v2 = args.columns[1] #variable 2 used for correlation, all subsequent ignored :)
network_list = args.networks
network_list = [x.upper() for x in network_list]
networks = """('""" + """', '""".join(network_list) + """')"""
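# For example, -n mtv bet yields network_list = ['MTV', 'BET'] and the SQL
# in-list literal networks = "('MTV', 'BET')".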
#-------------------------------------------------------------------------------------
#sql conn and initial list of networks from redshift
rs_conn_str = "dbname='" + db_name + "' user='" + user + "' host='" + host + "' port='5439' password='" + password + "'"
conn = psycopg2.connect(rs_conn_str)
cur = conn.cursor() #also declare a cursor interface for outer loop
if network_list[0] == 'ALL':  #check the parsed list, not the SQL literal string
    exec_sql = 'select distinct network from ' + table_name + ' order by 1'
else:
    exec_sql = 'select distinct network from ' + table_name + ' where network in ' + networks + ' order by 1'
cur.execute(exec_sql)
rows = cur.fetchall()
log('list of networks done : ' + str(datetime.now()) + '----' + exec_sql)
#-------------------------------------------------------------------------------------
#now loop
for row in rows:
    network = row[0]
    #define sql - unpivot v1 and v2 into long (metric, value) rows so a single
    #pivot_table call can lay them back out side by side per program
    cur_sql = """
    with cl as
    ( select row_number() over (partition by 1 order by 1) as event_id, household_id, nielsen_program_code, """ + v1 + ', ' + v2 + """
      from """ + table_name + """ where network='""" + network + """')
    select event_id, household_id, nielsen_program_code, '""" + v1 + """' as metric, """ + v1 + """ as value from cl
    union all
    select event_id, household_id, nielsen_program_code, '""" + v2 + """' as metric, """ + v2 + """ as value from cl"""
    log(network + ' start : ' + str(datetime.now()) + '----' + cur_sql)
    #-------------------------------------------------------------------------------------
    #load resultant query directly into dataframe
    df = sqlio.read_sql(cur_sql, conn)
    log('df loaded: ' + str(datetime.now()))
    ##-------------------------------------------------------------------------------------
    #pivot ur data in one magical step!
    tab = pivot_table(df, values='value', index=['event_id', 'household_id'], columns=['nielsen_program_code', 'metric'])
    log('tab loaded: ' + str(datetime.now()))
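    # The result is a wide frame with a (nielsen_program_code, metric) column
    # MultiIndex, so each program contributes one column per metric - e.g.
    # tab[('PROG_A', v1)] and tab[('PROG_A', v2)] for a hypothetical program
    # code 'PROG_A'. The (program, v1) lookups below rely on that layout.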
    ##-------------------------------------------------------------------------------------
    # Process data
    file_name = out_path + '/' + prefix + 'cor_' + network.lower() + '_' + v1 + '-' + v2 + '.txt'
    corr_file = open(file_name, 'w')
    programs = list(set(df['nielsen_program_code']))
    programs.sort()
    for program in programs:
        data = tab.dropna(subset=[(program, v1), (program, v2)])
        d1 = data[(program, v1)]
        d2 = data[(program, v2)]
        count = [x for x in d1 if int(x) != 0]
        if len(count) and len(d1) >= 2:  #correlation is undefined for fewer than two points
            corr = statistics.correlation(d1, d2)
            corr_file.write("%s,%s,%s,%s\n" % (network, program, corr, len(d2)))
    corr_file.close()
    log(network + ' done: ' + str(datetime.now()) + '----' + file_name)
    tab = pd.DataFrame() #release the big frames before the next network iteration
    df = pd.DataFrame()
conn.close() #don't need it anymore
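# Each output file is a headerless CSV with one row per program:
#   network,program,correlation,sample_size
# e.g. (illustrative values only):
#   MTV,12345,0.42,1873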