@ellisvalentiner
Last active January 10, 2023 19:03
Recipe for (fast) bulk insert from python Pandas DataFrame to Postgres database
#!/usr/bin/env python
import psycopg2
import os
from io import StringIO
import pandas as pd
# Get a database connection
dsn = os.environ.get('DB_DSN') # Use ENV vars: keep it secret, keep it safe
conn = psycopg2.connect(dsn)
# Do something to create your dataframe here...
df = pd.read_csv("file.csv")
# Initialize a string buffer
sio = StringIO()
df.to_csv(sio, index=False, header=False) # Write the Pandas DataFrame as a csv to the buffer
sio.seek(0) # Be sure to reset the position to the start of the stream
# Copy the string buffer to the database, as if it were an actual file
with conn.cursor() as c:
    c.copy_from(sio, "schema.table", columns=df.columns, sep=',')
    conn.commit()
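
Two caveats with the recipe above. `copy_from` reads PostgreSQL's text format, so a field that itself contains a comma or a quote will not be parsed the way `to_csv` wrote it. Also, as far as I know, psycopg2 2.9 and later quote the table name passed to `copy_from`, so a schema-qualified name such as "schema.table" no longer resolves. Below is a minimal sketch of a variant that uses `copy_expert` with CSV format instead. `quote_ident`, `build_copy_sql` and `copy_dataframe` are hypothetical helper names, and the cursor is assumed to be a psycopg2 cursor.

```python
from io import StringIO


def quote_ident(name):
    """Double-quote an SQL identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_copy_sql(schema, table, columns):
    """Build a COPY ... FROM STDIN statement for a schema-qualified table."""
    cols = ", ".join(quote_ident(str(c)) for c in columns)
    return "COPY {}.{} ({}) FROM STDIN WITH (FORMAT csv)".format(
        quote_ident(schema), quote_ident(table), cols
    )


def copy_dataframe(cursor, df, schema, table):
    """Stream a DataFrame to Postgres as CSV (assumes a psycopg2 cursor)."""
    buf = StringIO()
    df.to_csv(buf, index=False, header=False)
    buf.seek(0)  # rewind so COPY reads from the start of the buffer
    cursor.copy_expert(build_copy_sql(schema, table, df.columns), buf)
```

Usage would look like `copy_dataframe(c, df, "schema", "table")` inside the same `with conn.cursor() as c:` block, followed by `conn.commit()`.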
@rehoter-cyber

Hello! Thanks for your work, I learned a lot! Inserting 1 million rows used to take 12 minutes; now it takes 10 seconds.
I have a small question about rollbacks.
I am reading quite a big file, so I use a chunk size of about 1 million rows per batch.
What if one row in a chunk does not match the table's column types, and I want it recorded in a log file?
One solution, I think, is to split the chunk into smaller chunks.
The key is locating the unacceptable rows. How can I discard them quickly?

# Requires: import psycopg2, import pandas as pd,
# from io import StringIO, from tqdm import tqdm
def tb_insert(cursor):
    print("Inserting into [title_basics]......")
    create_tb_table(cursor)
    tb_miss = 0
    insert_sql = ("INSERT INTO title_basics VALUES "
                  "(%s, %s, %s, %s, %s, %s, %s, %s, %s)")
    reader = pd.read_csv("title.basics.tsv", low_memory=False,
                         sep="\t", chunksize=1000000)
    with open("title_basics_log1.txt", "w") as tb_log:
        for tb_chunk_counter, chunk in enumerate(tqdm(reader)):
            if tb_chunk_counter == 3:  # stop after the first three chunks
                break
            sio = StringIO()
            sio.write(chunk.to_csv(index=None, header=None))
            sio.seek(0)

            print("Current chunk index = ", tb_chunk_counter)
            # A failed statement aborts the whole transaction, so wrap each
            # attempt in a savepoint that can be rolled back on error.
            cursor.execute("SAVEPOINT chunk_sp")
            try:
                cursor.copy_from(sio, 'title_basics', columns=chunk.columns, sep=',')
            except psycopg2.Error:
                cursor.execute("ROLLBACK TO SAVEPOINT chunk_sp")
                # Fall back to row-by-row inserts so only the bad rows are skipped.
                for index, row in chunk.iterrows():
                    cursor.execute("SAVEPOINT row_sp")
                    try:
                        cursor.execute(insert_sql, tuple(row))
                    except psycopg2.Error:
                        cursor.execute("ROLLBACK TO SAVEPOINT row_sp")
                        tb_miss = tb_miss + 1
                        tb_log.write(str(row) + '\n')
                    else:
                        cursor.execute("RELEASE SAVEPOINT row_sp")

    print("Insert [title_basics] complete!")
    print("Miss item number: ")
    print(tb_miss)
    print("Miss item stored in [title_basics_log1.txt]")

tb_insert(cur)
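
The idea of splitting a failing chunk into smaller chunks can be sketched as a recursive bisection: try the whole batch, and on failure try each half, until the failing batches are single rows. This is only a sketch under assumptions. `find_bad_rows` and `try_load` are hypothetical names, and `try_load` stands in for whatever actually loads a batch (for example a `copy_from` wrapped in a savepoint) and reports success or failure without leaving partial data behind.

```python
def find_bad_rows(rows, try_load):
    """Return the rows that try_load rejects, using recursive bisection.

    try_load(batch) must return True when the whole batch loads cleanly
    and False otherwise, leaving nothing behind when it fails.
    """
    if not rows:
        return []
    if try_load(rows):
        return []  # the whole batch went in, nothing to report
    if len(rows) == 1:
        return list(rows)  # a single failing row is a bad row
    mid = len(rows) // 2
    # Retry each half separately; good halves load, bad halves split again.
    return find_bad_rows(rows[:mid], try_load) + find_bad_rows(rows[mid:], try_load)
```

With only a few bad rows in a large chunk, most rows still go in through large batches, and only the neighbourhoods of the bad rows are retried in small pieces.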

I think I figured it out: using copy_expert gets me past those error lines, and the data cleaning can be done later in psql.

@ellisvalentiner
Author

@rehoter-cyber It sounds like your solution is close to what I would suggest: first insert the data into a landing table and then copy over into a destination table, cleaning the data at that point

Out of curiosity, why use Python/psycopg2 to insert rather than inserting directly using psql?
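
A rough sketch of the landing-table approach, assuming a psycopg2 cursor. The table names `items_landing` and `items`, their two columns, and the function name are all hypothetical and only serve to illustrate the pattern.

```python
def load_via_landing_table(cursor, buf):
    """Load raw CSV into a text-only landing table, then cast into the target.

    Assumes a psycopg2 cursor; items_landing and items are hypothetical tables.
    """
    # Every landing column is text, so COPY cannot fail on a type mismatch.
    cursor.execute("CREATE TEMP TABLE items_landing (id text, price text)")
    cursor.copy_expert("COPY items_landing FROM STDIN WITH (FORMAT csv)", buf)
    # Copy only rows whose values can be cast; the rest remain in the
    # landing table, where they can be inspected or cleaned later.
    cursor.execute(
        """
        INSERT INTO items (id, price)
        SELECT id::integer, price::numeric
        FROM items_landing
        WHERE id ~ '^[0-9]+$' AND price ~ '^[0-9]+([.][0-9]+)?$'
        """
    )
    return cursor.rowcount  # number of rows that reached the destination
```

The regular expressions here are deliberately simple placeholders; the real cleaning rules depend on the destination column types.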

@rehoter-cyber

rehoter-cyber commented Jul 1, 2020

> @rehoter-cyber It sounds like your solution is close to what I would suggest: first insert the data into a landing table and then copy over into a destination table, cleaning the data at that point
>
> Out of curiosity, why use Python/psycopg2 to insert rather than inserting directly using psql?

LOL, thanks for your reply. First, the result with [copy_expert] is quite good, I think: it shortened the insert time from 10 hours to 10 minutes, without any rejected rows. Haha, the reason for choosing Python/psycopg2 is that the original data is fairly big (700+ MB), split across 6 files, and in TSV and JSON formats (I was working on the Kaggle IMDb dataset). I also wanted to see the execution time and memory use while inserting, so I chose Python/psycopg2 for the task.

@ellisvalentiner
Author

ellisvalentiner commented Aug 11, 2020

@rehoter-cyber could you try the pandas.DataFrame.to_sql method with the parameter method=psql_insert_copy, where psql_insert_copy is the callable defined in the "Insertion method" section of the pandas documentation?
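
For reference, the callable in the pandas "Insertion method" documentation looks roughly like the following. This is reproduced from memory as a sketch, so check the pandas docs for the current version. It assumes the engine passed to `to_sql` is a SQLAlchemy engine backed by psycopg2.

```python
import csv
from io import StringIO


def psql_insert_copy(table, conn, keys, data_iter):
    """Insertion method for DataFrame.to_sql that uses PostgreSQL COPY.

    table: pandas.io.sql.SQLTable, conn: SQLAlchemy connection,
    keys: list of column names, data_iter: iterable of row tuples.
    """
    dbapi_conn = conn.connection  # the underlying DBAPI (psycopg2) connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ", ".join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = "{}.{}".format(table.schema, table.name)
        else:
            table_name = table.name

        sql = "COPY {} ({}) FROM STDIN WITH CSV".format(table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)


# Hypothetical usage, where `engine` is a SQLAlchemy engine:
# df.to_sql("title_basics", engine, method=psql_insert_copy, index=False)
```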

@visualisedata

I can't get it to work :( I think it's because everything in the dataframe is a string, but the table has a mixture of data types.

This is the error I'm getting at the moment:

c.copy_from(sio, "public.sku", columns=df_ora.columns, sep=',')
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type numeric: ""
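
That error suggests an empty field reached a numeric column. By default `copy_from` only treats `\N` as NULL, so an empty field is passed through as an empty string, which cannot be parsed as a number. One possible fix, sketched below under the assumption that empty fields should become NULL, is to pass `null=""`. The function name is hypothetical, and note that empty strings in text columns would become NULL as well.

```python
from io import StringIO


def copy_with_empty_as_null(cursor, df, table):
    """COPY a DataFrame, loading empty fields as NULL (assumes a psycopg2 cursor)."""
    buf = StringIO()
    # Missing values (NaN/None) are written as empty fields.
    df.to_csv(buf, index=False, header=False, na_rep="")
    buf.seek(0)
    # null="" makes COPY read an empty field as NULL instead of the literal "".
    cursor.copy_from(buf, table, sep=",", null="", columns=list(df.columns))
```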
