#!/usr/bin/env python
import os
from io import StringIO

import pandas as pd
import psycopg2

# Get a database connection
dsn = os.environ.get('DB_DSN')  # Use ENV vars: keep it secret, keep it safe
conn = psycopg2.connect(dsn)

# Do something to create your dataframe here...
df = pd.read_csv("file.csv")

# Initialize a string buffer
sio = StringIO()
df.to_csv(sio, index=False, header=False)  # Write the Pandas DataFrame as a csv to the buffer
sio.seek(0)  # Be sure to reset the position to the start of the stream

# Copy the string buffer to the database, as if it were an actual file
# Note: psycopg2 2.9+ quotes the table name, so a schema-qualified name needs copy_expert instead
with conn.cursor() as c:
    c.copy_from(sio, "schema.table", columns=df.columns, sep=',')
    conn.commit()
@ellisvalentiner This is super useful. Wanted to point out a small typo on line 22, where it should be df.columns, I believe.
Hello! Thanks for your work! I learned a lot! Inserting 1 million rows used to take 12 minutes; now it takes 10 seconds!
I have a small question about rollbacks.
I am reading quite a big file, so I use a chunk size of about 1 million rows per pass.
What if one row of a chunk does not meet the table's data type requirements, and I want it to show up in the log file?
I think one solution is to split the chunk into smaller chunks.
And I think the key to the question is locating those unacceptable rows. How can I discard them quickly?
def tb_insert(cursor):
    print("Inserting into [title_basics]......")
    create_tb_table(cursor)
    tb_miss = 0
    tb_log = open("title_basics_log1.txt", "w")
    tb_chunk_counter = -1
    for chunk in tqdm(pd.read_csv("title.basics.tsv",
                                  low_memory=False, sep="\t",
                                  chunksize=1000000)):
        tb_chunk_counter = tb_chunk_counter + 1
        if tb_chunk_counter == 3:
            break
        ck = chunk
        sio = StringIO()
        sio.write(chunk.to_csv(index=None, header=None))
        sio.seek(0)
        print("Current chunk index = ", tb_chunk_counter)
        try:
            # Fast path: bulk-load the whole chunk with COPY
            cursor.copy_from(sio, 'title_basics', columns=chunk.columns, sep=',')
        except:
            #tb_log.write(str(row)+'\n')
            # Fallback: insert row by row (stops at the first row that fails)
            try:
                for index, row in ck.iterrows():
                    cursor.execute(
                        "INSERT INTO title_basics VALUES "
                        "(%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                        (row[0], row[1], row[2], row[3], row[4], row[5],
                         row[6], row[7], row[8]))
            except:
                tb_miss = tb_miss + 1
                tb_log.write(str(row)+'\n')
                #pass
    print("Insert [title_basics] complete!")
    print("Miss item number: ")
    print(tb_miss)
    print("Miss items stored in [title_basics_log1.txt]")
    tb_log.close()

tb_insert(cur)
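The idea of splitting a chunk into smaller chunks could be sketched roughly as below. This is only a sketch under assumptions, not something from the gist: find_bad_rows and make_copy_loader are hypothetical helper names, the loader is assumed to be all-or-nothing (a COPY wrapped in a savepoint), and the table and column names are assumed to be trusted identifiers because they are interpolated into the SQL unquoted.

```python
import csv
from io import StringIO


def find_bad_rows(rows, try_load):
    """Load rows in the largest batches that succeed; return the rejected rows.

    try_load(batch) must either load the whole batch or raise and load nothing.
    """
    if not rows:
        return []
    try:
        try_load(rows)
        return []
    except Exception:
        if len(rows) == 1:
            return list(rows)  # a single row that fails is a bad row
        mid = len(rows) // 2
        # Halve the batch and retry each half independently
        return (find_bad_rows(rows[:mid], try_load)
                + find_bad_rows(rows[mid:], try_load))


def make_copy_loader(cursor, table, columns):
    """Hypothetical helper: build a try_load that COPYs a batch inside a savepoint."""
    copy_sql = "COPY {} ({}) FROM STDIN WITH (FORMAT csv)".format(
        table, ", ".join(columns))

    def try_load(batch):
        buf = StringIO()
        csv.writer(buf).writerows(batch)
        buf.seek(0)
        cursor.execute("SAVEPOINT copy_batch")
        try:
            cursor.copy_expert(copy_sql, buf)
        except Exception:
            # Undo only this batch so the transaction stays usable
            cursor.execute("ROLLBACK TO SAVEPOINT copy_batch")
            raise
        cursor.execute("RELEASE SAVEPOINT copy_batch")

    return try_load
```

With a chunk converted to a list of tuples, something like bad = find_bad_rows(rows, make_copy_loader(cur, "title_basics", list(chunk.columns))) would load the good rows and return the rejected ones, which can then be written to the log file. A clean chunk costs a single COPY; each bad row adds roughly log2(n) extra attempts.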
I think I figured it out: with copy_expert I can ignore those error lines, and we can do the data cleaning later in psql.
@rehoter-cyber It sounds like your solution is close to what I would suggest: first insert the data into a landing table, then copy it over into a destination table, cleaning the data at that point.
Out of curiosity, why use Python/psycopg2 to insert rather than inserting directly using psql?
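The landing-table approach might look something like the sketch below. Everything named here is an assumption for illustration: landing_table_statements is a hypothetical helper, the landing table is assumed to hold every column as text, and the only cleaning shown is turning empty strings into NULL before casting to numeric. Identifiers are interpolated unquoted, so they are assumed to be trusted.

```python
def landing_table_statements(dest, landing, text_columns, numeric_columns):
    """Build SQL for a text-only landing table and a cleaning INSERT ... SELECT."""
    all_columns = list(text_columns) + list(numeric_columns)

    # Every landing column is text, so COPY cannot fail on a type mismatch
    create = "CREATE TEMP TABLE {} ({})".format(
        landing, ", ".join("{} text".format(c) for c in all_columns))

    # Clean while copying over: empty strings become NULL, then cast
    select_exprs = list(text_columns) + [
        "NULLIF({}, '')::numeric".format(c) for c in numeric_columns]
    insert = "INSERT INTO {} ({}) SELECT {} FROM {}".format(
        dest, ", ".join(all_columns), ", ".join(select_exprs), landing)

    return [create, insert]
```

The first statement would run before the COPY into the landing table and the second after it, all in the same transaction, so a failed cast rolls everything back rather than leaving the destination half-loaded.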
LOL, thanks for your reply. First, the result with [copy_expert] is quite good, I think: it cut the insert time from 10 hours to 10 minutes, without any rejected rows. Haha, the reason for choosing Python/psycopg2 is that the original data is a bit big (700+ MB) and split up (the dataset has 6 files), in both TSV and JSON formats (I was working on the Kaggle IMDB dataset). I also wanted to see the execution time and memory use while inserting, so I chose Python/psycopg2 to handle the task.
@rehoter-cyber could you try using the pandas.DataFrame.to_sql method, but with the parameter method=psql_insert_copy, where psql_insert_copy is the callable function defined in the Insertion method documentation?
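For reference, the callable in the pandas documentation looks roughly like the following. This is reproduced from memory as a sketch, so check it against the current docs before relying on it; it assumes a SQLAlchemy connection backed by psycopg2, since it relies on copy_expert.

```python
import csv
from io import StringIO


def psql_insert_copy(table, conn, keys, data_iter):
    """Insertion method for DataFrame.to_sql that uses PostgreSQL COPY.

    table:     pandas.io.sql.SQLTable
    conn:      SQLAlchemy connection (assumed to wrap a psycopg2 connection)
    keys:      list of column names
    data_iter: iterable of row tuples to insert
    """
    # Get the underlying DBAPI connection, which can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        # Serialize the rows as CSV into an in-memory buffer
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)
```

It would be passed as df.to_sql("table_name", engine, method=psql_insert_copy, index=False, if_exists="append"), where engine is a SQLAlchemy engine and the table name is a placeholder.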
I can't get it to work :( I think it's because in the dataframe everything is string but in the table there's a mixture of data types
This is the error I'm getting at the moment:
c.copy_from(sio, "public.sku", columns=df_ora.columns, sep=',')
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type numeric: ""
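That error usually means an empty field is being sent to a numeric column. One possible fix, sketched here on the assumption that empty fields should be stored as NULL, is to pass null="" to copy_from so COPY reads empty fields as NULL instead of trying to parse them. copy_empty_as_null is a hypothetical helper name.

```python
def copy_empty_as_null(cursor, buffer, table, columns):
    """COPY a comma-separated buffer, treating empty fields as NULL."""
    buffer.seek(0)  # make sure COPY reads from the start of the stream
    # null="" tells COPY that an empty field means NULL (the default is \N)
    cursor.copy_from(buffer, table, sep=",", null="", columns=list(columns))
```

Two caveats. Empty fields in text columns will become NULL as well. And copy_from uses COPY's text format, which does not understand CSV quoting, so values that contain commas will still break; in that case copy_expert with a "COPY ... FROM STDIN WITH (FORMAT csv)" statement is probably the safer route. Also, since psycopg2 2.9 copy_from quotes the table name, so a schema-qualified name such as "public.sku" has to go through copy_expert.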
ah sorry my friend, I read it as 'batch' for some reason :) thanks for your answer
No, there is still no faster way to do this kind of insert.