psycopg2 execute_values wrapper for accurate row counts
# One of the fastest ways to insert bulk data into Postgres (at least, aside from COPY) is the psycopg2 extras function execute_values.
# However, this doesn't return an accurate row count value - instead, it just returns the row count for the last page inserted.
# This wraps the execute_values function with its own pagination to return an accurate count of rows inserted.
# Performance is approximately equivalent to the underlying execute_values function - within 5-10% or so in my brief tests.

import psycopg2
import psycopg2.extras
import math

db_connection_string = "dbname=EDITME host=EDITME"


def query_function(data, cur, page_size=500):
    """Runs execute_values. Update insert_query and query_template with your own table, columns, and query template.
    'data' should be a list of dicts. Map dict keys to insert columns, if needed, in query_template."""
    insert_query = """insert into table_name (column1, column2) values %s"""
    query_template = """( %(column1)s, %(column2)s )"""
    # could probably support other sequences here too
    if type(data) is not list:
        print("Error: query_function() requires 'data' input be a list of dicts in order to perform the insert correctly.")
        return 0
    psycopg2.extras.execute_values(cur, insert_query, data,
                                   template=query_template, page_size=page_size)
    rowcount = cur.rowcount
    return rowcount


def insert_paginator(data, page_size=500, debug=0):
    """Psycopg2's execute_values method doesn't retain accurate insert row counts if data length > page size.
    This wraps an execute_values query function to get accurate counts."""
    with psycopg2.connect(db_connection_string) as conn:
        with conn.cursor() as cur:
            detail_rows_inserted = []  # source and inserted counts by page.
            rows_inserted = 0
            if len(data) > page_size:
                # We'll need to do pagination and calculate our own count of rows inserted.
                pages_needed = math.ceil(len(data) / page_size)
                if debug:
                    print("we'll need to do", pages_needed, "page inserts of",
                          page_size, "values each to get through the", len(data), "input values")
                for i in range(pages_needed):
                    page_num = i
                    start_row = i * page_size
                    if page_num < (pages_needed - 1):
                        end_row = (i + 1) * page_size
                    else:
                        # last page.
                        end_row = len(data)
                    page_rows = query_function(data[start_row:end_row], page_size=page_size, cur=cur)
                    rows_inserted += page_rows
                    if debug:
                        detail_dict = {"page": page_num,
                                       "start row number": start_row,
                                       "source row count": len(data[start_row:end_row]),
                                       "inserted row count": page_rows}
                        print(detail_dict)
                        detail_rows_inserted.append(detail_dict)
            else:
                rows_inserted = query_function(data, page_size=page_size, cur=cur)
            if debug:
                return rows_inserted, detail_rows_inserted
            else:
                return rows_inserted
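A minimal usage sketch, assuming a table matching the placeholder "table_name (column1, column2)" used in query_function and a valid db_connection_string; the sample data below is made up for illustration.

# Hypothetical sample data: 1200 dicts keyed by the placeholder column names.
sample_data = [{"column1": i, "column2": "value_" + str(i)} for i in range(1200)]

# 1200 rows with page_size=500 means three pages (500 + 500 + 200).
inserted = insert_paginator(sample_data, page_size=500)
print(inserted)  # expected: 1200

# With debug=1 you also get the per-page detail dicts.
inserted, detail = insert_paginator(sample_data, page_size=500, debug=1)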
This behavior is intentional for the execute_values method, for reasons discussed briefly here (and in similar GitHub issues): psycopg/psycopg2#540
Still, it's a bit frustrating as a user. execute_values is relatively new - it was released in Feb 2017 - so I think they will end up fixing that behavior at some point.
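For context, a short sketch of the behavior being discussed (the table name and row counts here are hypothetical, and an open cursor cur is assumed): execute_values issues one INSERT per page, and cur.rowcount only reflects the most recently executed statement, i.e. the last page.

rows = [(i,) for i in range(1200)]
psycopg2.extras.execute_values(cur, "insert into t (c) values %s", rows, page_size=500)
print(cur.rowcount)  # 200 - the final 200-row page, not 1200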