This small subclass of the pandas SQLAlchemy-based SQL support for reading/storing tables uses the Postgres-specific "COPY FROM" method to insert large amounts of data into the database. It is much faster than using INSERT. To achieve this, the table is created in the normal way using SQLAlchemy but no data is inserted. Instead the data is saved to a temporary CSV file (using pandas' mature CSV support), then read back into Postgres using psycopg2's support for COPY FROM STDIN.
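The core of the approach can be sketched independently of the subclass below: serialise the DataFrame to CSV, then stream that CSV into Postgres with psycopg2's `copy_expert`. A minimal sketch, assuming an in-memory buffer rather than a temp file; the connection and table name are placeholders, and the buffer-building helper is hypothetical:

```python
import io

import pandas as pd


def frame_to_csv_buffer(frame, index=False):
    """Serialise a DataFrame to an in-memory CSV buffer, rewound for reading."""
    buf = io.StringIO()
    frame.to_csv(buf, index=index)  # pandas handles quoting/escaping
    buf.seek(0)                     # rewind so COPY reads from the start
    return buf


def copy_from_dataframe(conn, frame, table_name):
    """Stream a DataFrame into an existing Postgres table via COPY FROM STDIN.

    `conn` is assumed to be an open psycopg2 connection (not created here),
    and `table_name` an existing table whose columns match the frame.
    """
    buf = frame_to_csv_buffer(frame)
    with conn.cursor() as cur:
        cur.copy_expert(
            "COPY {} FROM STDIN WITH (FORMAT CSV, HEADER TRUE)".format(table_name),
            buf)
    conn.commit()
```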
Last active: March 30, 2021 21:34
Pandas PostgreSQL support for loading to DB using fast COPY FROM method
import tempfile
import warnings

import pandas.io.sql


class PgSQLDatabase(pandas.io.sql.SQLDatabase):
    # FIXME Schema is pulled from Meta object, shouldn't actually be part of signature!
    def to_sql(self, frame, name, if_exists='fail', index=True,
               index_label=None, schema=None, chunksize=None, dtype=None, pk=None):
        """
        Write records stored in a DataFrame to a SQL database.

        Parameters
        ----------
        frame : DataFrame
        name : string
            Name of SQL table
        if_exists : {'fail', 'replace', 'append'}, default 'fail'
            - fail: If table exists, do nothing.
            - replace: If table exists, drop it, recreate it, and insert data.
            - append: If table exists, insert data. Create if does not exist.
        index : boolean, default True
            Write DataFrame index as a column
        index_label : string or sequence, default None
            Column label for index column(s). If None is given (default) and
            `index` is True, then the index names are used.
            A sequence should be given if the DataFrame uses MultiIndex.
        schema : string, default None
            Name of SQL schema in database to write to (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQLDatabase object.
        chunksize : int, default None
            If not None, then rows will be written in batches of this size at a
            time. If None, all rows will be written at once.
        dtype : dict of column name to SQL type, default None
            Optional dict specifying the datatype for columns. The SQL type
            should be a SQLAlchemy type.
        pk : string or sequence, default None
            Name of column(s) to set as primary keys
        """
        if dtype is not None:
            import sqlalchemy.sql.type_api as type_api
            for col, my_type in dtype.items():
                if not issubclass(my_type, type_api.TypeEngine):
                    raise ValueError('The type of %s is not a SQLAlchemy '
                                     'type' % col)

        table = pandas.io.sql.SQLTable(name, self, frame=frame, index=index,
                                       if_exists=if_exists, index_label=index_label,
                                       schema=self.meta.schema, dtype=dtype)
        table.create()

        if pk is not None:
            if isinstance(pk, str):
                pks = pk
            else:
                pks = ", ".join(pk)
            sql = "ALTER TABLE {schema_name}.{table_name} ADD PRIMARY KEY ({pks})".format(
                schema_name=self.meta.schema, table_name=name, pks=pks)
            self.execute(sql)

        # Some tricks needed here:
        # - need to keep an explicit reference to the raw connection
        # - need to "open" the temp file separately in write and read mode,
        #   otherwise the data does not get loaded
        conn = self.engine.raw_connection()
        with conn.cursor() as cur, tempfile.NamedTemporaryFile(mode='w') as temp_file:
            frame.to_csv(temp_file, index=index)
            temp_file.flush()  # ensure all rows are on disk before COPY reads the file
            with open(temp_file.name, 'r') as f:
                sql = "COPY {schema_name}.{table_name} FROM STDIN WITH (FORMAT CSV, HEADER TRUE)".format(
                    schema_name=self.meta.schema, table_name=name)
                cur.copy_expert(sql, f)
        conn.commit()

        # check for potential case sensitivity issues (GH7815)
        self.meta.reflect()
        if name not in self.engine.table_names(schema=schema or self.meta.schema):
            warnings.warn("The provided table name '{0}' is not found exactly "
                          "as such in the database after writing the table, "
                          "possibly due to case sensitivity issues. Consider "
                          "using lower case table names.".format(name), UserWarning)
What should the type of meta.schema be?
Can you please share a sample schema?
I've been using this custom code:
from io import StringIO

import sqlalchemy


def create_file_object(df, file_path=None, string_io=True):
    """Creates an in-memory CSV buffer, or writes a CSV file and reopens it."""
    if string_io:
        s_buf = StringIO()
        df.to_csv(s_buf, index=False)
        s_buf.seek(0)
        file_object = s_buf
    else:
        df.to_csv(file_path, index=False)
        file_object = open(file_path)
    return file_object


def load_to_database(table, unique_columns, file_object, header=True):
    # `engine` is a module-level connection string / engine URL defined elsewhere
    fake_conn = sqlalchemy.create_engine(engine).raw_connection()
    fake_cur = fake_conn.cursor()
    columns = ', '.join([f'{col}' for col in unique_columns])
    if header:
        sql = f'COPY {table} ({columns}) FROM STDIN WITH CSV HEADER'
    else:
        sql = f'COPY {table} ({columns}) FROM STDIN WITH CSV'
    fake_cur.copy_expert(sql=sql, file=file_object)
    fake_conn.commit()
    fake_cur.close()
    fake_conn.close()
The reason I have the list comprehension is because I am often loading a set of columns into a table which has a Postgres SERIAL column as the primary key. This has been working, but I would rather import something that someone else has made better. The StringIO method works perfectly fine, though, and I have been using it consistently.
Is there any reason why you chose to use tempfile instead of StringIO?
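The explicit column list is the key detail here: naming only the non-SERIAL columns in the COPY statement lets Postgres fill the primary key itself. A minimal, database-free sketch of that statement construction (the table and column names are hypothetical, and the helper is not part of the code above):

```python
def build_copy_sql(table, columns, header=True):
    """Build a COPY ... FROM STDIN statement restricted to the given columns,
    so a SERIAL primary key omitted from the list is filled in by Postgres."""
    col_list = ', '.join(columns)
    options = 'CSV HEADER' if header else 'CSV'
    return f'COPY {table} ({col_list}) FROM STDIN WITH {options}'
```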
This was a helpful gist, thanks! One thing I found is that instead of re-opening the file the following approach worked for me: