# https://hakibenita.com/fast-load-data-python-postgresql
from typing import Iterator, Dict, Any, Optional
from urllib.parse import urlencode
import datetime


#------------------------ Profile

import time
from functools import wraps
from memory_profiler import memory_usage


def profile(fn):
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\n{fn.__name__}({fn_kwargs_str})')

        # Measure time
        t = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - t
        print(f'Time {elapsed:0.4}')

        # Measure memory (note: memory_usage invokes fn a second time)
        mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)
        print(f'Memory {max(mem) - min(mem)}')
        return retval

    return inner
#------------------------ Data

import requests


def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
    session = requests.Session()
    page = 1
    while True:
        response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
            'page': page,
            'per_page': page_size,
        }))
        response.raise_for_status()

        data = response.json()
        if not data:
            break

        for beer in data:
            yield beer

        page += 1


def iter_beers_from_file(path: str) -> Iterator[Dict[str, Any]]:
    import json
    with open(path, 'r') as f:
        data = json.load(f)
    for beer in data:
        yield beer
#------------------------ Load

def create_staging_table(cursor):
    cursor.execute("""
        DROP TABLE IF EXISTS staging_beers;
        CREATE TABLE staging_beers (
            id INTEGER,
            name TEXT,
            tagline TEXT,
            first_brewed DATE,
            description TEXT,
            image_url TEXT,
            abv DECIMAL,
            ibu DECIMAL,
            target_fg DECIMAL,
            target_og DECIMAL,
            ebc DECIMAL,
            srm DECIMAL,
            ph DECIMAL,
            attenuation_level DECIMAL,
            brewers_tips TEXT,
            contributed_by TEXT,
            volume INTEGER
        );
    """)


def parse_first_brewed(text: str) -> datetime.date:
    parts = text.split('/')
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    elif len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    else:
        assert False, 'Unknown date format'
@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            cursor.execute("""
                INSERT INTO staging_beers VALUES (
                    %(id)s,
                    %(name)s,
                    %(tagline)s,
                    %(first_brewed)s,
                    %(description)s,
                    %(image_url)s,
                    %(abv)s,
                    %(ibu)s,
                    %(target_fg)s,
                    %(target_og)s,
                    %(ebc)s,
                    %(srm)s,
                    %(ph)s,
                    %(attenuation_level)s,
                    %(brewers_tips)s,
                    %(contributed_by)s,
                    %(volume)s
                );
            """, {
                **beer,
                'first_brewed': parse_first_brewed(beer['first_brewed']),
                'volume': beer['volume']['value'],
            })
# http://initd.org/psycopg/docs/cursor.html#cursor.executemany
@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers)
@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers))
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_batch
import psycopg2.extras


@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers, page_size=page_size)
@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        iter_beers = ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers)

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, iter_beers, page_size=page_size)
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values
import psycopg2.extras


@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, [(
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers])
@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, ((
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers), page_size=page_size)
# http://initd.org/psycopg/docs/cursor.html#cursor.copy_from
# https://docs.python.org/3.7/library/io.html?io.StringIO#io.StringIO
import io


def clean_csv_value(value: Optional[Any]) -> str:
    if value is None:
        return r'\N'
    return str(value).replace('\n', '\\n')


@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            # COPY is positional: values must follow the column order of staging_beers.
            csv_file_like_object.write('|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n')
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
class StringIteratorIO(io.TextIOBase):
    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        self._buff = ''

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        ret = self._buff[:n]
        self._buff = self._buff[len(ret):]
        return ret

    def read(self, n: Optional[int] = None) -> str:
        line = []
        if n is None or n < 0:
            while True:
                m = self._read1()
                if not m:
                    break
                line.append(m)
        else:
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                line.append(m)
        return ''.join(line)
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']).isoformat(),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)
#------------------------ Benchmark

connection = psycopg2.connect(
    host='localhost',
    database='testload',
    user='haki',
    password=None,
)
connection.set_session(autocommit=True)

import psycopg2.extras


def test(connection, n: int):
    # Make sure the data was loaded
    with connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor:
        # Test number of rows.
        cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers')
        record = cursor.fetchone()
        assert record.cnt == n, f'Expected {n} rows, got {record.cnt} rows!'

        # Test that the data was loaded, and that transformations were applied correctly.
        cursor.execute("""
            SELECT DISTINCT ON (id)
                *
            FROM
                staging_beers
            WHERE
                id IN (1, 235)
            ORDER BY
                id;
        """)

        beer_1 = cursor.fetchone()
        assert beer_1.name == 'Buzz'
        assert beer_1.first_brewed == datetime.date(2007, 9, 1)
        assert beer_1.volume == 20

        beer_235 = cursor.fetchone()
        assert beer_235.name == 'Mango And Chili Barley Wine'
        assert beer_235.first_brewed == datetime.date(2016, 1, 1)
        assert beer_235.volume == 20


beers = list(iter_beers_from_api()) * 100

insert_one_by_one(connection, beers)
test(connection, len(beers))

insert_executemany(connection, beers)
test(connection, len(beers))

insert_executemany_iterator(connection, beers)
test(connection, len(beers))

insert_execute_batch(connection, beers)
test(connection, len(beers))

insert_execute_batch_iterator(connection, beers, page_size=1)
test(connection, len(beers))

insert_execute_batch_iterator(connection, beers, page_size=100)
test(connection, len(beers))

insert_execute_batch_iterator(connection, beers, page_size=1000)
test(connection, len(beers))

insert_execute_batch_iterator(connection, beers, page_size=10000)
test(connection, len(beers))

insert_execute_values(connection, beers)
test(connection, len(beers))

insert_execute_values_iterator(connection, beers, page_size=1)
test(connection, len(beers))

insert_execute_values_iterator(connection, beers, page_size=100)
test(connection, len(beers))

insert_execute_values_iterator(connection, beers, page_size=1000)
test(connection, len(beers))

insert_execute_values_iterator(connection, beers, page_size=10000)
test(connection, len(beers))

copy_stringio(connection, beers)
test(connection, len(beers))

copy_string_iterator(connection, beers, size=1024)
test(connection, len(beers))

copy_string_iterator(connection, beers, size=1024 * 8)
test(connection, len(beers))

copy_string_iterator(connection, beers, size=1024 * 16)
test(connection, len(beers))

copy_string_iterator(connection, beers, size=1024 * 64)
test(connection, len(beers))
@Tiwo1991 glad you found the problem. I think that instead of finding a way to skip this line, it's best to look for a way to not write it in the first place (check if the current line is the last line and don't write \n?).
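For example, a minimal sketch of the "don't write it in the first place" approach (the dump_rows helper and rows iterable are hypothetical stand-ins for your code): treat \n as a separator written before each row rather than a terminator written after it, so the file never ends with an empty line:

```python
import json

def dump_rows(path, rows):
    with open(path, 'w') as file:
        for i, row in enumerate(rows):
            if i > 0:
                # Separator, not terminator: no newline after the last row.
                file.write('\n')
            json.dump(row, file)
```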
Just wanted to comment w/ a shoutout on this amazingly well explained and researched example. Great work @hakib
@ededdneddyfan Thanks :)
Hi there. Before my question, I will join the rest of the members in congratulating @hakib. Great and useful job.
My question is more of a doubt: what is the difference between insert_execute_batch and insert_execute_batch_iterator?
The only difference I found is that all_beers changes to iter_beers.
Best regards
It's a subtle difference indeed. The function insert_execute_batch first evaluates the entire list:

    all_beers = [{ ... }]

Notice how all_beers is a list, not a generator.

The function insert_execute_batch_iterator uses a generator instead:

    iter_beers = ({ ... })

Notice that iter_beers uses round brackets, so the iterator is not evaluated immediately, which should reduce the memory footprint.
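For illustration (transform here is a hypothetical stand-in for the dict transformation in the gist), the only syntactic difference is the brackets, but the memory behavior differs:

```python
beers = [{'id': 1}, {'id': 2}]
transform = lambda beer: {**beer, 'loaded': True}  # stand-in for the real transform

# List comprehension: builds every transformed row in memory up front.
all_beers = [transform(beer) for beer in beers]

# Generator expression: yields rows lazily, one at a time,
# as execute_batch consumes them.
iter_beers = (transform(beer) for beer in beers)

print(type(all_beers), type(iter_beers))  # <class 'list'> <class 'generator'>
```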
Thank you for the quick reply. I actually missed that.
Now with your post I'm learning how to optimize inserts and some Python syntax at the same time :)
This is wonderful. I am eternally grateful for the efforts you have put in.
I am however facing a problem. I am copying a 25MB-ish CSV file from an S3 bucket to a Postgres database. Everything in the code is the same except for the part where I convert the byte data (that comes from the S3 response, StreamingResponseBody) to a utf-8 string inside the _read1() function. And the copy is extremely slow: the 25MB-ish file takes more than 2 secs to copy.
def _read1(self, n=None):
    while not self._buff:
        try:
            self._buff = next(self._iter)
            self._buff = self._buff.decode("utf-8")  # <-------- this line here
        except StopIteration:
            break
    ret = self._buff[:n]
    self._buff = self._buff[len(ret):]
    return ret
Do you think this is causing the issue? Also is there an alternative to what I am using?
Try using TextIOWrapper.
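For example, a rough sketch (assuming body is the binary, file-like streaming object from the S3 response): wrap it once in io.TextIOWrapper so decoding happens incrementally while copy_from reads, instead of calling .decode("utf-8") on every chunk inside _read1():

```python
import io

# Assumption: `body` is the binary file-like object from the S3 response.
# TextIOWrapper decodes it to text lazily as copy_from pulls data.
text_stream = io.TextIOWrapper(body, encoding='utf-8')
cursor.copy_from(text_stream, 'staging_beers', sep='|')
```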
I used the StringIteratorIO to load data from CSV into PostgreSQL using the copy_from method available in psycopg2. It is quite fast: with my infrastructure I was able to load 32 million rows in less than 8 minutes, which was quite impressive. The only observation I had was that the buffer/cache went up and remained high even after the program completed execution. Any suggestion to avoid it?
This is amazing. Thank you. The table parameter in copy_from does not accept a 'schema.table' reference, therefore I had to use copy_expert. The code below works (I did not test for memory and speed). The dataframe columns/data/dtypes match the table, so I do not specify columns explicitly.
def insert_with_string_io(df: pd.DataFrame):
    buffer = io.StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)
    with conn.cursor() as cursor:
        try:
            cursor.copy_expert(f"COPY <database>.<schema>.<table> FROM STDIN (FORMAT 'csv', HEADER false)", buffer)
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
@hakib, Thank you for the quick response. I dumped the contents into a text file in the following manner:
with open(r"text.txt","w") as file: w = csv.writer(file) for row in filterfalse(lambda line: "#" in line.get('date'), log_lines): json.dump(row,file) file.write('\n')
The last line in the txt file is an empty line. I expect it to be the culprit. If so, I need to find a way for it to be skipped by copy_from() somehow.