Last active
August 12, 2021 20:38
-
-
Save andrewdoss-bit/de0c1bcb015ce4b07bbf1e5f64199fbb to your computer and use it in GitHub Desktop.
Load
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Load pandas DataFrames to PostgreSQL on bit.io""" | |
from sqlalchemy import create_engine | |
def to_table(df, destination, pg_conn_string): | |
""" | |
Loads a pandas DataFrame to a bit.io database. | |
Parameters | |
---------- | |
df : pandas.DataFrame | |
destination : str | |
Fully qualified bit.io PostgreSQL table name. | |
pg_conn_string : str | |
A bit.io PostgreSQL connection string including credentials. | |
""" | |
# Validation and setup | |
if pg_conn_string is None: | |
raise ValueError("You must specify a PG connection string.") | |
schema, table = destination.split(".") | |
engine = create_engine(pg_conn_string) | |
# Check if table exists and set load type accordingly | |
if _table_exists(engine, schema, table): | |
_truncate_table(engine, schema, table) | |
if_exists = 'append' | |
else: | |
if_exists = 'fail' | |
with engine.connect() as conn: | |
# 10 minute upload limit | |
conn.execute("SET statement_timeout = 600000;") | |
df.to_sql( | |
table, | |
conn, | |
schema, | |
if_exists=if_exists, | |
index=False, | |
method=_psql_insert_copy) | |
# The following helper methods are truncated here for brevity, | |
# but are available on github.com/bitdotioinc/simple-pipeline | |
# _table_exists - returns boolean indicating whether a table already exists | |
# _truncate_table - deletes all data from existing table to prepare for fresh load | |
# _psql_insert_copy - implements a fast pandas -> PostgreSQL insert using COPY FROM CSV command |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment