Created
September 7, 2023 15:04
-
-
Save mw3i/67be51f38ec16443f9dc1863e082204b to your computer and use it in GitHub Desktop.
Prototype: Dataframe Upsert
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Prototype: DataFrame upsert into a SQL table.

Built with ChatGPT; still in the process of being tested.
'''
import time

import pandas as pd
from sqlalchemy import create_engine, Table, MetaData, select, insert, update, bindparam, func
# Connection URL used when the caller does not supply an engine (placeholder
# credentials carried over from the prototype).
_DEFAULT_DATABASE_URL = 'postgresql://username:password@localhost:5432/database'

# Maximum number of IDs sent in a single ``IN (...)`` lookup, so very large
# DataFrames do not produce one enormous statement.
_ID_LOOKUP_CHUNK_SIZE = 1000


def upsert_dataframe_to_sql(dataframe, table_name, id_column="id", verbose=True, engine=None):
    """
    Upsert a Pandas DataFrame into a SQL table using SQLAlchemy.

    Rows whose ``id_column`` value already exists in the table are updated
    with the DataFrame's other columns; all remaining rows are inserted.
    The lookup, the updates and the inserts run inside one transaction, which
    is rolled back if any statement fails.

    This is a select-then-write upsert, not a native ``ON CONFLICT``
    statement: a concurrent writer inserting the same IDs between the lookup
    and the insert can still cause the insert to fail.

    Args:
        dataframe (pd.DataFrame): The DataFrame to upsert.
        table_name (str): The name of the SQL table to upsert into.
        id_column (str, optional): The name of the primary key or unique
            column. Defaults to "id".
        verbose (bool, optional): Whether to print verbose information.
            Defaults to True.
        engine (optional): An existing SQLAlchemy engine to use. When omitted,
            an engine is created from the module's default URL and disposed
            before returning.

    Returns:
        None

    Raises:
        KeyError: If ``id_column`` is not a column of ``dataframe``.
        ValueError: If ``dataframe`` has columns that the table does not have.
    """
    # Validate the input before touching the database at all.
    if id_column not in dataframe.columns:
        raise KeyError(id_column)

    start_time = time.perf_counter()

    # Only dispose of the engine if it was created here.
    owns_engine = engine is None
    if owns_engine:
        engine = create_engine(_DEFAULT_DATABASE_URL)

    try:
        # Reflect the table definition from the database.
        table = Table(table_name, MetaData(), autoload_with=engine)

        table_columns = set(table.columns.keys())
        unknown_columns = [col for col in dataframe.columns if col not in table_columns]
        if unknown_columns:
            raise ValueError(
                f"DataFrame columns not present in table {table_name!r}: {unknown_columns}"
            )

        id_col = table.c[id_column]
        # tolist() yields plain Python scalars rather than NumPy ones.
        ids = dataframe[id_column].tolist()
        value_columns = [col for col in dataframe.columns if col != id_column]
        rows_updated = 0

        with engine.begin() as conn:
            # Row count before any change, for the verbose report.
            existing_rows_count = conn.execute(
                select(func.count()).select_from(table)
            ).scalar()

            # Collect which of the DataFrame's IDs are already in the table.
            existing_ids = set()
            for start in range(0, len(ids), _ID_LOOKUP_CHUNK_SIZE):
                chunk = ids[start:start + _ID_LOOKUP_CHUNK_SIZE]
                result = conn.execute(select(id_col).where(id_col.in_(chunk)))
                existing_ids.update(row[0] for row in result)

            # The mask is built from the DataFrame itself, so it is aligned
            # with the DataFrame's index.
            exists_mask = dataframe[id_column].isin(existing_ids)
            update_data = dataframe[exists_mask]
            insert_data = dataframe[~exists_mask]

            # Update existing rows; skipped when there is nothing but the ID.
            if value_columns and not update_data.empty:
                key_param = 'param_' + id_column
                # Bind names are prefixed so they never clash with the column
                # names SQLAlchemy reserves for the SET clause.
                update_stmt = (
                    update(table)
                    .where(id_col == bindparam(key_param))
                    .values({col: bindparam('value_' + col) for col in value_columns})
                )
                update_params = [
                    {
                        **{'value_' + col: record[col] for col in value_columns},
                        key_param: record[id_column],
                    }
                    for record in update_data.to_dict(orient='records')
                ]
                conn.execute(update_stmt, update_params)
                rows_updated = len(update_data)

            # Insert the rows whose IDs were not found.
            if not insert_data.empty:
                conn.execute(insert(table), insert_data.to_dict(orient='records'))
    finally:
        if owns_engine:
            engine.dispose()

    # Calculate and print verbose information
    execution_time = time.perf_counter() - start_time
    total_rows_before_upsert = existing_rows_count
    rows_inserted = len(insert_data)
    # Updates modify rows in place, so only inserts change the total.
    total_rows_after_upsert = total_rows_before_upsert + rows_inserted

    if verbose:
        print(f"Total rows before upsert: {total_rows_before_upsert}")
        print(f"Rows updated: {rows_updated}")
        print(f"Rows inserted: {rows_inserted}")
        print(f"Total rows after upsert: {total_rows_after_upsert}")
        print(f"Execution time: {execution_time:.2f} seconds")
# Example usage:
# upsert_dataframe_to_sql(my_dataframe, "my_table", id_column="my_id")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment