Last active
September 10, 2024 19:47
-
-
Save hakib/7e723d2c113b947f7920bf55737e4d16 to your computer and use it in GitHub Desktop.
Source for article at https://hakibenita.com/fast-load-data-python-postgresql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://hakibenita.com/fast-load-data-python-postgresql | |
from typing import Iterator, Dict, Any, Optional | |
from urllib.parse import urlencode | |
import datetime | |
#------------------------ Profile | |
import time | |
from functools import wraps | |
from memory_profiler import memory_usage | |
def profile(fn):
    """Decorator that prints the wall-clock time and memory delta of ``fn``.

    The wrapped function is called once directly to measure elapsed time.
    It is then handed to ``memory_usage`` together with the same arguments,
    and the value returned by that second measurement is what the wrapper
    returns.
    """
    @wraps(fn)
    def inner(*args, **kwargs):
        # Only keyword arguments are shown in the header line.
        rendered_kwargs = ', '.join(
            f'{key}={value}' for key, value in kwargs.items()
        )
        print(f'\n{fn.__name__}({rendered_kwargs})')

        # Timing pass.
        started_at = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - started_at
        print(f'Time {elapsed:0.4}')

        # Memory pass: report the spread between the highest and lowest sample.
        mem, retval = memory_usage(
            (fn, args, kwargs),
            retval=True,
            timeout=200,
            interval=1e-7,
        )
        print(f'Memory {max(mem) - min(mem)}')
        return retval

    return inner
#------------------------ Data | |
import requests | |
def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
    """Yield beers from the Punk API, requesting one page at a time.

    Pages are requested starting at 1 and iteration stops at the first page
    whose JSON payload is empty.

    Args:
        page_size: Value sent as the ``per_page`` query parameter.

    Yields:
        One decoded JSON object per beer.

    Raises:
        requests.HTTPError: If a page request returns an error status.
    """
    # The context manager closes the session (and its pooled connections)
    # when the generator is exhausted, closed, or fails.
    with requests.Session() as session:
        page = 1
        while True:
            response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
                'page': page,
                'per_page': page_size,
            }))
            response.raise_for_status()

            data = response.json()
            if not data:
                break

            yield from data
            page += 1
def iter_beers_from_file(path: str) -> Iterator[Dict[str, Any]]:
    """Yield beers from a JSON file containing a top-level array.

    Args:
        path: Location of the JSON file.

    Yields:
        Each element of the decoded array, in file order.
    """
    import json

    # JSON is UTF-8 by specification; do not depend on the locale's encoding.
    with open(path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    yield from data
#------------------------ Load | |
def create_staging_table(cursor):
    """Drop ``staging_beers`` if it exists and create it again, empty.

    Args:
        cursor: Object whose ``execute`` method receives the DDL as a single
            string.
    """
    ddl = """
        DROP TABLE IF EXISTS staging_beers;
        CREATE TABLE staging_beers (
            id INTEGER,
            name TEXT,
            tagline TEXT,
            first_brewed DATE,
            description TEXT,
            image_url TEXT,
            abv DECIMAL,
            ibu DECIMAL,
            target_fg DECIMAL,
            target_og DECIMAL,
            ebc DECIMAL,
            srm DECIMAL,
            ph DECIMAL,
            attenuation_level DECIMAL,
            brewers_tips TEXT,
            contributed_by TEXT,
            volume INTEGER
        );
    """
    cursor.execute(ddl)
def parse_first_brewed(text: str) -> datetime.date:
    """Convert a ``first_brewed`` string into a date.

    Two layouts are accepted: ``'MM/YYYY'`` maps to the first day of that
    month, and ``'YYYY'`` maps to January 1st of that year.

    Args:
        text: The raw value to parse.

    Returns:
        The corresponding ``datetime.date``.

    Raises:
        ValueError: If ``text`` has more than one ``/`` separator, or if any
            part is not a valid integer / calendar value.
    """
    parts = text.split('/')
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    if len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    # Raise explicitly: an ``assert`` is removed under ``python -O`` and the
    # function would then silently return None.
    raise ValueError('Unknown date format')
@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load beers into a fresh staging table with one ``execute`` per beer.

    Args:
        connection: Database connection used to open the cursor.
        beers: Beer records; ``first_brewed`` is parsed into a date and
            ``volume`` is reduced to its ``value`` entry before insertion.
    """
    insert_sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            # Copy the record, then override the two transformed fields.
            params = dict(beer)
            params['first_brewed'] = parse_first_brewed(beer['first_brewed'])
            params['volume'] = beer['volume']['value']
            cursor.execute(insert_sql, params)
# http://initd.org/psycopg/docs/cursor.html#cursor.executemany | |
@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load beers into a fresh staging table with a single ``executemany``.

    All parameter dicts are built in a list before the call.

    Args:
        connection: Database connection used to open the cursor.
        beers: Beer records; ``first_brewed`` is parsed into a date and
            ``volume`` is reduced to its ``value`` entry before insertion.
    """
    def to_params(beer):
        # Same keys as the source record, with the two transformed fields.
        return {
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        }

    insert_sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        all_beers = [to_params(beer) for beer in beers]
        cursor.executemany(insert_sql, all_beers)
@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load beers into a fresh staging table with ``executemany``, lazily.

    Parameter dicts are produced on demand instead of being collected in a
    list first.

    Args:
        connection: Database connection used to open the cursor.
        beers: Beer records; ``first_brewed`` is parsed into a date and
            ``volume`` is reduced to its ``value`` entry before insertion.
    """
    def to_params(beer):
        # Same keys as the source record, with the two transformed fields.
        return {
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        }

    insert_sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        # ``map`` keeps the conversion lazy: one dict at a time.
        cursor.executemany(insert_sql, map(to_params, beers))
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_batch | |
import psycopg2.extras | |
@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Load beers into a fresh staging table via ``execute_batch``.

    All parameter dicts are built in a list before the call.

    Args:
        connection: Database connection used to open the cursor.
        beers: Beer records; ``first_brewed`` is parsed into a date and
            ``volume`` is reduced to its ``value`` entry before insertion.
        page_size: Forwarded unchanged to ``psycopg2.extras.execute_batch``.
    """
    def to_params(beer):
        # Same keys as the source record, with the two transformed fields.
        return {
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        }

    insert_sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        all_beers = [to_params(beer) for beer in beers]
        psycopg2.extras.execute_batch(
            cursor, insert_sql, all_beers, page_size=page_size,
        )
@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Load beers into a fresh staging table via ``execute_batch``, lazily.

    Parameter dicts are produced on demand instead of being collected in a
    list first.

    Args:
        connection: Database connection used to open the cursor.
        beers: Beer records; ``first_brewed`` is parsed into a date and
            ``volume`` is reduced to its ``value`` entry before insertion.
        page_size: Forwarded unchanged to ``psycopg2.extras.execute_batch``.
    """
    def to_params(beer):
        # Same keys as the source record, with the two transformed fields.
        return {
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        }

    insert_sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        # ``map`` keeps the conversion lazy: one dict at a time.
        iter_beers = map(to_params, beers)
        psycopg2.extras.execute_batch(
            cursor, insert_sql, iter_beers, page_size=page_size,
        )
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values | |
import psycopg2.extras | |
@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load beers into a fresh staging table via ``execute_values``.

    All row tuples are built in a list before the call.

    Args:
        connection: Database connection used to open the cursor.
        beers: Beer records; ``first_brewed`` is parsed into a date and
            ``volume`` is reduced to its ``value`` entry before insertion.
    """
    def to_row(beer):
        # Values are positional, so this order must match the table columns.
        return (
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        )

    insert_sql = """
        INSERT INTO staging_beers VALUES %s;
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        rows = [to_row(beer) for beer in beers]
        psycopg2.extras.execute_values(cursor, insert_sql, rows)
@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Load beers into a fresh staging table via ``execute_values``, lazily.

    Row tuples are produced on demand instead of being collected in a list
    first.

    Args:
        connection: Database connection used to open the cursor.
        beers: Beer records; ``first_brewed`` is parsed into a date and
            ``volume`` is reduced to its ``value`` entry before insertion.
        page_size: Forwarded unchanged to ``psycopg2.extras.execute_values``.
    """
    def to_row(beer):
        # Values are positional, so this order must match the table columns.
        return (
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        )

    insert_sql = """
        INSERT INTO staging_beers VALUES %s;
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        # ``map`` keeps the conversion lazy: one tuple at a time.
        psycopg2.extras.execute_values(
            cursor, insert_sql, map(to_row, beers), page_size=page_size,
        )
# http://initd.org/psycopg/docs/cursor.html#cursor.copy_from | |
# https://docs.python.org/3.7/library/io.html?io.StringIO#io.StringIO | |
import io | |
def clean_csv_value(value: Optional[Any], sep: str = '|') -> str:
    """Render one field for PostgreSQL's COPY text format.

    ``None`` becomes the NULL marker ``\\N``. Any other value is converted
    with ``str`` and the characters that would otherwise be read as an
    escape, a row terminator or a column delimiter are backslash-escaped.

    Args:
        value: The field value to render.
        sep: Column delimiter used by the COPY command. Defaults to ``'|'``,
            the delimiter the loaders in this file pass to ``copy_from``.

    Returns:
        The escaped text for a single field.
    """
    if value is None:
        return r'\N'

    # Backslashes go first so the escapes added below are not doubled.
    text = str(value).replace('\\', '\\\\')
    text = text.replace('\n', '\\n').replace('\r', '\\r')
    # An unescaped delimiter inside a value would split it into two columns.
    return text.replace(sep, '\\' + sep)
@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load beers into a fresh staging table with COPY from an in-memory file.

    Every beer is rendered as one ``|``-delimited line in an ``io.StringIO``
    buffer, which is then passed to ``cursor.copy_from`` in one go.

    Args:
        connection: Database connection used to open the cursor.
        beers: Beer records; ``first_brewed`` is parsed into a date and
            ``volume`` is reduced to its ``value`` entry before loading.
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        csv_file_like_object = io.StringIO()
        for beer in beers:
            # Fields are positional, so they must follow the table's column
            # order: brewers_tips comes before contributed_by.
            csv_file_like_object.write('|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n')

        # Rewind so copy_from reads from the start of the buffer.
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
class StringIteratorIO(io.TextIOBase):
    """Readable text stream whose content comes from an iterator of strings.

    Strings are pulled from the iterator only when a read needs more data;
    at most one pulled string is held in the internal buffer at a time.
    """

    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        # Unread remainder of the most recently pulled string.
        self._buff = ''

    def readable(self) -> bool:
        """Report that the stream supports reading."""
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        """Return up to ``n`` characters taken from a single buffered string.

        With ``n`` of ``None`` the whole buffered string is returned. An
        empty string means the iterator is exhausted.
        """
        try:
            # Refill the buffer, skipping over empty strings.
            while not self._buff:
                self._buff = next(self._iter)
        except StopIteration:
            pass
        head = self._buff[:n]
        self._buff = self._buff[len(head):]
        return head

    def read(self, n: Optional[int] = None) -> str:
        """Read up to ``n`` characters, or everything if ``n`` is None/negative."""
        read_all = n is None or n < 0
        pieces = []
        remaining = n
        while read_all or remaining > 0:
            piece = self._read1() if read_all else self._read1(remaining)
            if not piece:
                break
            if not read_all:
                remaining -= len(piece)
            pieces.append(piece)
        return ''.join(pieces)
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    """Load beers into a fresh staging table with COPY from a lazy stream.

    Each beer is rendered as one ``|``-delimited line only when the stream
    is read, so the full payload is not assembled up front.

    Args:
        connection: Database connection used to open the cursor.
        beers: Beer records; ``first_brewed`` is parsed into an ISO date
            string and ``volume`` is reduced to its ``value`` entry.
        size: Forwarded unchanged to ``cursor.copy_from``.
    """
    def to_line(beer):
        # Fields are positional, so this order must match the table columns.
        fields = (
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']).isoformat(),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        )
        return '|'.join(map(clean_csv_value, fields)) + '\n'

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        # ``map`` keeps line rendering lazy: one line per pull.
        beers_string_iterator = StringIteratorIO(map(to_line, beers))
        cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)
#------------------------ Benchmark | |
# Connection shared by every benchmark run and verification query below.
connection = psycopg2.connect(host='localhost', database='testload', user='haki', password=None)
# Enable autocommit on the session so no explicit commit call is needed.
connection.set_session(autocommit=True)
import psycopg2.extras | |
def test(connection, n: int):
    """Assert that ``staging_beers`` holds ``n`` rows with correct sample data.

    Beers 1 and 235 are checked for name, parsed ``first_brewed`` date and
    ``volume`` to confirm the transformations were applied.

    Args:
        connection: Database connection used to open the cursor.
        n: Expected number of rows in ``staging_beers``.

    Raises:
        AssertionError: If the row count or any sampled value is wrong.
    """
    # (name, first_brewed, volume) expected for ids 1 and 235, in id order.
    expected_samples = (
        ('Buzz', datetime.date(2007, 9, 1), 20),
        ('Mango And Chili Barley Wine', datetime.date(2016, 1, 1), 20),
    )

    with connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor:
        # Row count.
        cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers')
        record = cursor.fetchone()
        assert record.cnt == n, f'Expected {n} rows, got {record.cnt} rows!'

        # One row per sampled id, ordered by id.
        cursor.execute("""
            SELECT DISTINCT ON (id)
                *
            FROM
                staging_beers
            WHERE
                id IN (1, 235)
            ORDER BY
                id;
        """)
        for name, first_brewed, volume in expected_samples:
            sample = cursor.fetchone()
            assert sample.name == name
            assert sample.first_brewed == first_brewed
            assert sample.volume == volume
# Download the data set once and repeat it 100 times to enlarge the workload.
beers = list(iter_beers_from_api()) * 100

# (loader, keyword arguments) pairs, executed in this order. Options are
# passed by keyword so the profiler's header line shows them.
_benchmark_runs = [
    (insert_one_by_one, {}),
    (insert_executemany, {}),
    (insert_executemany_iterator, {}),
    (insert_execute_batch, {}),
    (insert_execute_batch_iterator, {'page_size': 1}),
    (insert_execute_batch_iterator, {'page_size': 100}),
    (insert_execute_batch_iterator, {'page_size': 1000}),
    (insert_execute_batch_iterator, {'page_size': 10000}),
    (insert_execute_values, {}),
    (insert_execute_values_iterator, {'page_size': 1}),
    (insert_execute_values_iterator, {'page_size': 100}),
    (insert_execute_values_iterator, {'page_size': 1000}),
    (insert_execute_values_iterator, {'page_size': 10000}),
    (copy_stringio, {}),
    (copy_string_iterator, {'size': 1024}),
    (copy_string_iterator, {'size': 1024 * 8}),
    (copy_string_iterator, {'size': 1024 * 16}),
    (copy_string_iterator, {'size': 1024 * 64}),
]

for _loader, _options in _benchmark_runs:
    _loader(connection, beers, **_options)
    # Verify the load before moving on to the next strategy.
    test(connection, len(beers))
Try using TextIOWrapper.
I used the StringIteratorIO to load data from CSV into PostgreSQL using the copy_from method available in psycopg2. It is quite fast. With my infrastructure, I was able to load 32 million rows in less than 8 minutes. That was quite impressive. The only issue I observed was that the buffer/cache memory usage went up and remained high even after the program completed execution. Any suggestions for avoiding this?
This is amazing. Thank you. The table parameter in copy_from does not accept a 'schema.table' reference, therefore I had to use copy_expert. The code below works -- I did not test for memory and speed. The dataframe columns/data/dtypes match the table, so I do not specify the columns explicitly.
def insert_with_string_io(df: pd.DataFrame):
    """Copy a DataFrame into a table through an in-memory CSV buffer.

    The frame is written as CSV without index or header row, then streamed
    with ``copy_expert``. Any error raised by the copy is printed and not
    re-raised.
    """
    # <database>, <schema> and <table> are placeholders to be filled in.
    copy_sql = "COPY <database>.<schema>.<table> FROM STDIN (FORMAT 'csv', HEADER false)"

    buffer = io.StringIO()
    df.to_csv(buffer, index=False, header=False)
    # Rewind so the copy reads from the start of the buffer.
    buffer.seek(0)

    with conn.cursor() as cursor:
        try:
            cursor.copy_expert(copy_sql, buffer)
        except Exception as error:
            print("Error: %s" % error)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is wonderful. I am eternally grateful for the efforts you have put in.
I am however facing a problem.
I am copying a 25MB-ish CSV file from an S3 bucket to a Postgres database. Everything in the code is the same except for the part where I convert the byte data (that comes from the S3 response [StreamingResponseBody]) to a utf-8 string inside the _read1() function. And the copy is extremely slow. The 25MB-ish file takes more than 2 secs to copy. Do you think this is causing the issue? Also, is there an alternative to what I am using?