-
-
Save pedrovgp/b46773a1240165bf2b1448b3f70bed32 to your computer and use it in GitHub Desktop.
| # Upsert function for pandas to_sql with postgres | |
| # https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/8702291#8702291 | |
| # https://www.postgresql.org/docs/devel/sql-insert.html#SQL-ON-CONFLICT | |
| import pandas as pd | |
| import sqlalchemy | |
| import uuid | |
| import os | |
def upsert_df(df: pd.DataFrame, table_name: str, engine: sqlalchemy.engine.Engine):
    """Create or update the rows of a Postgres table from a dataframe.

    Implements the equivalent of ``pd.DataFrame.to_sql(..., if_exists='update')``
    (which does not exist).

    If ``table_name`` does not exist in the ``public`` schema it is simply
    created with ``df.to_sql``. Otherwise:

    1. The dataframe is written to a temporary staging table.
    2. A UNIQUE constraint on the dataframe's index columns is added to
       ``table_name`` (required by ``ON CONFLICT``) unless it is already there.
    3. Rows are copied from the staging table with
       ``INSERT ... ON CONFLICT (...) DO UPDATE``.
    4. The staging table is dropped, even when a previous step failed.

    Args:
        df: Records to write. The index level names identify a row, so every
            index level must be named when the target table already exists.
        table_name: Name of the target table.
        engine: SQLAlchemy engine connected to the Postgres database.

    Returns:
        True if successful.

    Raises:
        ValueError: If the target table exists and an index level of ``df``
            has no name.
    """
    # Bind the table name as a parameter instead of interpolating it into the
    # SQL text, so quotes in the name cannot alter the query.
    table_exists_query = sqlalchemy.text(
        """SELECT EXISTS (
            SELECT FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_name = :table_name);
        """
    )
    with engine.connect() as conn:
        table_exists = conn.execute(
            table_exists_query, {"table_name": table_name}
        ).scalar()

    # If the table does not exist, we should just use to_sql to create it
    if not table_exists:
        df.to_sql(table_name, engine)
        return True

    # If it already exists...
    index = list(df.index.names)
    if any(name is None for name in index):
        raise ValueError(
            "upsert_df requires every index level of df to be named; "
            "the index names are used as the conflict columns"
        )
    columns = list(df.columns)

    index_sql_txt = ", ".join(_quote_identifier(i) for i in index)
    # index1, index2, ..., col1, col2, ...
    headers_sql_txt = ", ".join(_quote_identifier(i) for i in index + columns)

    if columns:
        # col1 = excluded.col1, col2 = excluded.col2
        assignments = []
        for col in columns:
            quoted = _quote_identifier(col)
            assignments.append(f"{quoted} = EXCLUDED.{quoted}")
        conflict_action = "DO UPDATE SET " + ", ".join(assignments)
    else:
        # Nothing but the key columns: an empty SET list would be invalid SQL.
        conflict_action = "DO NOTHING"

    quoted_table = _quote_identifier(table_name)
    # The name used to be written unquoted, which Postgres folds to lower case
    # and which breaks on names such as "my-table". Quoting the lower-cased
    # name keeps matching constraints created earlier while accepting such names.
    constraint_name = _quote_identifier(
        f"{table_name}_unique_constraint_for_upsert".lower()
    )

    temp_table_name = f"temp_{uuid.uuid4().hex[:6]}"
    quoted_temp_table = _quote_identifier(temp_table_name)
    try:
        df.to_sql(temp_table_name, engine, index=True)

        # For the ON CONFLICT clause, postgres requires that the columns have
        # a unique constraint. The statement runs in its own transaction so a
        # failure here does not leave the upsert inside an aborted transaction.
        try:
            _execute_statement(
                engine,
                f"ALTER TABLE {quoted_table} "
                f"ADD CONSTRAINT {constraint_name} UNIQUE ({index_sql_txt});",
            )
        except Exception as e:
            # relation "..._unique_constraint_for_upsert" already exists
            if 'unique_constraint_for_upsert" already exists' not in str(e):
                raise

        # Compose and execute upsert query
        _execute_statement(
            engine,
            f"INSERT INTO {quoted_table} ({headers_sql_txt}) "
            f"SELECT {headers_sql_txt} FROM {quoted_temp_table} "
            f"ON CONFLICT ({index_sql_txt}) {conflict_action};",
        )
    finally:
        # Always remove the staging table; IF EXISTS covers a failed to_sql.
        _execute_statement(engine, f"DROP TABLE IF EXISTS {quoted_temp_table}")

    return True


def _quote_identifier(name) -> str:
    """Return ``name`` as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def _execute_statement(engine, sql: str) -> None:
    """Run one SQL statement built from quoted identifiers in its own transaction."""
    # text() reads ":name" as a bind parameter; escaping colons keeps
    # identifiers that contain one verbatim.
    statement = sqlalchemy.text(sql.replace(":", r"\:"))
    with engine.begin() as conn:
        conn.execute(statement)
if __name__ == "__main__":
    # Round-trip check against a real database: set the DB_STR environment
    # variable to the connection URL before running this file as a script.
    db = sqlalchemy.create_engine(os.getenv("DB_STR"))
    key_cols = ["id1", "id2"]

    def _keyed_frame(id1, id2, name, age):
        """Build a dataframe from the four columns, indexed by the key columns."""
        data = {"id1": id1, "id2": id2, "name": name, "age": age}
        return pd.DataFrame(data).set_index(key_cols)

    # Initial content of the table.
    seed_rows = _keyed_frame(
        [1, 2, 3, 3],
        ["a", "a", "b", "c"],
        ["name1", "name2", "name3", "name4"],
        [20, 32, 29, 68],
    )
    # Same keys as the first three seed rows, with new values.
    changed_rows = _keyed_frame(
        [1, 2, 3],
        ["a", "a", "b"],
        ["surname1", "surname2", "surname3"],
        [13, 44, 29],
    )
    # A key that is not in the table yet.
    new_rows = _keyed_frame([1], ["d"], ["dname"], [100])

    # Table content expected after the three calls below.
    wanted = _keyed_frame(
        [1, 2, 3, 3, 1],
        ["a", "a", "b", "c", "d"],
        ["surname1", "surname2", "surname3", "name4", "dname"],
        [13, 44, 29, 68, 100],
    ).sort_index()

    target = "test_upsert_df"
    for frame in (seed_rows, changed_rows, new_rows):
        upsert_df(df=frame, table_name=target, engine=db)

    stored = pd.read_sql_table(target, db)
    stored = stored.set_index(key_cols).sort_index()
    assert (stored == wanted).all().all()
    print("Passed tests")
Hi there, this is so useful and impressive! @pedrovgp, would you have a similar function for the SQLite engine instead of PostgreSQL? I believe the syntax would vary a bit, but I'm keen on trying something similar on SQLite.
Hi Pascal, I never tried anything like it in SQLite. You can try providing a sqlite engine (instead of a postgres one) to the function to probe it.
Hey Pedro, do you know what causes the exception here? I started encountering it and I don't understand it. The table doesn't have the unique constraint but it thinks it does?
I think I know what causes this and how to fix this. I believe it is caused when two tables are using the same constraint name. It can be remedied with something like the following so that each table has a unique constraint name and they don't share them:
constraint_name = f"unique_constraint_for_{table_name}_upsert"
query_pk = f"""
ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS {constraint_name};
ALTER TABLE "{table_name}" ADD CONSTRAINT {constraint_name} UNIQUE ({index_sql_txt});
""" # it's saying the index names must be unique
engine.execute(query_pk)
I seemed to stop getting the error after making this change.
Yes that was the problem. Nice fix by the way, as far as I remember, I was creating and deleting the constraint in the end.
How does this compare to https://github.com/ThibTrip/pangres/wiki/Upsert ?
Edit: this solution produced an exception:
File "/home/jupyter-admin/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 717, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.SyntaxError: syntax error at or near "-"
Resolved it by replacing with ThibTrip/pangres#74 (comment)
As I use `engine` from sqlmodel, which has SQLAlchemy 2.0 underneath, direct `engine.execute()` is no longer supported. It needs to be inside a connection object, so I adjusted your code as below.
Also, I'm not sure about this `InternalError: (psycopg2.errors.InFailedSqlTransaction) current transaction is aborted, commands ignored until end of transaction block` when the connection object tries to execute `"INSERT INTO ..."`, but adding `conn.execute(sqlmodel.text("commit"))` helps.
ps.1 `type(engine)` from sqlmodel is `sqlalchemy.future.engine.Engine`.
ps.2 The strings inside conn.execute() need to be instances of `sqlalchemy.sql.elements.TextClause`, so I cast them with `sqlmodel.text(...)`.
Here is my code: