-
-
Save pedrovgp/b46773a1240165bf2b1448b3f70bed32 to your computer and use it in GitHub Desktop.
# Upsert function for pandas to_sql with postgres
# https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/8702291#8702291
# https://www.postgresql.org/docs/devel/sql-insert.html#SQL-ON-CONFLICT
import hashlib
import os
import uuid

import pandas as pd
import sqlalchemy
def _quote_ident(name) -> str:
    """Return ``name`` as a double-quoted SQL identifier.

    Embedded double quotes are doubled (SQL-standard escaping), so names with
    ``-``, spaces, upper-case letters or quotes can be spliced into a statement
    without syntax errors or case folding.
    """
    return '"' + str(name).replace('"', '""') + '"'


def _index_column_names(df: pd.DataFrame) -> list:
    """Return the column names ``df.to_sql(..., index=True)`` writes for the index.

    Mirrors pandas' default labelling: a single unnamed index becomes
    ``"index"`` (or ``"level_0"`` when ``"index"`` is already a column) and
    unnamed MultiIndex levels become ``"level_<i>"``.
    """
    names = list(df.index.names)
    if len(names) == 1 and names[0] is None and "index" not in df.columns:
        return ["index"]
    return [f"level_{i}" if name is None else name for i, name in enumerate(names)]


def _execute_literal_sql(conn, sql: str):
    """Execute a complete SQL statement that takes no bind parameters.

    ``sqlalchemy.text`` works on SQLAlchemy 1.x and 2.x and escapes ``%`` for
    the driver. Colons are backslash-escaped first so that a ``:name`` inside
    a quoted identifier is not mistaken for a bind parameter.
    """
    return conn.execute(sqlalchemy.text(sql.replace(":", "\\:")))


def _constraint_name(table_name: str) -> str:
    """Return the name used for the upsert unique constraint of ``table_name``."""
    name = f"{table_name}_unique_constraint_for_upsert"
    # PostgreSQL silently truncates identifiers to 63 bytes, so long table
    # names sharing a prefix would otherwise end up with colliding names.
    if len(name.encode("utf-8")) > 63:
        digest = hashlib.sha1(table_name.encode("utf-8")).hexdigest()[:12]
        name = f"{digest}_unique_constraint_for_upsert"
    return name


def _ensure_unique_constraint(conn, table_name: str, key_columns: list) -> None:
    """Ensure ``table_name`` has a unique constraint on exactly ``key_columns``.

    ``ON CONFLICT (cols)`` needs such a constraint. An existing primary key or
    unique constraint on the same column set is reused; otherwise a new
    constraint is added (this raises if existing rows violate it).
    """
    wanted = set(key_columns)
    inspector = sqlalchemy.inspect(conn)
    pk_columns = inspector.get_pk_constraint(table_name).get("constrained_columns") or []
    if set(pk_columns) == wanted:
        return
    for constraint in inspector.get_unique_constraints(table_name):
        if set(constraint["column_names"]) == wanted:
            return
    key_sql = ", ".join(_quote_ident(c) for c in key_columns)
    _execute_literal_sql(
        conn,
        f"ALTER TABLE {_quote_ident(table_name)} "
        f"ADD CONSTRAINT {_quote_ident(_constraint_name(table_name))} "
        f"UNIQUE ({key_sql})",
    )


def upsert_df(df: pd.DataFrame, table_name: str, engine: sqlalchemy.engine.Engine):
    """Implement the equivalent of ``pd.DataFrame.to_sql(..., if_exists='update')``.

    pandas has no such mode, so this function fills the gap. It creates or
    updates the db records based on the dataframe records. Rows are matched on
    the dataframe's index: rows whose index values already exist in the table
    have their non-index columns overwritten, and all other rows are inserted.

    If ``table_name`` does not exist in the connection's current schema (the
    schema ``to_sql`` creates tables in), it is created with ``df.to_sql``.
    Otherwise:

    1. ``df`` (index included) is written to a uniquely named scratch table.
    2. A UNIQUE constraint on the index columns is added to ``table_name``,
       unless a primary key or unique constraint on exactly those columns
       already exists.
    3. ``INSERT ... SELECT ... ON CONFLICT (index cols) DO UPDATE`` copies the
       rows across, in the same transaction as step 2. ``DO NOTHING`` is used
       when ``df`` has no non-index columns.
    4. The scratch table is dropped, even when an earlier step fails.

    Args:
        df: Rows to write; its index identifies rows.
        table_name: Target table name. Any characters are allowed; identifiers
            are quoted.
        engine: SQLAlchemy engine for a PostgreSQL database.

    Returns:
        True if successful.

    Raises:
        sqlalchemy.exc.SQLAlchemyError: On database errors. Examples are
            existing rows that violate the new unique constraint, or ``df``
            containing the same index value twice (PostgreSQL refuses to
            update a row twice in one ``ON CONFLICT DO UPDATE``).
    """
    # If the table does not exist, to_sql can simply create it.
    with engine.connect() as conn:
        exists = conn.execute(
            sqlalchemy.text(
                "SELECT EXISTS (SELECT FROM information_schema.tables "
                "WHERE table_schema = current_schema() "
                "AND table_name = :table_name)"
            ),
            {"table_name": table_name},
        ).scalar()
    if not exists:
        df.to_sql(table_name, engine)
        return True

    # Column names exactly as to_sql writes them (it stringifies labels).
    key_columns = [str(name) for name in _index_column_names(df)]
    value_columns = [str(col) for col in df.columns]
    headers_sql = ", ".join(_quote_ident(c) for c in key_columns + value_columns)
    conflict_target_sql = ", ".join(_quote_ident(c) for c in key_columns)
    if value_columns:
        assignments = ", ".join(
            f"{_quote_ident(c)} = EXCLUDED.{_quote_ident(c)}" for c in value_columns
        )
        conflict_action = f"DO UPDATE SET {assignments}"
    else:
        # Only key columns: an empty SET list would be invalid SQL.
        conflict_action = "DO NOTHING"

    temp_table_name = f"temp_{uuid.uuid4().hex[:6]}"
    try:
        df.to_sql(temp_table_name, engine, index=True)
        # Constraint creation and the upsert share one transaction, so a
        # failure leaves the target table untouched.
        with engine.begin() as conn:
            _ensure_unique_constraint(conn, table_name, key_columns)
            _execute_literal_sql(
                conn,
                f"INSERT INTO {_quote_ident(table_name)} ({headers_sql}) "
                f"SELECT {headers_sql} FROM {_quote_ident(temp_table_name)} "
                f"ON CONFLICT ({conflict_target_sql}) {conflict_action}",
            )
    finally:
        # Always remove the scratch table, also when the upsert failed.
        with engine.begin() as conn:
            _execute_literal_sql(conn, f"DROP TABLE IF EXISTS {_quote_ident(temp_table_name)}")
    return True
if __name__ == "__main__":
    # TESTS: set the environment variable DB_STR to a PostgreSQL
    # SQLAlchemy URL to run them.
    db_str = os.getenv("DB_STR")
    if not db_str:
        raise SystemExit(
            "Set the DB_STR environment variable to a PostgreSQL "
            "SQLAlchemy URL to run the tests."
        )
    engine = sqlalchemy.create_engine(db_str)
    indexes = ["id1", "id2"]
    df = pd.DataFrame(
        {
            "id1": [1, 2, 3, 3],
            "id2": ["a", "a", "b", "c"],
            "name": ["name1", "name2", "name3", "name4"],
            "age": [20, 32, 29, 68],
        }
    ).set_index(indexes)
    df_update = pd.DataFrame(
        {
            "id1": [1, 2, 3],
            "id2": ["a", "a", "b"],
            "name": ["surname1", "surname2", "surname3"],
            "age": [13, 44, 29],
        }
    ).set_index(indexes)
    df_insert = pd.DataFrame(
        {
            "id1": [1],
            "id2": ["d"],
            "name": ["dname"],
            "age": [100],
        }
    ).set_index(indexes)
    expected_result = (
        pd.DataFrame(
            {
                "id1": [1, 2, 3, 3, 1],
                "id2": ["a", "a", "b", "c", "d"],
                "name": ["surname1", "surname2", "surname3", "name4", "dname"],
                "age": [13, 44, 29, 68, 100],
            }
        )
        .set_index(indexes)
        .sort_index()
    )
    TNAME = "test_upsert_df"
    # Start from a clean slate so rows left over from an earlier (possibly
    # failed) run cannot influence the result.
    with engine.begin() as conn:
        conn.execute(sqlalchemy.text(f'DROP TABLE IF EXISTS "{TNAME}"'))
    upsert_df(df=df, table_name=TNAME, engine=engine)  # creates the table
    upsert_df(df=df_update, table_name=TNAME, engine=engine)  # updates rows
    upsert_df(df=df_insert, table_name=TNAME, engine=engine)  # inserts a row
    result = pd.read_sql_table(TNAME, engine).set_index(indexes).sort_index()
    # Unlike `(a == b).all().all()`, this reports the differing cells, does
    # not raise on mismatched labels, and is not stripped by `python -O`.
    # Column dtypes are not compared because database types may round-trip
    # differently.
    pd.testing.assert_frame_equal(result, expected_result, check_dtype=False)
    print("Passed tests")
Hey Pedro, do you know what causes the exception here? I started encountering it and I don't understand it. The table doesn't have the unique constraint but it thinks it does?
I think I know what causes this and how to fix it. I believe it happens when two tables use the same constraint name. It can be remedied with something like the following, so that each table gets its own constraint name instead of sharing one:
constraint_name = f"unique_constraint_for_{table_name}_upsert"
# Quote every identifier. An unquoted name containing "-" or spaces is a
# syntax error ('syntax error at or near "-"'), and unquoted upper-case
# letters are folded to lower case.
# NOTE: dropping and re-adding the constraint rebuilds its unique index on
# every call, which can be slow on large tables.
query_pk = f"""
ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS "{constraint_name}";
ALTER TABLE "{table_name}" ADD CONSTRAINT "{constraint_name}" UNIQUE ({index_sql_txt});
"""  # ON CONFLICT needs a unique constraint on exactly the index columns
engine.execute(query_pk)
I seemed to stop getting the error after making this change.
Yes, that was the problem. Nice fix, by the way! As far as I remember, I was creating the constraint and then deleting it at the end.
How does this compare to https://github.com/ThibTrip/pangres/wiki/Upsert ?
Edit: this solution produced an exception:
File "/home/jupyter-admin/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 717, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.SyntaxError: syntax error at or near "-"
I resolved it by replacing this code with the approach from ThibTrip/pangres#74 (comment).
Hi Pascal, I have never tried anything like this with SQLite. You could try passing a SQLite engine (instead of a PostgreSQL one) to the function to find out.