Created
May 2, 2023 21:21
-
-
Save keivanipchihagh/6fab85647f397a668e808430ed18e711 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def upsert_df(self, df: pd.DataFrame, table_name: str) -> None:
    """
    Implements the equivalent of pd.DataFrame.to_sql(..., if_exists='update') (which does not exist).
    Creates or updates the db records based on the dataframe records.

    The dataframe is first written to a staging table through ``self._insert``; its rows are then
    merged into ``table_name`` with ``INSERT ... ON CONFLICT``, using the dataframe index columns
    as the conflict target. The staging table and the helper unique constraint are removed again
    before returning, whether or not the merge succeeded.

    Parameters:
        df (pd.DataFrame): Dataframe to upsert (NOTE: Primary keys of the destination 'table_name'
            must be equal to dataframe index and not present in the dataframe columns)
        table_name (str): Table name

    Returns:
        None

    Raises:
        ValueError: If any level of the dataframe index is unnamed, because the index names are
            used as the conflict-target column names.
    """
    # Get the index and column names; the index names double as the conflict-target columns
    index_names = list(df.index.names)
    if any(name is None for name in index_names):
        raise ValueError(
            "All dataframe index levels must be named after the key columns of the destination table"
        )
    column_names = list(df.columns)

    # Create the SQL text for the names
    index_names_sql_txt = ", ".join(f'"{name}"' for name in index_names)
    # SQL columns: idx1, idx2, col1, col2, ...
    headers_sql_txt = ", ".join(f'"{name}"' for name in index_names + column_names)

    if column_names:
        # col1 = EXCLUDED.col1, col2 = EXCLUDED.col2
        update_column_stmt = ", ".join(f'"{col}" = EXCLUDED."{col}"' for col in column_names)
        conflict_action = f"DO UPDATE SET {update_column_stmt}"
    else:
        # Nothing but key columns: an empty SET list is a syntax error, so skip existing rows
        conflict_action = "DO NOTHING"

    temp_table_name = f"temp_{uuid.uuid4().hex[:10]}"
    constraint_name = f"unique_constraint_{temp_table_name}"

    try:
        # Create a temporary table and insert the dataframe into it
        self._insert(df, temp_table_name, index=True)

        # Create a unique constraint on the index columns for the ON CONFLICT clause
        self.engine.execute(
            f'ALTER TABLE "{table_name}" ADD CONSTRAINT {constraint_name} UNIQUE ({index_names_sql_txt});'
        )
        try:
            # Apply the upsert
            self.engine.execute(
                f'INSERT INTO "{table_name}" ({headers_sql_txt}) '
                f'SELECT {headers_sql_txt} FROM "{temp_table_name}" '
                f"ON CONFLICT ({index_names_sql_txt}) {conflict_action};"
            )
        finally:
            # The constraint name is unique per call, so leaving it behind would pile up one
            # extra constraint on the destination table for every upsert
            self.engine.execute(
                f'ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS {constraint_name};'
            )
    finally:
        # Remove the temporary table even when a step above failed
        self.engine.execute(f'DROP TABLE IF EXISTS "{temp_table_name}"')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Added this function to toolkit4life==0.2.2.