Skip to content

Instantly share code, notes, and snippets.

@gordthompson
Last active October 3, 2024 17:47
Show Gist options
  • Save gordthompson/1fb0f1c3f5edbf6192e596de8350f205 to your computer and use it in GitHub Desktop.
Save gordthompson/1fb0f1c3f5edbf6192e596de8350f205 to your computer and use it in GitHub Desktop.
Alternative to_sql() *method* for mssql+pyodbc/pymssql/pytds
# Alternative to_sql() *method* for mssql+pyodbc, mssql+pymssql, or mssql+pytds
#
# adapted from https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method
#
# version 1.6 - 2024-03-28
from datetime import date
import json
import pandas as pd
import sqlalchemy as sa
def mssql_insert_json(table, conn, keys, data_iter):
"""
Execute SQL statement inserting data via OPENJSON
Parameters
----------
table : pandas.io.sql.SQLTable
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
keys : list of str
Column names
data_iter : Iterable that iterates the values to be inserted
"""
# build dict of {"column_name": "column_type"}
col_dict = {
str(col.name): "nvarchar(max)"
if str(col.type) in ["TEXT", "NTEXT"]
else "bit"
if str(col.type) == "BOOLEAN"
else "datetime2"
if str(col.type) == "DATETIME"
else str(col.type)
for col in table.table.columns
}
columns = ", ".join([f"[{k}]" for k in keys])
if table.schema:
table_name = f"[{table.schema}].[{table.name}]"
else:
table_name = f"[{table.name}]"
json_data = [dict(zip(keys, row)) for row in data_iter]
with_clause = ",\n".join(
[
f"[{col_name}] {col_type} '$.\"{col_name}\"'"
for col_name, col_type in col_dict.items()
]
)
placeholder = "?" if conn.dialect.paramstyle == "qmark" else "%s"
sql = f"""\
INSERT INTO {table_name} ({columns})
SELECT {columns}
FROM OPENJSON({placeholder})
WITH
(
{with_clause}
);
"""
conn.exec_driver_sql(sql, (json.dumps(json_data, default=str),))
if __name__ == "__main__":
# =============
# USAGE EXAMPLE
# =============
# note: fast_executemany=True is not required
engine = sa.create_engine("mssql+pyodbc://scott:tiger^5HHH@mssql_199")
df = pd.DataFrame(
[(1, "Cheers!", date(2001, 1, 1)), (2, "ΟΠΑ!", date(2002, 2, 2))],
columns=["id", "my text", "date added"],
)
df.to_sql(
"##tmp",
engine,
index=False,
if_exists="append",
dtype={"my text": sa.types.UnicodeText},
method=mssql_insert_json,
)
# check result
with engine.begin() as connection:
print(connection.exec_driver_sql("SELECT * FROM ##tmp").all())
# [(1, 'Cheers!', datetime.date(2001, 1, 1)), (2, 'ΟΠΑ!', datetime.date(2002, 2, 2))]
"""The INSERT statement generated for this example is:
INSERT INTO [##tmp] ([id], [my text], [date added])
SELECT [id], [my text], [date added]
FROM OPENJSON(?)
WITH
(
[id] BIGINT '$."id"',
[my text] nvarchar(max) '$."my text"',
[date added] DATE '$."date added"'
);
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment