Pandas PostgreSQL support for loading to DB using fast COPY FROM method

This small subclass of the Pandas SQLAlchemy-based SQL support for reading/storing tables uses the Postgres-specific "COPY FROM" method to insert large amounts of data into the database. It is much faster than using INSERT. To achieve this, the table is created in the normal way using SQLAlchemy but no data is inserted. Instead, the data is saved to a temporary CSV file (using Pandas' mature CSV support), then read back into Postgres using psycopg2's support for COPY FROM STDIN.
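The core trick in isolation is psycopg2's copy_expert, which streams any file-like object through COPY FROM STDIN. A minimal sketch (the connection string and the table my_table are assumptions for illustration; the table must already exist with columns matching the CSV):

import io

import psycopg2

# Hypothetical connection parameters, for illustration only
conn = psycopg2.connect("dbname=mydb user=me")

# In-memory CSV with a header row; columns must match my_table
buf = io.StringIO("id,name\n1,alice\n2,bob\n")

with conn.cursor() as cur:
    cur.copy_expert("COPY my_table FROM STDIN WITH (FORMAT CSV, HEADER TRUE)", buf)
conn.commit()

The subclass below wraps exactly this pattern behind the familiar to_sql interface: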

import tempfile
import warnings

import pandas.io.sql


class PgSQLDatabase(pandas.io.sql.SQLDatabase):
    # FIXME Schema is pulled from Meta object, shouldn't actually be part of signature!
    def to_sql(self, frame, name, if_exists='fail', index=True,
               index_label=None, schema=None, chunksize=None, dtype=None, pk=None):
        """
        Write records stored in a DataFrame to a SQL database.

        Parameters
        ----------
        frame : DataFrame
        name : string
            Name of SQL table
        if_exists : {'fail', 'replace', 'append'}, default 'fail'
            - fail: If table exists, do nothing.
            - replace: If table exists, drop it, recreate it, and insert data.
            - append: If table exists, insert data. Create if does not exist.
        index : boolean, default True
            Write DataFrame index as a column
        index_label : string or sequence, default None
            Column label for index column(s). If None is given (default) and
            `index` is True, then the index names are used.
            A sequence should be given if the DataFrame uses MultiIndex.
        schema : string, default None
            Name of SQL schema in database to write to (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQLDatabase object.
        chunksize : int, default None
            If not None, then rows will be written in batches of this size at a
            time. If None, all rows will be written at once.
        dtype : dict of column name to SQL type, default None
            Optionally specify the datatype for columns. The SQL type should
            be a SQLAlchemy type.
        pk : string or sequence, default None
            Name of column(s) to set as primary key(s)
        """
        if dtype is not None:
            import sqlalchemy.sql.type_api as type_api
            for col, my_type in dtype.items():
                if not issubclass(my_type, type_api.TypeEngine):
                    raise ValueError('The type of %s is not a SQLAlchemy '
                                     'type' % col)

        # Create the (empty) table via SQLAlchemy; no rows are inserted here
        table = pandas.io.sql.SQLTable(name, self, frame=frame, index=index,
                                       if_exists=if_exists, index_label=index_label,
                                       schema=self.meta.schema, dtype=dtype)
        table.create()

        if pk is not None:
            if isinstance(pk, str):
                pks = pk
            else:
                pks = ", ".join(pk)
            sql = "ALTER TABLE {schema_name}.{table_name} ADD PRIMARY KEY ({pks})".format(
                schema_name=self.meta.schema, table_name=name, pks=pks)
            self.execute(sql)

        # Some tricks needed here:
        # - Need to explicitly keep a reference to the raw connection
        # - Need to open the temp file separately in write and read mode,
        #   otherwise the data does not get loaded
        conn = self.engine.raw_connection()
        with conn.cursor() as cur, tempfile.NamedTemporaryFile(mode='w') as temp_file:
            frame.to_csv(temp_file, index=index)
            temp_file.flush()  # make sure all rows reach disk before COPY reads them
            with open(temp_file.name, 'r') as f:
                sql = "COPY {schema_name}.{table_name} FROM STDIN WITH (FORMAT CSV, HEADER TRUE)".format(
                    schema_name=self.meta.schema, table_name=name)
                cur.copy_expert(sql, f)
        conn.commit()

        # check for potential case sensitivity issues (GH7815)
        self.meta.reflect()
        if name not in self.engine.table_names(schema=schema or self.meta.schema):
            warnings.warn("The provided table name '{0}' is not found exactly "
                          "as such in the database after writing the table, "
                          "possibly due to case sensitivity issues. Consider "
                          "using lower case table names.".format(name), UserWarning)