This small subclass of the Pandas SQLAlchemy-based SQL support for reading/storing tables uses the Postgres-specific "COPY FROM" method to insert large amounts of data into the database. It is much faster than using INSERT. To achieve this, the table is created in the normal way using SQLAlchemy but no data is inserted. Instead the data is saved to a temporary CSV file (using Pandas' mature CSV support) and then read back into Postgres using psycopg2's support for COPY FROM STDIN.
-
-
Save afr-dt/b5f40ba701349c114a11e5c9cbb34e7f to your computer and use it in GitHub Desktop.
Pandas PostgreSQL support for loading to DB using fast COPY FROM method
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tempfile
import warnings

import pandas.io.sql
class PgSQLDatabase(pandas.io.sql.SQLDatabase):
    """SQLDatabase variant that loads DataFrames with PostgreSQL ``COPY``.

    The table is created through the regular pandas/SQLAlchemy machinery,
    but instead of issuing INSERT statements the frame is dumped to a
    temporary CSV file and streamed to the server with
    ``COPY ... FROM STDIN``.
    """

    @staticmethod
    def _quote_ident(identifier):
        """Return *identifier* wrapped in double quotes for use in SQL.

        Embedded double quotes are doubled, which is how a quoted
        PostgreSQL identifier escapes them.
        """
        return '"{0}"'.format(str(identifier).replace('"', '""'))

    @classmethod
    def _qualified_name(cls, schema_name, table_name):
        """Return the quoted ``schema.table`` name, or just the quoted table
        name when *schema_name* is None."""
        if schema_name is None:
            return cls._quote_ident(table_name)
        return "{0}.{1}".format(cls._quote_ident(schema_name),
                                cls._quote_ident(table_name))

    # FIXME Schema is pulled from Meta object, shouldn't actually be part of signature!
    def to_sql(self, frame, name, if_exists='fail', index=True,
               index_label=None, schema=None, chunksize=None, dtype=None, pk=None):
        """
        Write records stored in a DataFrame to a SQL database using COPY.

        Parameters
        ----------
        frame : DataFrame
        name : string
            Name of SQL table
        if_exists : {'fail', 'replace', 'append'}, default 'fail'
            Passed through to ``pandas.io.sql.SQLTable``, whose ``create()``
            decides what happens when the table already exists.
            - fail: If table exists, do nothing.
            - replace: If table exists, drop it, recreate it, and insert data.
            - append: If table exists, insert data. Create if does not exist.
        index : boolean, default True
            Write DataFrame index as a column
        index_label : string or sequence, default None
            Column label for index column(s). If None is given (default) and
            `index` is True, then the index names are used.
            A sequence should be given if the DataFrame uses MultiIndex.
        schema : string, default None
            Only consulted by the table-name check that runs after the
            write. The table itself is always created in, and loaded into,
            ``self.meta.schema`` (see the FIXME above the method).
        chunksize : int, default None
            Accepted for signature compatibility only; it is not used. All
            rows are sent to the server in a single COPY.
        dtype : dict of column name to SQL type, default None
            Optional specifying the datatype for columns. The SQL type should
            be a SQLAlchemy type (either the type class or an instance of it).
        pk : string or sequence of strings, default None
            Name of column(s) to set as primary key after the table has been
            created. The names are interpolated into the ALTER TABLE
            statement verbatim, so they must come from a trusted source and
            must already be quoted if they need quoting.

        Raises
        ------
        ValueError
            If a value in `dtype` is not a SQLAlchemy type.
        """
        if dtype is not None:
            # Imported lazily so SQLAlchemy is only needed when dtype is used.
            import sqlalchemy.sql.type_api as type_api
            for col, my_type in dtype.items():
                # Guard issubclass() with isinstance(..., type): calling it
                # on a type *instance* such as String(50) raises TypeError.
                is_type_class = (isinstance(my_type, type)
                                 and issubclass(my_type, type_api.TypeEngine))
                is_type_instance = isinstance(my_type, type_api.TypeEngine)
                if not (is_type_class or is_type_instance):
                    raise ValueError('The type of %s is not a SQLAlchemy '
                                     'type ' % col)

        # Create the (empty) table the normal pandas way; no rows are inserted.
        table = pandas.io.sql.SQLTable(name, self, frame=frame, index=index,
                                       if_exists=if_exists, index_label=index_label,
                                       schema=self.meta.schema, dtype=dtype)
        table.create()

        # Quote schema/table so names that SQLAlchemy had to quote when it
        # created the table (mixed case, reserved words) are matched exactly,
        # and so a missing schema does not end up as the literal "None.".
        target = self._qualified_name(self.meta.schema, name)

        if pk is not None:
            if isinstance(pk, str):
                pks = pk
            else:
                pks = ", ".join(pk)
            sql = "ALTER TABLE {target} ADD PRIMARY KEY ({pks})".format(
                target=target, pks=pks)
            self.execute(sql)

        # Some tricks needed here:
        # Need to explicitly keep reference to connection
        # Need to "open" temp file separately in write and read mode
        # Otherwise data does not get loaded
        conn = self.engine.raw_connection()
        try:
            with conn.cursor() as cur, \
                    tempfile.NamedTemporaryFile(mode='w') as temp_file:
                frame.to_csv(temp_file, index=index)
                # Push buffered rows to disk before the second handle reads
                # the file; otherwise the tail of the CSV can be missing.
                temp_file.flush()
                with open(temp_file.name, 'r') as f:
                    sql = ("COPY {target} FROM STDIN "
                           "WITH (FORMAT CSV, HEADER TRUE)").format(target=target)
                    cur.copy_expert(sql, f)
            conn.commit()
        finally:
            # Always release the raw connection, even when the COPY fails.
            conn.close()

        # check for potentially case sensitivity issues (GH7815)
        self.meta.reflect()
        if name not in self.engine.table_names(schema=schema or self.meta.schema):
            warnings.warn("The provided table name '{0}' is not found exactly "
                          "as such in the database after writing the table, "
                          "possibly due to case sensitivity issues. Consider "
                          "using lower case table names.".format(name), UserWarning)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment