Created
January 21, 2021 18:38
-
-
Save vipul43/149a37d9900d52d194fbdcd3f067a54c to your computer and use it in GitHub Desktop.
Python script to load CSV data into a MySQL database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import mysql.connector as msql | |
from mysql.connector import Error | |
import datetime | |
# Account master list: remove the columns at positions 7 and 8, then discard
# rows that lack any of the three identifying fields.
accountsData = pd.read_csv('/Users/vipul/Downloads/accounts_master.csv')
accountsData = accountsData.drop(columns=accountsData.columns[[7, 8]])
accountsData = accountsData.dropna(subset=["Account Name", "IP Domain", "SFDC Account ID"])

# Unravel export reduced to one row per buyer (first occurrence wins).
unravelData = pd.read_csv('/Users/vipul/Downloads/sir_unravel_v2.csv').drop_duplicates(
    subset="OpenCX Buyer Id", keep='first'
)
def displayData():
    """Print both module-level data frames, each preceded by a heading."""
    for heading, frame in (("accounts Data", accountsData), ("unravel Data", unravelData)):
        print(heading)
        print(frame)
def connectToSql():
    """Open a connection to the local ``project`` MySQL database.

    Returns:
        A ``(conn, cursor)`` pair for the open connection.

    Raises:
        mysql.connector.Error: if the connection cannot be established or the
            initial ``select database()`` fails. The error is printed and then
            re-raised; previously the function fell through and returned
            ``None``, which made the caller's tuple-unpacking fail with an
            unrelated ``TypeError``.
    """
    # NOTE(review): credentials are hard-coded here; consider reading them
    # from the environment or a config file.
    try:
        conn = msql.connect(
            host='localhost',
            database='project',
            user='root',
            password='password',
            auth_plugin='mysql_native_password',
        )
        if not conn.is_connected():
            raise Error("Connection to MySQL was not established")
        cursor = conn.cursor()
        cursor.execute("select database();")
        db = cursor.fetchone()[0]
        print("Connection Made To Database: ", db)
        return conn, cursor
    except Error as e:
        print("Error while connecting to MySQL", e)
        raise
def disconnectToSql(conn, cursor):
    """Close *cursor* and then *conn*, but only if the connection is still open."""
    if not conn.is_connected():
        return
    cursor.close()
    conn.close()
    print("MySQL connection is closed")
def matchAndGetBuyerData(account_domain_source):
    """Translate an account-domain source name into the value stored for a buyer.

    ``"kickfire"`` and ``"ip"`` are renamed; every other value (including
    ``"bluekai"`` and ``"bombora"``) is returned unchanged.
    """
    renamed_sources = {
        "kickfire": "reverse_ip_lookup",
        "ip": "ip_range_match",
    }
    return renamed_sources.get(account_domain_source, account_domain_source)
def createEntryInAccountTableDBAndPD(conn, cursor, uuid, name, ip_domain, city, state, country, salesforce_id):
    """Insert a new account row into MySQL and mirror it in ``accountsData``.

    The account is stored with an empty type. The insert is committed
    immediately, then the same values (minus the UUID) are appended to the
    module-level ``accountsData`` frame.

    Args:
        conn: Open MySQL connection, used to commit the insert.
        cursor: Cursor on ``conn`` used to run the insert.
        uuid: Identifier written as the first column of ``project.account``.
        name, ip_domain, city, state, country, salesforce_id: Account fields.
    """
    sql = "INSERT INTO project.account VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
    cursor.execute(sql, (uuid, name, ip_domain, city, state, country, "", salesforce_id))
    conn.commit()
    # accountsData keeps its original row labels after rows are dropped, so
    # len(accountsData) can coincide with an existing label; assigning to that
    # label via .loc would overwrite a row instead of appending one. Use a
    # label that is guaranteed to be past every existing one.
    next_label = 0 if accountsData.empty else accountsData.index.max() + 1
    accountsData.loc[next_label] = [name, ip_domain, city, state, country, "", salesforce_id]
def setNullablesToEmptyAccount(row):
    """Return the seven account fields of *row* with nulls replaced by ``""``.

    The result is a tuple ordered as: Account Name, IP Domain, IP Geo City,
    IP Geo State, IP Geo Country, Type, SFDC Account ID.
    """
    account_columns = (
        "Account Name",
        "IP Domain",
        "IP Geo City",
        "IP Geo State",
        "IP Geo Country",
        "Type",
        "SFDC Account ID",
    )
    return tuple(
        row[column] if pd.notnull(row[column]) else ""
        for column in account_columns
    )
def getUUIDOfRow(conn, cursor, row):
    """Look up the first column of the ``project.account`` row matching *row*.

    All seven account fields of *row* (nulls replaced by empty strings) must
    equal the stored values.

    Args:
        conn: Open MySQL connection (unused here; kept for a uniform signature).
        cursor: Cursor used to run the lookup.
        row: Mapping/Series with the account columns of ``accountsData``.

    Returns:
        The first column of the matching table row.

    Raises:
        LookupError: if no stored account matches *row*.
    """
    # LIMIT 1 keeps the cursor fully consumed after fetchone(); without it,
    # duplicate accounts would leave unread rows behind and the next
    # execute() on this cursor would fail.
    sql = "SELECT * FROM project.account AS t WHERE t.name = %s AND t.ip_domain = %s AND t.city = %s AND t.state = %s AND t.country = %s AND t.type = %s AND t.salesforce_id = %s LIMIT 1"
    params = setNullablesToEmptyAccount(row)
    cursor.execute(sql, params)
    match = cursor.fetchone()
    if match is None:
        # NOTE(review): a stored NULL never equals '' in SQL, so rows loaded
        # with NULLs would end up here - confirm how project.account is filled.
        raise LookupError(f"No row in project.account matches {params!r}")
    return match[0]
def getNewUUID(conn, cursor):
    """Run ``select uuid();`` on *cursor* and return the generated value."""
    cursor.execute("select uuid();")
    first_row = cursor.fetchone()
    return first_row[0]
def replaceAndGetBuyerData(conn, cursor, account_name, account_ip_domain, creative_name, website_page_url, geo_city, geo_state, geo_country, external_account_id):
    """Resolve the account UUID for one buyer, creating the account if needed.

    Resolution order:
      1. Primary: the first account whose non-empty "SFDC Account ID" occurs
         inside ``external_account_id``.
      2. Accounts whose name and IP domain both equal the buyer's. A single
         candidate is used directly; with several candidates, the first one
         whose non-empty "Type" occurs (case-insensitively) in the creative
         name or page URL wins ("Secondary Match").
      3. No candidate at all: a new account is created from the buyer's data
         ("Tertiary Match").

    Returns:
        The account UUID, or ``None`` when several candidates exist but none
        can be singled out by type.
    """
    for _, row in accountsData.iterrows():
        sfdc_id = row["SFDC Account ID"]
        # An empty ID is a substring of every string, so it must not count
        # as a match (accounts created on the fly may carry an empty ID).
        if pd.notnull(sfdc_id) and sfdc_id != "" and sfdc_id in external_account_id:
            print("Primary Match")
            return getUUIDOfRow(conn, cursor, row)

    candidates = [
        row
        for _, row in accountsData.iterrows()
        if pd.notnull(row["Account Name"])
        and row["Account Name"] == account_name
        and pd.notnull(row["IP Domain"])
        and row["IP Domain"] == account_ip_domain
    ]

    if len(candidates) == 1:
        return getUUIDOfRow(conn, cursor, candidates[0])

    if len(candidates) > 1:
        creative = creative_name.lower()
        page_url = website_page_url.lower()
        # Disambiguate among the candidates only; scanning every account
        # could return one unrelated to this buyer's name and domain.
        for row in candidates:
            account_type = row["Type"]
            # Skip null/empty types: NaN has no .lower(), and "" would be
            # "found" in any string.
            if pd.isnull(account_type) or account_type == "":
                continue
            account_type = account_type.lower()
            if account_type in creative or account_type in page_url:
                print("Secondary Match")
                return getUUIDOfRow(conn, cursor, row)
        # Ambiguous and not resolvable by type: no account is assigned.
        return None

    new_uuid = getNewUUID(conn, cursor)
    createEntryInAccountTableDBAndPD(conn, cursor, new_uuid, account_name, account_ip_domain, geo_city, geo_state, geo_country, external_account_id)
    print("Tertiary Match")
    return new_uuid
def setNullablesToEmptyBuyer(row):
    """Return the twelve buyer fields of *row* with nulls replaced by ``""``.

    The result is a tuple ordered as: OpenCX Buyer Id, Account Name,
    Account IP Domain, Creative Name, Website Page URL, Geo City, Geo State,
    Geo Country, External Account Id, Account Domain Source, Job Level,
    Job Function.
    """
    buyer_columns = (
        "OpenCX Buyer Id",
        "Account Name",
        "Account IP Domain",
        "Creative Name",
        "Website Page URL",
        "Geo City",
        "Geo State",
        "Geo Country",
        "External Account Id",
        "Account Domain Source",
        "Job Level",
        "Job Function",
    )
    return tuple(
        row[column] if pd.notnull(row[column]) else ""
        for column in buyer_columns
    )
def getBuyerData(conn, cursor, row):
    """Build the 8-value tuple inserted into the ``buyer`` table for *row*.

    The tuple holds: buyer id, resolved account UUID, city, state, country,
    translated domain source, job level, job function.
    """
    (
        buyer_id,
        acct_name,
        acct_domain,
        creative,
        page_url,
        city,
        state,
        country,
        external_id,
        domain_source,
        job_level,
        job_function,
    ) = setNullablesToEmptyBuyer(row)
    account_uuid = replaceAndGetBuyerData(
        conn, cursor, acct_name, acct_domain, creative, page_url,
        city, state, country, external_id,
    )
    source = matchAndGetBuyerData(domain_source)
    return (buyer_id, account_uuid, city, state, country, source, job_level, job_function)
def matchAndGetActivityData(Activity_Type, Creative_Name, Website_Page_URL):
    """Pick the detail value that belongs to an activity type.

    "Ad Click" yields the creative name; "Form Fill", "Website Visit" and
    "Live Chat" yield the page URL; anything else yields ``""``.
    """
    if Activity_Type == "Ad Click":
        return Creative_Name
    if Activity_Type in ("Form Fill", "Website Visit", "Live Chat"):
        return Website_Page_URL
    return ""
def convertUETToDT(conn, cursor, Date_and_Time_unix_epoch_time_format):
    """Convert a millisecond Unix epoch value to a datetime via MySQL.

    Args:
        conn: Open MySQL connection (unused here; kept for a uniform signature).
        cursor: Cursor used to evaluate ``from_unixtime``.
        Date_and_Time_unix_epoch_time_format: Epoch time in milliseconds, or
            a null / blank value when the source row had no timestamp.

    Returns:
        The first column of the ``from_unixtime`` result, or ``None`` when no
        timestamp was supplied (previously a blank value crashed on the
        division).
    """
    epoch_ms = Date_and_Time_unix_epoch_time_format
    if pd.isnull(epoch_ms) or (isinstance(epoch_ms, str) and not epoch_ms.strip()):
        return None
    # Plain float so the driver can bind it (numpy scalars from pandas are
    # not always convertible), and bound as a parameter rather than being
    # formatted into the SQL text.
    seconds = float(epoch_ms) / 1000
    cursor.execute("select from_unixtime(%s);", (seconds,))
    return cursor.fetchone()[0]
def setNullablesToEmptyBuyerTable(row):
    """Return the eight ``buyer`` table fields of *row*, nulls replaced by ``""``.

    The result is a tuple ordered as: id, account_id, city, state, country,
    source, job_level, job_function.
    """
    buyer_table_columns = (
        "id",
        "account_id",
        "city",
        "state",
        "country",
        "source",
        "job_level",
        "job_function",
    )
    return tuple(
        row[column] if pd.notnull(row[column]) else ""
        for column in buyer_table_columns
    )
def replaceAndGetActivityData(buyerData, Open_CX_Buyer_Id):
    """Return the ``id`` of the first *buyerData* row equal to the given buyer id.

    Returns ``""`` when no row has a non-null ``id`` equal to
    *Open_CX_Buyer_Id*.
    """
    matching_ids = (
        record["id"]
        for _, record in buyerData.iterrows()
        if pd.notnull(record["id"]) and record["id"] == Open_CX_Buyer_Id
    )
    return next(matching_ids, "")
def setNullablesToEmptyActivity(row):
    """Return the five activity fields of *row* with nulls replaced by ``""``.

    The result is a tuple ordered as: OpenCX Buyer Id, Date and Time (unix
    epoch time format), Activity Type, Creative Name, Website Page URL.
    """
    activity_columns = (
        "OpenCX Buyer Id",
        "Date and Time (unix epoch time format)",
        "Activity Type",
        "Creative Name",
        "Website Page URL",
    )
    return tuple(
        row[column] if pd.notnull(row[column]) else ""
        for column in activity_columns
    )
def getActivityData(conn, cursor, buyerData, row):
    """Build, print and return the 5-value tuple inserted into ``activity``.

    The tuple holds: a fresh UUID, the buyer id, the converted timestamp, the
    activity type, and the detail value chosen for that type. ``buyerData``
    is accepted but not used.
    """
    buyer_id, epoch_ms, activity, creative, page_url = setNullablesToEmptyActivity(row)
    activity_id = getNewUUID(conn, cursor)
    happened_at = convertUETToDT(conn, cursor, epoch_ms)
    details = matchAndGetActivityData(activity, creative, page_url)
    val = (activity_id, buyer_id, happened_at, activity, details)
    print(val)
    return val
def main():
    """Rebuild the ``buyer`` and ``activity`` tables from the CSV exports.

    Steps: print the loaded frames, connect, recreate ``buyer`` and fill it
    from the de-duplicated ``unravelData``, then recreate ``activity`` and
    fill it from the full (non-de-duplicated) unravel CSV. The connection is
    closed even if a step fails.
    """
    displayData()
    conn, cursor = connectToSql()
    try:
        # activity references buyer through a foreign key, so it has to be
        # dropped first; otherwise a re-run cannot drop buyer.
        cursor.execute('DROP TABLE IF EXISTS activity;')
        cursor.execute('DROP TABLE IF EXISTS buyer;')
        cursor.execute("CREATE TABLE buyer ( id VARCHAR(255) PRIMARY KEY, account_id VARCHAR(255), city VARCHAR(255), state VARCHAR(255), country VARCHAR(255), source VARCHAR(255), job_level VARCHAR(255), job_function VARCHAR(255), FOREIGN KEY (account_id) REFERENCES account(id));")
        insert_buyer = "INSERT INTO project.buyer VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
        for line, (_, row) in enumerate(unravelData.iterrows(), start=1):
            cursor.execute(insert_buyer, getBuyerData(conn, cursor, row))
            print("row in buyer table: " + str(line))
        conn.commit()

        cursor.execute("CREATE TABLE activity ( id VARCHAR(255) NOT NULL DEFAULT(uuid()) PRIMARY KEY, buyer_id VARCHAR(255) NOT NULL, datetime datetime, activity_type VARCHAR(255), details VARCHAR(500), FOREIGN KEY (buyer_id) REFERENCES buyer(id));")
        cursor.execute("SELECT * FROM project.buyer")
        table_rows = cursor.fetchall()
        buyerData = pd.DataFrame(table_rows, columns=cursor.column_names)
        print(buyerData)

        # Every activity row is needed here, so the CSV is read again
        # without the buyer-level de-duplication applied to unravelData.
        complete_unravelData = pd.read_csv('/Users/vipul/Downloads/sir_unravel_v2.csv')
        insert_activity = "INSERT INTO project.activity VALUES (%s,%s,%s,%s,%s)"
        for line, (_, row) in enumerate(complete_unravelData.iterrows(), start=1):
            cursor.execute(insert_activity, getActivityData(conn, cursor, buyerData, row))
            print("row in activity table: " + str(line))
        conn.commit()
    finally:
        disconnectToSql(conn, cursor)
# Run the import only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Consider using https://github.com/FluffyDietEngine/files-2-db. Single command does all these for you.