Skip to content

Instantly share code, notes, and snippets.

@xescuder
Last active June 12, 2024 21:05
Show Gist options
  • Save xescuder/c333678eb1b7c719678eb5e020b2f8be to your computer and use it in GitHub Desktop.
Save xescuder/c333678eb1b7c719678eb5e020b2f8be to your computer and use it in GitHub Desktop.
Pandas: Storing Only Changes to the DB (SQLAlchemy 2.0, pandas 2.0)
import sqlalchemy as db
import pandas as pd
def create_upsert_method(meta: db.MetaData):
    """
    Create an upsert callable that satisfies the ``method=`` hook of pandas's
    ``DataFrame.to_sql`` API. Adapted to SQLAlchemy 2.0.

    The returned callable issues a PostgreSQL ``INSERT ... ON CONFLICT``
    statement keyed on the primary key of the target table.
    For other dialects, please refer to
    https://docs.sqlalchemy.org/en/20/dialects/

    Arguments:
        meta {db.MetaData} -- metadata used to look up (or reflect) the table

    Returns:
        callable -- ``method(table, conn, keys, data_iter)`` as pandas expects
    """
    def method(table, conn, keys, data_iter):
        """Upsert one chunk of rows and return the reported row count."""
        # list of dictionaries {col_name: value} of data to insert
        rows = [dict(zip(keys, row)) for row in data_iter]
        if not rows:
            # nothing to write; do not emit an INSERT with an empty VALUES list
            return 0

        # Imported explicitly: ``import sqlalchemy`` on its own does not
        # guarantee that ``sqlalchemy.dialects.postgresql`` is loaded and
        # reachable as an attribute.
        from sqlalchemy.dialects.postgresql import insert as pg_insert

        # select table that data is being inserted to (from pandas's context);
        # reuse the already reflected definition when ``meta`` has it
        schema = getattr(table, "schema", None)
        table_key = table.name if schema is None else f"{schema}.{table.name}"
        sql_table = meta.tables.get(table_key)
        if sql_table is None:
            # SQLAlchemy 2.0 removed ``autoload=True``; reflect through the
            # connection pandas handed us instead
            sql_table = db.Table(
                table.name, meta, schema=schema, autoload_with=conn
            )

        # create insert statement using the postgresql dialect
        insert_stmt = pg_insert(sql_table).values(rows)

        # index elements are the primary keys of the table
        pk_columns = list(sql_table.primary_key.columns)
        pk_names = {column.name for column in pk_columns}

        # Only overwrite columns that are actually supplied by the DataFrame.
        # EXCLUDED holds defaults/NULL for columns missing from the INSERT, so
        # copying every excluded column would wipe existing values. The
        # primary key is skipped as well: it is the conflict target.
        update_values = {
            key: insert_stmt.excluded[key]
            for key in keys
            if key not in pk_names
        }

        if update_values:
            upsert_stmt = insert_stmt.on_conflict_do_update(
                index_elements=pk_columns,
                set_=update_values,  # the SET part of the ON CONFLICT clause
            )
        else:
            # only key columns were supplied: there is nothing to update
            upsert_stmt = insert_stmt.on_conflict_do_nothing(
                index_elements=pk_columns
            )

        # execute upsert statement; pandas accepts an int row count (or None)
        result = conn.execute(upsert_stmt)
        return result.rowcount

    return method
def save_dataframe(df: pd.DataFrame, engine, table: str, if_exists="append"):
    """
    Save dataframe to the database.

    The index is saved if it has a name (for a MultiIndex: if every level has
    a name). If it is unnamed it will not be saved.
    Rows are written with an upsert (``INSERT ... ON CONFLICT`` on the primary
    key of the table), so rows that already exist are updated rather than
    duplicated or ignored.
    Table needs to exist before.

    Arguments:
        df {pd.DataFrame} -- dataframe to save
        engine -- SQLAlchemy engine used to reflect the table and to write
        table {str} -- name of the db table
        if_exists {str} -- forwarded to ``DataFrame.to_sql`` (default "append")
            NOTE(review): values other than "append" let pandas drop and
            recreate the table, presumably without the primary key the upsert
            relies on -- confirm before using them.
    """
    # ``df.index.name`` is always None for a MultiIndex, so check every level
    save_index = all(name is not None for name in df.index.names)

    # reflect only the target table instead of the whole database
    meta = db.MetaData()
    meta.reflect(bind=engine, only=[table])
    upsert_method = create_upsert_method(meta)

    # ``engine.begin()`` commits on success and rolls back on error, so the
    # write does not depend on pandas managing the transaction for us
    with engine.begin() as conn:
        df.to_sql(
            table,
            con=conn,
            index=save_index,
            if_exists=if_exists,
            chunksize=200,
            method=upsert_method,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment