@ellisvalentiner
Last active January 10, 2023 19:03
Recipe for (fast) bulk insert from a Python pandas DataFrame into a Postgres database
#!/usr/bin/env python
import os
from io import StringIO

import pandas as pd
import psycopg2

# Get a database connection
dsn = os.environ.get('DB_DSN')  # Use ENV vars: keep it secret, keep it safe
conn = psycopg2.connect(dsn)
# Do something to create your dataframe here...
df = pd.read_csv("file.csv")
# Initialize a string buffer
sio = StringIO()
sio.write(df.to_csv(index=None, header=None))  # Write the pandas DataFrame as a csv to the buffer
sio.seek(0)  # Be sure to reset the position to the start of the stream
# Copy the string buffer to the database, as if it were an actual file
with conn.cursor() as c:
    c.copy_from(sio, "schema.table", columns=df.columns, sep=',')
conn.commit()
@HQuality

Hello! Nice work, this is one of the fastest insert methods indeed. However, how is it "bulk" exactly if you're putting the whole df into a memory buffer?

@ellisvalentiner
Author

@HQuality it inserts the data frame in a single transaction rather than per-row

There are probably better ways to do this now using newer versions of psycopg2 and pandas
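One commonly suggested variant (a sketch, reusing the df, conn, and placeholder table name from the recipe above) is copy_expert with an explicit COPY statement, which lets Postgres parse real CSV, quoted fields and all:

buf = StringIO()
df.to_csv(buf, index=False, header=False)
buf.seek(0)
with conn.cursor() as c:
    c.copy_expert("COPY schema.table FROM STDIN WITH (FORMAT csv)", buf)
conn.commit()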

@HQuality

HQuality commented May 20, 2020

Ah sorry my friend, I read it as 'batch' for some reason :) thanks for your answer.

No, there are still no faster ways to do this kind of insert.

@rishabh-bhargava

@ellisvalentiner This is super useful. Wanted to point out a small typo on line 22 where it should be df.columns, I believe.

@rehoter-cyber

rehoter-cyber commented Jun 29, 2020

Hello! Thanks for your work! I learned a lot! Inserting 1,000,000 lines used to take 12 minutes and now takes 10 seconds!
I have a little question about rollback.
I am reading a quite big file, so I use a chunk size of about 1,000,000 lines per pass.
What if one line of a chunk doesn't meet the table's data type requirements, and I want it to show up in the log file?
I think one solution is to split the chunk into smaller chunks.
And I think the key to the question is locating those unacceptable lines. How can I quickly discard them?

def tb_insert(cursor):
    print("Inserting into [title_basics]......")
    create_tb_table(cursor)
    tb_miss = 0
    tb_log = open("title_basics_log1.txt", "w")
    tb_chunk_counter = -1
    for chunk in tqdm(pd.read_csv("title.basics.tsv",
                                  low_memory=False, sep="\t",
                                  chunksize=1000000)):
        tb_chunk_counter += 1
        if tb_chunk_counter == 3:  # only process the first 3 chunks (testing)
            break
        sio = StringIO()
        sio.write(chunk.to_csv(index=None, header=None))
        sio.seek(0)

        print("Current chunk index = ", tb_chunk_counter)
        try:
            # Fast path: COPY the whole chunk in one go
            cursor.copy_from(sio, 'title_basics', columns=chunk.columns, sep=',')
        except Exception:
            # Slow path: the chunk failed, retry row by row
            try:
                for index, row in chunk.iterrows():
                    cursor.execute(
                        "INSERT INTO title_basics VALUES "
                        "(%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                        (row[0], row[1], row[2], row[3], row[4],
                         row[5], row[6], row[7], row[8]))
            except Exception:
                tb_miss += 1
                tb_log.write(str(row) + '\n')

    print("Insert [title_basics] complete!")
    print("Missed item number: ")
    print(tb_miss)
    print("Missed items are stored in [title_basics_log1.txt]")
    tb_log.close()

tb_insert(cur)
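On the "rollback" part of the question: once copy_from fails, psycopg2 leaves the transaction aborted, so every later statement on that connection also fails until a rollback is issued. A minimal sketch of one way to handle a chunk (not the code above; it assumes conn is the connection that cursor came from, and that the row values adapt cleanly to the column types):

import psycopg2

def insert_chunk(conn, cursor, chunk, sio, log_file):
    try:
        cursor.copy_from(sio, 'title_basics', columns=chunk.columns, sep=',')
    except psycopg2.Error:
        conn.rollback()  # the failed COPY aborted the transaction; clear it first
        for _, row in chunk.iterrows():
            cursor.execute("SAVEPOINT bad_row")
            try:
                cursor.execute(
                    "INSERT INTO title_basics VALUES "
                    "(%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                    tuple(row))
            except psycopg2.Error:
                cursor.execute("ROLLBACK TO SAVEPOINT bad_row")  # undo just this row
                log_file.write(str(row.to_dict()) + '\n')        # ...and log it
            else:
                cursor.execute("RELEASE SAVEPOINT bad_row")
    conn.commit()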

@rehoter-cyber

I think I figured it out. Using copy_expert can skip those error lines, and we can do the data cleaning later in psql.

@ellisvalentiner
Author

@rehoter-cyber It sounds like your solution is close to what I would suggest: first insert the data into a landing table and then copy over into a destination table, cleaning the data at that point

Out of curiosity, why use Python/psycopg2 to insert rather than inserting directly using psql?
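A minimal sketch of that landing-table idea (the destination table and column names here are made up for illustration): COPY everything in as text so nothing is rejected, then cast and filter while moving rows into the real table.

import psycopg2

conn = psycopg2.connect(dsn)  # dsn as in the recipe above
with conn.cursor() as cur:
    # Landing table: every column is text, so COPY never rejects a row
    cur.execute("CREATE TEMP TABLE landing (id text, name text, price text)")
    with open("file.csv") as f:
        cur.copy_expert("COPY landing FROM STDIN WITH (FORMAT csv, HEADER true)", f)
    # Move only the rows that cast cleanly; the rest stay behind in the
    # landing table for inspection and cleaning
    cur.execute("""
        INSERT INTO destination (id, name, price)
        SELECT id::int, name, NULLIF(price, '')::numeric
        FROM landing
        WHERE id ~ '^[0-9]+$'
          AND (price = '' OR price ~ '^[0-9]+([.][0-9]+)?$')
    """)
conn.commit()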

@rehoter-cyber

rehoter-cyber commented Jul 1, 2020

LOL, thanks for your reply. First, the result of applying copy_expert is quite good, I think. It shortens the insert time from 10 hours to 10 minutes, without any rejections. Hahha, the reason for choosing Python/psycopg2 is that the original files are a little bit big (700+ MB), split across 6 files, and a mix of TSV and JSON (I was working on the Kaggle IMDB dataset). I also wanted to see the execution time and memory use while inserting, so I chose Python/psycopg2 to handle the task.

@ellisvalentiner
Author

ellisvalentiner commented Aug 11, 2020

@rehoter-cyber could you try the pandas.DataFrame.to_sql method with the parameter method=psql_insert_copy, where psql_insert_copy is the callable defined in the pandas "Insertion method" documentation?
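For reference, the callable from the pandas "Insertion method" docs looks roughly like this (reproduced from memory, so check the current documentation for the exact version):

import csv
from io import StringIO

def psql_insert_copy(table, conn, keys, data_iter):
    # table: pandas SQLTable, conn: SQLAlchemy connection,
    # keys: column names, data_iter: iterable of row tuples
    dbapi_conn = conn.connection  # underlying DBAPI (psycopg2) connection
    with dbapi_conn.cursor() as cur:
        buf = StringIO()
        csv.writer(buf).writerows(data_iter)
        buf.seek(0)
        columns = ', '.join('"{}"'.format(k) for k in keys)
        table_name = '{}.{}'.format(table.schema, table.name) if table.schema else table.name
        cur.copy_expert('COPY {} ({}) FROM STDIN WITH CSV'.format(table_name, columns), buf)

and then something like df.to_sql('table_name', engine, if_exists='append', index=False, method=psql_insert_copy).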

@visualisedata

I can't get it to work :( I think it's because everything in the dataframe is a string but the table has a mixture of data types.

This is the error I'm getting at the moment:

c.copy_from(sio, "public.sku", columns=df_ora.columns, sep=',')
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type numeric: ""
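If those empty strings are meant to be NULLs, one workaround (a sketch based on the recipe above) is to tell copy_from to treat empty fields as NULL via its null parameter:

sio = StringIO()
sio.write(df_ora.to_csv(index=None, header=None))
sio.seek(0)
with conn.cursor() as c:
    # null='' loads empty fields as NULL instead of failing the numeric cast;
    # note it also turns empty *text* fields into NULL
    c.copy_from(sio, "public.sku", columns=df_ora.columns, sep=',', null='')
conn.commit()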
