Last active
August 21, 2024 17:07
-
-
Save gordthompson/ae7a1528fde1c00c03fdbb5c53c8f90f to your computer and use it in GitHub Desktop.
Build a PostgreSQL INSERT … ON CONFLICT statement and upsert a DataFrame
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2024 Gordon D. Thompson, [email protected] | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# version 1.3 - 2024-06-21 | |
import pandas as pd | |
import sqlalchemy as sa | |
def df_upsert(data_frame, table_name, engine, schema=None, match_columns=None, insert_only=False): | |
""" | |
Perform an "upsert" on a PostgreSQL table from a DataFrame. | |
Constructs an INSERT … ON CONFLICT statement, uploads the DataFrame to a | |
temporary table, and then executes the INSERT. | |
Parameters | |
---------- | |
data_frame : pandas.DataFrame | |
The DataFrame to be upserted. | |
table_name : str | |
The name of the target table. | |
engine : sqlalchemy.engine.Engine | |
The SQLAlchemy Engine to use. | |
schema : str, optional | |
The name of the schema containing the target table. | |
match_columns : list of str, optional | |
A list of the column name(s) on which to match. If omitted, the | |
primary key columns of the target table will be used. | |
insert_only : bool, optional | |
On conflict do not update. (Default: False) | |
""" | |
table_spec = "" | |
if schema: | |
table_spec += '"' + schema.replace('"', '""') + '".' | |
table_spec += '"' + table_name.replace('"', '""') + '"' | |
df_columns = list(data_frame.columns) | |
if not match_columns: | |
insp = sa.inspect(engine) | |
match_columns = insp.get_pk_constraint(table_name, schema=schema)[ | |
"constrained_columns" | |
] | |
columns_to_update = [col for col in df_columns if col not in match_columns] | |
insert_col_list = ", ".join([f'"{col_name}"' for col_name in df_columns]) | |
stmt = f"INSERT INTO {table_spec} ({insert_col_list})\n" | |
stmt += f"SELECT {insert_col_list} FROM temp_table\n" | |
match_col_list = ", ".join([f'"{col}"' for col in match_columns]) | |
stmt += f"ON CONFLICT ({match_col_list}) DO " | |
if insert_only: | |
stmt += "NOTHING" | |
else: | |
stmt += "UPDATE SET\n" | |
stmt += ", ".join( | |
[f'"{col}" = EXCLUDED."{col}"' for col in columns_to_update] | |
) | |
with engine.begin() as conn: | |
conn.exec_driver_sql("DROP TABLE IF EXISTS temp_table") | |
conn.exec_driver_sql( | |
f"CREATE TEMPORARY TABLE temp_table AS SELECT * FROM {table_spec} WHERE false" | |
) | |
data_frame.to_sql("temp_table", conn, if_exists="append", index=False) | |
conn.exec_driver_sql(stmt) | |
if __name__ == "__main__": | |
# Usage example adapted from | |
# https://stackoverflow.com/a/62379384/2144390 | |
engine = sa.create_engine("postgresql://scott:[email protected]/test") | |
# create example environment | |
with engine.begin() as conn: | |
conn.exec_driver_sql("DROP TABLE IF EXISTS main_table") | |
conn.exec_driver_sql( | |
"CREATE TABLE main_table (id int primary key, txt varchar(50), status varchar(50))" | |
) | |
conn.exec_driver_sql( | |
"INSERT INTO main_table (id, txt, status) VALUES (1, 'row 1 old text', 'original')" | |
) | |
# [(1, 'row 1 old text', 'original')] | |
# DataFrame to upsert | |
df = pd.DataFrame( | |
[(2, "new row 2 text", "upserted"), (1, "row 1 new text", "upserted")], | |
columns=["id", "txt", "status"], | |
) | |
df_upsert(df, "main_table", engine) | |
"""The INSERT statement generated for this example: | |
INSERT INTO "main_table" ("id", "txt", "status") | |
SELECT "id", "txt", "status" FROM temp_table | |
ON CONFLICT ("id") DO UPDATE SET | |
"txt" = EXCLUDED."txt", "status" = EXCLUDED."status" | |
""" | |
# check results | |
with engine.begin() as conn: | |
print( | |
conn.exec_driver_sql("SELECT * FROM main_table").all() | |
) | |
# [(2, 'new row 2 text', 'upserted'), (1, 'row 1 new text', 'upserted')] |
Thanks for publishing this, @gordthompson !
I was running a bunch of concurrent upserts that would conflict with one another due to the single temp table name, so I created a version that adds a random string to the end of the temp table name:
https://gist.github.com/jackiep00/50eced6d1b63ac37e6841e1307007ef8
I'm not sure how to do a PR with gists, or else I woulda done that...
Snagged a copy from @jackiep00 as it suits my needs a bit better for volume sake however, @gordthompson Thank you! Both of you!!! I was thinking how to do this but you hero's just made my life much easier! Thank you!
Thank you so much!
worked so well
Thank you !
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thank you for putting this online!