Skip to content

Instantly share code, notes, and snippets.

@vipul43
Created January 21, 2021 18:38
Show Gist options
  • Save vipul43/149a37d9900d52d194fbdcd3f067a54c to your computer and use it in GitHub Desktop.
Save vipul43/149a37d9900d52d194fbdcd3f067a54c to your computer and use it in GitHub Desktop.
Python script to load CSV data into a MySQL database
import pandas as pd
import mysql.connector as msql
from mysql.connector import Error
import datetime
# Accounts master sheet: discard the columns at positions 7 and 8, then keep
# only rows that carry an account name, an IP domain and an SFDC account id.
accountsData = pd.read_csv('/Users/vipul/Downloads/accounts_master.csv')
accountsData = accountsData.drop(columns=accountsData.columns[[7, 8]])
accountsData = accountsData.dropna(
    subset=["Account Name", "IP Domain", "SFDC Account ID"]
)

# Activity export: keep only the first row seen for each OpenCX buyer id.
unravelData = pd.read_csv('/Users/vipul/Downloads/sir_unravel_v2.csv')
unravelData = unravelData.drop_duplicates(subset="OpenCX Buyer Id", keep='first')
def displayData():
    """Print the two module-level DataFrames, each under its own heading."""
    sections = (
        ("accounts Data", accountsData),
        ("unravel Data", unravelData),
    )
    for heading, frame in sections:
        print(heading)
        print(frame)
def connectToSql():
    """Open a connection to the local ``project`` database.

    Returns:
        A ``(conn, cursor)`` pair: the open connection and a cursor on it.

    Raises:
        mysql.connector.Error: if the connection cannot be established or
            the initial ``select database()`` query fails. The error is
            printed first and then re-raised, so callers never receive
            ``None`` in place of the ``(conn, cursor)`` pair.
    """
    # NOTE(review): host, user and password are hard-coded here.
    try:
        conn = msql.connect(host='localhost', database='project', user='root', password='password', auth_plugin='mysql_native_password')
        if not conn.is_connected():
            # Previously this case fell through and returned None implicitly.
            raise Error("MySQL connection could not be established")
        cursor = conn.cursor()
        cursor.execute("select database();")
        db = cursor.fetchone()[0]
        print("Connection Made To Database: ", db)
        return conn, cursor
    except Error as e:
        print("Error while connecting to MySQL", e)
        # Re-raise: swallowing the error made the caller fail later with an
        # unrelated "cannot unpack NoneType" TypeError.
        raise
def disconnectToSql(conn, cursor):
    """Close the cursor and then the connection, if the connection is open.

    Does nothing when ``conn.is_connected()`` is false.
    """
    if not conn.is_connected():
        return
    cursor.close()
    conn.close()
    print("MySQL connection is closed")
def matchAndGetBuyerData(account_domain_source):
    """Translate a raw account-domain source label into the stored value.

    ``"kickfire"`` becomes ``"reverse_ip_lookup"`` and ``"ip"`` becomes
    ``"ip_range_match"``. Every other value, including ``"bluekai"`` and
    ``"bombora"``, is returned unchanged.
    """
    renamed_sources = {
        "kickfire": "reverse_ip_lookup",
        "ip": "ip_range_match",
    }
    return renamed_sources.get(account_domain_source, account_domain_source)
def createEntryInAccountTableDBAndPD(conn, cursor, uuid, name, ip_domain, city, state, country, salesforce_id):
    """Insert a new account into MySQL and mirror it in ``accountsData``.

    The row is written to ``project.account`` with an empty string in the
    seventh position (the account type) and committed immediately. The same
    values, minus the UUID, are then appended to the module-level
    ``accountsData`` DataFrame.

    Args:
        conn: Open database connection, used to commit.
        cursor: Cursor on ``conn``, used to run the INSERT.
        uuid: Identifier stored in the first column of the new row.
        name, ip_domain, city, state, country: Account attributes.
        salesforce_id: Value stored in the last column of the new row.
    """
    sql = "INSERT INTO project.account VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
    cursor.execute(sql, (uuid, name, ip_domain, city, state, country, "", salesforce_id))
    conn.commit()
    # len(accountsData) is not a safe label for the new row: rows removed
    # with dropna() leave gaps in the index, so that number can already be
    # in use and .loc would overwrite an existing account instead of
    # appending. Use one past the largest existing label instead.
    next_label = 0 if accountsData.empty else accountsData.index.max() + 1
    accountsData.loc[next_label] = [name, ip_domain, city, state, country, "", salesforce_id]
def setNullablesToEmptyAccount(row):
    """Return the seven account fields of ``row`` with nulls replaced by "".

    The result is a tuple ordered as: Account Name, IP Domain, IP Geo City,
    IP Geo State, IP Geo Country, Type, SFDC Account ID.
    """
    account_columns = (
        "Account Name",
        "IP Domain",
        "IP Geo City",
        "IP Geo State",
        "IP Geo Country",
        "Type",
        "SFDC Account ID",
    )
    return tuple(
        row[column] if pd.notnull(row[column]) else ""
        for column in account_columns
    )
def getUUIDOfRow(conn, cursor, row):
    """Return the first column of the ``project.account`` row matching ``row``.

    All seven account fields must match. Nulls in ``row`` are compared as
    empty strings (see ``setNullablesToEmptyAccount``).

    Args:
        conn: Open database connection (unused here).
        cursor: Cursor used to run the lookup.
        row: Mapping/Series holding the account columns of ``accountsData``.

    Returns:
        The value of the first column of the first matching database row.
        presumably this is the account UUID; verify against the table schema.

    Raises:
        LookupError: if no database row matches.
    """
    sql = "SELECT * FROM project.account AS t WHERE t.name = %s AND t.ip_domain = %s AND t.city = %s AND t.state = %s AND t.country = %s AND t.type = %s AND t.salesforce_id = %s"
    account_fields = setNullablesToEmptyAccount(row)
    cursor.execute(sql, account_fields)
    # Drain the whole result set: with an unbuffered cursor, rows left
    # unread after fetchone() make the next execute() on this connection fail.
    matches = cursor.fetchall()
    if not matches:
        raise LookupError(f"no row in project.account matches {account_fields!r}")
    return matches[0][0]
def getNewUUID(conn, cursor):
    """Ask the database for a new UUID via ``select uuid()`` and return it.

    ``conn`` is accepted but not used.
    """
    cursor.execute("select uuid();")
    first_row = cursor.fetchone()
    return first_row[0]
def replaceAndGetBuyerData(conn, cursor, account_name, account_ip_domain, creative_name, website_page_url, geo_city, geo_state, geo_country, external_account_id):
    """Resolve the account UUID for one buyer, creating the account if needed.

    Matching is attempted in this order:

    1. Primary: the first account in ``accountsData`` whose non-empty
       "SFDC Account ID" occurs inside ``external_account_id``.
    2. Secondary: accounts whose "Account Name" and "IP Domain" both equal
       the buyer's. A single candidate is used directly. With several
       candidates, the first one whose non-empty "Type" occurs
       (case-insensitively) in the creative name or the page URL wins.
       If none does, the first candidate is used.
    3. Tertiary: no candidate at all. A new account is created from the
       buyer's data and its fresh UUID is returned.

    Args:
        conn: Open database connection.
        cursor: Cursor on ``conn``.
        account_name, account_ip_domain: Buyer's account name and domain.
        creative_name, website_page_url: Text searched for the account type.
        geo_city, geo_state, geo_country: Location stored on a new account.
        external_account_id: Buyer's external id, searched for SFDC ids.

    Returns:
        The UUID of the matched or newly created account.
    """
    # Primary match on the SFDC id.
    for _, row in accountsData.iterrows():
        sfdc_id = row["SFDC Account ID"]
        # Accounts appended by the tertiary step can carry "" as their id.
        # "" is a substring of every string, so without the emptiness check
        # such an account would swallow every later buyer.
        if pd.notnull(sfdc_id) and sfdc_id != "" and sfdc_id in external_account_id:
            print("Primary Match")
            return getUUIDOfRow(conn, cursor, row)

    # Candidates sharing both the account name and the IP domain. Only the
    # rows are collected here; the database is queried once, for the row
    # that is finally chosen.
    candidates = [
        row
        for _, row in accountsData.iterrows()
        if pd.notnull(row["Account Name"])
        and row["Account Name"] == account_name
        and pd.notnull(row["IP Domain"])
        and row["IP Domain"] == account_ip_domain
    ]

    if len(candidates) == 1:
        return getUUIDOfRow(conn, cursor, candidates[0])

    if candidates:
        creative_text = str(creative_name).lower()
        url_text = str(website_page_url).lower()
        # Break the tie among the candidates only, never among all accounts.
        for row in candidates:
            account_type = row["Type"]
            # A null type has no .lower(), and an empty one matches any text.
            if pd.isnull(account_type) or account_type == "":
                continue
            type_text = str(account_type).lower()
            if type_text in creative_text or type_text in url_text:
                print("Secondary Match")
                return getUUIDOfRow(conn, cursor, row)
        # No type settled the tie: use the first candidate rather than
        # returning None and storing the buyer without an account.
        return getUUIDOfRow(conn, cursor, candidates[0])

    new_uuid = getNewUUID(conn, cursor)
    createEntryInAccountTableDBAndPD(conn, cursor, new_uuid, account_name, account_ip_domain, geo_city, geo_state, geo_country, external_account_id)
    print("Tertiary Match")
    return new_uuid
def setNullablesToEmptyBuyer(row):
    """Return the twelve buyer fields of ``row`` with nulls replaced by "".

    The result is a tuple ordered as: OpenCX Buyer Id, Account Name,
    Account IP Domain, Creative Name, Website Page URL, Geo City, Geo State,
    Geo Country, External Account Id, Account Domain Source, Job Level,
    Job Function.
    """
    buyer_columns = (
        "OpenCX Buyer Id",
        "Account Name",
        "Account IP Domain",
        "Creative Name",
        "Website Page URL",
        "Geo City",
        "Geo State",
        "Geo Country",
        "External Account Id",
        "Account Domain Source",
        "Job Level",
        "Job Function",
    )
    return tuple(
        row[column] if pd.notnull(row[column]) else ""
        for column in buyer_columns
    )
def getBuyerData(conn, cursor, row):
    """Build the 8-value tuple inserted into ``project.buyer`` for ``row``.

    The tuple holds: buyer id, resolved account UUID, city, state, country,
    translated domain source, job level and job function.
    """
    (
        buyer_id,
        acct_name,
        acct_domain,
        creative,
        page_url,
        city,
        state,
        country,
        external_id,
        domain_source,
        job_level,
        job_function,
    ) = setNullablesToEmptyBuyer(row)
    account_uuid = replaceAndGetBuyerData(
        conn, cursor, acct_name, acct_domain, creative, page_url,
        city, state, country, external_id,
    )
    source = matchAndGetBuyerData(domain_source)
    return (buyer_id, account_uuid, city, state, country, source, job_level, job_function)
def matchAndGetActivityData(Activity_Type, Creative_Name, Website_Page_URL):
    """Pick the detail text that belongs to an activity type.

    "Ad Click" yields the creative name. "Form Fill", "Website Visit" and
    "Live Chat" yield the page URL. Any other type yields "".
    """
    if Activity_Type == "Ad Click":
        return Creative_Name
    if Activity_Type in ("Form Fill", "Website Visit", "Live Chat"):
        return Website_Page_URL
    return ""
def convertUETToDT(conn, cursor, Date_and_Time_unix_epoch_time_format):
    """Convert a millisecond Unix timestamp to a datetime via MySQL.

    Args:
        conn: Open database connection (unused here).
        cursor: Cursor used to run ``from_unixtime``.
        Date_and_Time_unix_epoch_time_format: Epoch time in milliseconds,
            or an empty/null value when the source row had no timestamp.

    Returns:
        The value MySQL returns for ``from_unixtime(ms / 1000)``, or ``None``
        when the timestamp is missing (``None``, NaN or a blank string).
    """
    epoch_ms = Date_and_Time_unix_epoch_time_format
    # Missing timestamps arrive as "" from setNullablesToEmptyActivity;
    # dividing that by 1000 used to raise TypeError.
    if isinstance(epoch_ms, str):
        if not epoch_ms.strip():
            return None
    elif pd.isnull(epoch_ms):
        return None
    # Bind the value instead of formatting it into the SQL text. float()
    # turns numpy scalars into a plain Python number the driver can convert.
    cursor.execute("select from_unixtime(%s);", (float(epoch_ms) / 1000,))
    return cursor.fetchone()[0]
def setNullablesToEmptyBuyerTable(row):
    """Return the eight buyer-table fields of ``row`` with nulls as "".

    The result is a tuple ordered as: id, account_id, city, state, country,
    source, job_level, job_function.
    """
    table_columns = (
        "id",
        "account_id",
        "city",
        "state",
        "country",
        "source",
        "job_level",
        "job_function",
    )
    return tuple(
        row[column] if pd.notnull(row[column]) else ""
        for column in table_columns
    )
def replaceAndGetActivityData(buyerData, Open_CX_Buyer_Id):
    """Return the ``id`` of the first buyer row equal to ``Open_CX_Buyer_Id``.

    Rows with a null ``id`` are skipped. Returns "" when no row matches.
    """
    matching_ids = (
        record["id"]
        for _, record in buyerData.iterrows()
        if pd.notnull(record["id"]) and record["id"] == Open_CX_Buyer_Id
    )
    return next(matching_ids, "")
def setNullablesToEmptyActivity(row):
    """Return the five activity fields of ``row`` with nulls replaced by "".

    The result is a tuple ordered as: OpenCX Buyer Id, Date and Time (unix
    epoch time format), Activity Type, Creative Name, Website Page URL.
    """
    activity_columns = (
        "OpenCX Buyer Id",
        "Date and Time (unix epoch time format)",
        "Activity Type",
        "Creative Name",
        "Website Page URL",
    )
    return tuple(
        row[column] if pd.notnull(row[column]) else ""
        for column in activity_columns
    )
def getActivityData(conn, cursor, buyerData, row):
    """Build, print and return the 5-value tuple for ``project.activity``.

    The tuple holds: a fresh UUID, the buyer id, the converted timestamp,
    the activity type and the detail text chosen for that type.
    ``buyerData`` is accepted but not used.
    """
    buyer_id, epoch_ms, activity_type, creative, page_url = setNullablesToEmptyActivity(row)
    activity_uuid = getNewUUID(conn, cursor)
    occurred_at = convertUETToDT(conn, cursor, epoch_ms)
    details = matchAndGetActivityData(activity_type, creative, page_url)
    val = (activity_uuid, buyer_id, occurred_at, activity_type, details)
    print(val)
    return val
def main():
    """Rebuild the ``buyer`` and ``activity`` tables from the CSV exports.

    Steps:
    1. Print the loaded DataFrames and connect to MySQL.
    2. Drop both tables, recreate ``buyer`` and insert one row per unique
       buyer in ``unravelData``, committing after each insert.
    3. Recreate ``activity`` and insert one row per line of the full
       (non-deduplicated) export, committing after each insert.
    4. Close the cursor and connection, even if a step above fails.
    """
    displayData()
    conn, cursor = connectToSql()
    try:
        # activity holds a foreign key to buyer, so the child table must go
        # first; dropping buyer alone fails on every run after the first.
        cursor.execute('DROP TABLE IF EXISTS activity;')
        cursor.execute('DROP TABLE IF EXISTS buyer;')
        cursor.execute("CREATE TABLE buyer ( id VARCHAR(255) PRIMARY KEY, account_id VARCHAR(255), city VARCHAR(255), state VARCHAR(255), country VARCHAR(255), source VARCHAR(255), job_level VARCHAR(255), job_function VARCHAR(255), FOREIGN KEY (account_id) REFERENCES account(id));")

        insert_buyer = "INSERT INTO project.buyer VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
        for line, (_, row) in enumerate(unravelData.iterrows(), start=1):
            cursor.execute(insert_buyer, getBuyerData(conn, cursor, row))
            print("row in buyer table: " + str(line))
            conn.commit()

        cursor.execute("CREATE TABLE activity ( id VARCHAR(255) NOT NULL DEFAULT(uuid()) PRIMARY KEY, buyer_id VARCHAR(255) NOT NULL, datetime datetime, activity_type VARCHAR(255), details VARCHAR(500), FOREIGN KEY (buyer_id) REFERENCES buyer(id));")
        cursor.execute("SELECT * FROM project.buyer")
        table_rows = cursor.fetchall()
        buyerData = pd.DataFrame(table_rows, columns=cursor.column_names)
        print(buyerData)

        # Activities need every row of the export, not the per-buyer
        # deduplicated view held in unravelData, so the file is read again.
        complete_unravelData = pd.read_csv('/Users/vipul/Downloads/sir_unravel_v2.csv')
        insert_activity = "INSERT INTO project.activity VALUES (%s,%s,%s,%s,%s)"
        for line, (_, row) in enumerate(complete_unravelData.iterrows(), start=1):
            cursor.execute(insert_activity, getActivityData(conn, cursor, buyerData, row))
            # This message previously said "buyer table" (copy-paste slip).
            print("row in activity table: " + str(line))
            conn.commit()
    finally:
        # Release the connection on failure as well as on success.
        disconnectToSql(conn, cursor)


if __name__ == "__main__":
    main()
Copy link

ghost commented Feb 5, 2024

Consider using https://github.com/FluffyDietEngine/files-2-db. A single command does all of this for you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment