-
-
Save mzpqnxow/bb13b75f850e4810ffde426082032a77 to your computer and use it in GitHub Desktop.
psycopg2 execute_values wrapper for accurate row counts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# One of the fastest ways to insert bulk data into Postgres (at least, aside from COPY) is using the psycopg2 extras function execute_values. | |
# However, this doesn't return an accurate row count value - instead, it just returns the row count for the last page inserted. | |
# This wraps the execute_values function with its own pagination to return an accurate count of rows inserted. | |
# Performance is approximately equivalent to underlying execute_values function - within 5-10% or so in my brief tests. | |
import psycopg2 | |
import psycopg2.extras | |
import math | |
db_connection_string = "dbname=EDITME host=EDITME" | |
def query_function(data, cur, page_size=500): | |
"""Runs execute_values function. Update with your values and query template for 'data' input. | |
'data' should be a list of dicts. Map dict keys to insert columns, if needed, in query_template.""" | |
insert_query = """insert into table_name (column1, column2) values %s""" | |
query_template = """( %(column1)s, %(column2)s )""" | |
# could probably support other sequences here too | |
if type(data) is not list: | |
print("Error: query_function() requires 'data' input be a list of dicts in order to perfom the insert correctly.") | |
return 0 | |
psycopg2.extras.execute_values(cur, insert_query, data, | |
template=query_template, page_size=page_size) | |
rowcount = cur.rowcount | |
return rowcount | |
def insert_paginator(data, page_size=500, debug=0): | |
""" Psycopg2's execute_values method doesn't retain accurate insert row counts if data length > page size. | |
This wraps an execute_values query function to get accurate counts.""" | |
with psycopg2.connect(db_connection_string) as conn: | |
with conn.cursor() as cur: | |
if len(data) > page_size: | |
# we'll need to do pagination and calculate our own # of rows inserted. | |
pages_needed = math.ceil(len(data) / page_size) | |
if debug: | |
print("we'll need to do", pages_needed, "page inserts of", | |
page_size, "values each to get through the", len(data), "input values") | |
detail_rows_inserted = [] # source and inserted counts by page. | |
rows_inserted = 0 | |
for i in range(pages_needed): | |
page_num = i | |
start_row = i * page_size | |
if page_num < (pages_needed - 1): | |
end_row = ((i+1) * page_size) | |
else: | |
# last page. | |
end_row = len(data) | |
page_rows = query_function(data[start_row:end_row], page_size=page_size, cur=cur) | |
rows_inserted += page_rows | |
if debug: | |
detail_dict = { "page": page_num, | |
"start row number": start_row, | |
"source row count": len(data[start_row:end_row]), | |
"inserted row count": page_rows } | |
print(detail_dict) | |
detail_rows_inserted.append(detail_dict) | |
else: | |
rows_inserted = query_function(data, page_size=page_size, cur=cur) | |
if debug: | |
return rows_inserted, detail_rows_inserted | |
else: | |
return rows_inserted |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment