Last active
June 12, 2024 21:05
-
-
Save xescuder/c333678eb1b7c719678eb5e020b2f8be to your computer and use it in GitHub Desktop.
Pandas: Storing Only Changes to the DB via Upsert (SQLAlchemy 2.0, pandas 2.0)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlalchemy as db | |
import pandas as pd | |
def create_upsert_method(meta: db.MetaData):
    """Build an upsert callable for the ``method`` argument of ``DataFrame.to_sql``.

    The returned callable issues a PostgreSQL ``INSERT ... ON CONFLICT`` keyed
    on the target table's primary key, so rows whose key already exists are
    updated instead of raising a uniqueness error.

    Args:
        meta: MetaData used to look up (and cache) the target table definition.

    Returns:
        A callable with the ``(table, conn, keys, data_iter)`` signature that
        pandas invokes once per chunk. It returns the row count reported by
        the database for that chunk.
    """
    # Imported explicitly: ``import sqlalchemy`` alone does not guarantee that
    # the ``sqlalchemy.dialects.postgresql`` submodule has been loaded.
    # For other dialects see https://docs.sqlalchemy.org/en/20/dialects/
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    def method(table, conn, keys, data_iter):
        """Upsert one chunk of rows supplied by pandas."""
        # List of {column_name: value} dictionaries to insert.
        rows = [dict(zip(keys, row)) for row in data_iter]
        if not rows:
            # An empty VALUES list is not a valid statement; nothing to do.
            return 0

        # Resolve the table pandas is writing to. ``autoload_with`` replaces
        # the ``autoload=True`` flag that SQLAlchemy 2.0 removed; when the
        # table is already present in ``meta`` the cached definition is reused.
        sql_table = db.Table(
            table.name,
            meta,
            schema=getattr(table, "schema", None),
            autoload_with=conn,
        )
        pk_columns = list(sql_table.primary_key.columns)
        pk_names = {column.name for column in pk_columns}

        insert_stmt = pg_insert(sql_table).values(rows)

        # Only overwrite columns that are actually being inserted. Using every
        # column of ``excluded`` would reset columns missing from the DataFrame
        # to their default/NULL on conflict. Primary-key columns are skipped
        # because they are equal by definition when a conflict occurs.
        update_columns = {
            key: insert_stmt.excluded[key] for key in keys if key not in pk_names
        }

        if update_columns:
            upsert_stmt = insert_stmt.on_conflict_do_update(
                index_elements=pk_columns,  # conflict target: the primary key
                set_=update_columns,  # the SET part of ON CONFLICT DO UPDATE
            )
        else:
            # Every inserted column belongs to the primary key, so there is
            # nothing to update; an empty SET clause would be invalid.
            upsert_stmt = insert_stmt.on_conflict_do_nothing(
                index_elements=pk_columns
            )

        result = conn.execute(upsert_stmt)
        return result.rowcount

    return method
def save_dataframe(df: pd.DataFrame, engine, table: str, if_exists="append"):
    """Upsert a DataFrame into an existing database table.

    Rows are written in chunks of 200 through the callable produced by
    ``create_upsert_method``, so a row whose primary key already exists is
    handled by that upsert rather than raising a duplicate-key error.

    The index is written only when it has a name; an unnamed index is dropped.
    The table must already exist, since its definition is reflected from the
    database before writing.

    NOTE(review): ``if_exists="replace"`` lets pandas drop and recreate the
    table, presumably without the primary key the upsert relies on. Confirm
    before using any value other than the default ``"append"``.

    Args:
        df: DataFrame to save.
        engine: SQLAlchemy engine connected to the target database.
        table: Name of the database table.
        if_exists: Forwarded unchanged to ``DataFrame.to_sql``.

    Returns:
        Whatever ``DataFrame.to_sql`` reports for the write (a row count, or
        ``None`` when the insert method does not report one).
    """
    save_index = df.index.name is not None

    # Reflect only the target table instead of the whole database. This also
    # fails early if the table does not exist.
    meta = db.MetaData()
    meta.reflect(bind=engine, only=[table])
    upsert_method = create_upsert_method(meta)

    # ``engine.begin()`` commits on success and rolls back on error, so the
    # write does not depend on the caller or pandas committing.
    with engine.begin() as conn:
        return df.to_sql(
            table,
            con=conn,
            index=save_index,
            if_exists=if_exists,
            chunksize=200,
            method=upsert_method,
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment