-
-
Save hakib/7e723d2c113b947f7920bf55737e4d16 to your computer and use it in GitHub Desktop.
# https://hakibenita.com/fast-load-data-python-postgresql | |
from typing import Iterator, Dict, Any, Optional | |
from urllib.parse import urlencode | |
import datetime | |
#------------------------ Profile | |
import time | |
from functools import wraps | |
from memory_profiler import memory_usage | |
def profile(fn):
    """Decorator reporting wall-clock time and peak-memory delta of *fn*.

    The wrapped function is executed exactly ONCE, under memory_usage;
    elapsed time is measured around that same call (so it includes a small
    profiling overhead).  The previous version called *fn* twice — once for
    timing and once for memory — which doubled side effects such as
    database inserts and discarded the first call's return value.
    """
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\n{fn.__name__}({fn_kwargs_str})')
        # Single invocation: measure time and memory around the same call.
        t = time.perf_counter()
        mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)
        elapsed = time.perf_counter() - t
        print(f'Time {elapsed:0.4}')
        print(f'Memory {max(mem) - min(mem)}')
        return retval
    return inner
#------------------------ Data | |
import requests | |
def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
    """Yield beer records from the Punk API, paging until an empty response."""
    session = requests.Session()
    current_page = 1
    while True:
        url = 'https://api.punkapi.com/v2/beers?' + urlencode({
            'page': current_page,
            'per_page': page_size
        })
        response = session.get(url)
        response.raise_for_status()
        payload = response.json()
        if not payload:
            # An empty page means we have run past the last record.
            return
        yield from payload
        current_page += 1
def iter_beers_from_file(path: str) -> Iterator[Dict[str, Any]]:
    """Yield beer records parsed from a JSON array stored at *path*."""
    import json
    with open(path, 'r') as handle:
        records = json.load(handle)
    yield from records
#------------------------ Load | |
def create_staging_table(cursor):
    """(Re)create the staging_beers table, discarding any previous run's rows.

    DECIMAL is used for the numeric beer attributes; `volume` stores only the
    integer 'value' component of the API's volume object (see the loaders).
    """
    cursor.execute("""
        DROP TABLE IF EXISTS staging_beers;
        CREATE TABLE staging_beers (
            id INTEGER,
            name TEXT,
            tagline TEXT,
            first_brewed DATE,
            description TEXT,
            image_url TEXT,
            abv DECIMAL,
            ibu DECIMAL,
            target_fg DECIMAL,
            target_og DECIMAL,
            ebc DECIMAL,
            srm DECIMAL,
            ph DECIMAL,
            attenuation_level DECIMAL,
            brewers_tips TEXT,
            contributed_by TEXT,
            volume INTEGER
        );
    """)
def parse_first_brewed(text: str) -> datetime.date:
    """Parse a Punk API 'first_brewed' value into a date.

    Accepts 'MM/YYYY' (day defaults to the 1st) or 'YYYY' (January 1st).

    Raises:
        ValueError: if *text* matches neither format (or the numeric parts
            do not form a valid date).
    """
    parts = text.split('/')
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    if len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    # Was `assert False`, which is silently stripped under `python -O`;
    # raise a real exception so bad input never slips through.
    raise ValueError(f'Unknown date format: {text!r}')
@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Baseline loader: one INSERT statement per beer record."""
    insert_sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            # Copy the record and normalize the two fields that need
            # transforming before they can be stored.
            params = dict(beer)
            params['first_brewed'] = parse_first_brewed(beer['first_brewed'])
            params['volume'] = beer['volume']['value']
            cursor.execute(insert_sql, params)
# http://initd.org/psycopg/docs/cursor.html#cursor.executemany | |
@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load every record with one cursor.executemany() call.

    The full parameter list is materialized in memory before executing.
    """
    insert_sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """

    def to_params(beer: Dict[str, Any]) -> Dict[str, Any]:
        # Normalize the two fields requiring transformation.
        params = dict(beer)
        params['first_brewed'] = parse_first_brewed(beer['first_brewed'])
        params['volume'] = beer['volume']['value']
        return params

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        cursor.executemany(insert_sql, [to_params(beer) for beer in beers])
@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Like insert_executemany, but feeds a lazy generator of parameters
    instead of a pre-built list, reducing peak memory."""
    insert_sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """

    def to_params(beer: Dict[str, Any]) -> Dict[str, Any]:
        params = dict(beer)
        params['first_brewed'] = parse_first_brewed(beer['first_brewed'])
        params['volume'] = beer['volume']['value']
        return params

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        # Generator expression: records are transformed one at a time.
        cursor.executemany(insert_sql, (to_params(beer) for beer in beers))
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_batch | |
import psycopg2.extras | |
@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Load with psycopg2.extras.execute_batch, materializing all parameter
    dicts first; *page_size* controls how many statements go per round-trip."""
    insert_sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """

    def to_params(beer: Dict[str, Any]) -> Dict[str, Any]:
        params = dict(beer)
        params['first_brewed'] = parse_first_brewed(beer['first_brewed'])
        params['volume'] = beer['volume']['value']
        return params

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_batch(
            cursor, insert_sql,
            [to_params(beer) for beer in beers],
            page_size=page_size,
        )
@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Like insert_execute_batch, but streams parameters through a generator
    so only ~page_size records are transformed/held at a time."""
    insert_sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """

    def to_params(beer: Dict[str, Any]) -> Dict[str, Any]:
        params = dict(beer)
        params['first_brewed'] = parse_first_brewed(beer['first_brewed'])
        params['volume'] = beer['volume']['value']
        return params

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_batch(
            cursor, insert_sql,
            (to_params(beer) for beer in beers),
            page_size=page_size,
        )
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values | |
import psycopg2.extras | |
@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load with psycopg2.extras.execute_values (multi-row VALUES clauses),
    materializing all row tuples up front."""

    def to_row(beer: Dict[str, Any]) -> tuple:
        # Tuple order must match the staging_beers column order.
        return (
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        )

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, [to_row(beer) for beer in beers])
@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Like insert_execute_values, but streams the row tuples lazily;
    *page_size* rows are folded into each multi-row VALUES statement."""

    def to_row(beer: Dict[str, Any]) -> tuple:
        # Tuple order must match the staging_beers column order.
        return (
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        )

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, (to_row(beer) for beer in beers), page_size=page_size)
# http://initd.org/psycopg/docs/cursor.html#cursor.copy_from | |
# https://docs.python.org/3.7/library/io.html?io.StringIO#io.StringIO | |
import io | |
def clean_csv_value(value: Optional[Any]) -> str:
    r"""Render *value* as a field for PostgreSQL COPY text format.

    None becomes the COPY null marker \N.  Backslashes are escaped FIRST
    (a literal backslash in the data — e.g. the two characters '\N' —
    would otherwise be misinterpreted by COPY), then newlines and carriage
    returns, which would otherwise terminate the row prematurely.

    NOTE(review): the field separator the callers use ('|') is not escaped
    here; values containing it would still corrupt a row.
    """
    if value is None:
        return r'\N'
    return (
        str(value)
        .replace('\\', '\\\\')  # must run before the other escapes
        .replace('\n', '\\n')
        .replace('\r', '\\r')
    )
@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Build the whole dataset in an in-memory text buffer, then COPY it.

    Peak memory is proportional to the full dataset, since the buffer is
    fully populated before copy_from runs.
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            # Field order must match the staging_beers column order.
            # BUG FIX: brewers_tips / contributed_by were swapped relative
            # to the table definition (and to every other loader here),
            # landing each value in the wrong column.
            csv_file_like_object.write('|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n')
        # Rewind so copy_from reads from the start of the buffer.
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
class StringIteratorIO(io.TextIOBase):
    """A read-only, file-like object that lazily drains an iterator of strings.

    Lets APIs that expect a text file (e.g. psycopg2's copy_from) consume a
    generator, keeping memory usage flat regardless of total data size.
    """

    def __init__(self, iterable: Iterator[str]):
        # Parameter renamed from `iter`, which shadowed the builtin.
        self._iter = iterable
        self._buff = ''  # leftover tail of the last chunk pulled

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        """Return up to *n* chars from the current chunk, refilling it first.

        Returns '' only when the underlying iterator is exhausted.
        """
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        ret = self._buff[:n]
        self._buff = self._buff[len(ret):]
        return ret

    def read(self, n: Optional[int] = None) -> str:
        """Read up to *n* characters; all remaining if *n* is None or < 0."""
        parts = []
        if n is None or n < 0:
            while True:
                chunk = self._read1()
                if not chunk:
                    break
                parts.append(chunk)
        else:
            while n > 0:
                chunk = self._read1(n)
                if not chunk:
                    break
                n -= len(chunk)
                parts.append(chunk)
        return ''.join(parts)
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    """Stream beers into staging_beers with COPY via StringIteratorIO,
    never materializing the whole dataset; *size* is copy_from's read size."""

    def to_line(beer: Dict[str, Any]) -> str:
        # One '|'-separated line per record, in staging_beers column order.
        fields = (
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']).isoformat(),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        )
        return '|'.join(map(clean_csv_value, fields)) + '\n'

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        source = StringIteratorIO(to_line(beer) for beer in beers)
        cursor.copy_from(source, 'staging_beers', sep='|', size=size)
#------------------------ Benchmark | |
# Shared connection for every benchmark below.  autocommit avoids explicit
# COMMITs between loads, so each test() sees the rows immediately.
# NOTE(review): credentials are hard-coded for a local test database.
connection = psycopg2.connect(
    host='localhost',
    database='testload',
    user='haki',
    password=None,
)
connection.set_session(autocommit=True)
import psycopg2.extras | |
def test(connection, n: int):
    """Sanity-check the last load: *n* rows present and transformations applied."""
    # Make sure the data was loaded
    with connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor:
        # Test number of rows.
        cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers')
        record = cursor.fetchone()
        assert record.cnt == n, f'Expected {n} rows, got {record.cnt} rows!'
        # Test that the data was loaded, and that transformations were applied correctly.
        # DISTINCT ON (id): the dataset is the API list duplicated many times,
        # so pick one representative row per id; ORDER BY id fixes fetch order.
        cursor.execute("""
            SELECT DISTINCT ON (id)
                *
            FROM
                staging_beers
            WHERE
                id IN (1, 235)
            ORDER BY
                id;
        """)
        beer_1 = cursor.fetchone()
        assert beer_1.name == 'Buzz'
        # '09/2007' must have been parsed into a real date by parse_first_brewed.
        assert beer_1.first_brewed == datetime.date(2007, 9, 1)
        # volume must be the flattened integer 'value' of the volume object.
        assert beer_1.volume == 20
        beer_235 = cursor.fetchone()
        assert beer_235.name == 'Mango And Chili Barley Wine'
        assert beer_235.first_brewed == datetime.date(2016, 1, 1)
        assert beer_235.volume == 20
# Fetch the data once from the API and replicate it 100x so the benchmark
# operates on a meaningfully sized in-memory dataset.
beers = list(iter_beers_from_api()) * 100

# --- Row-at-a-time and executemany variants -------------------------------
insert_one_by_one(connection, beers)
test(connection, len(beers))
insert_executemany(connection, beers)
test(connection, len(beers))
insert_executemany_iterator(connection, beers)
test(connection, len(beers))

# --- execute_batch at several page sizes ----------------------------------
insert_execute_batch(connection, beers)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=10000)
test(connection, len(beers))

# --- execute_values at several page sizes ---------------------------------
insert_execute_values(connection, beers)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=10000)
test(connection, len(beers))

# --- COPY-based loaders at several buffer sizes ---------------------------
copy_stringio(connection, beers)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 8)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 16)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 64)
test(connection, len(beers))
Hey @Tiwo1991, hard to tell.
I think the best way to check this is to dump the contents of your "clean" file into an actual file, and then try to inspect it, even to load it into the database. This way you'll be able to see exactly where the problem is.
@hakib, Thank you for the quick response. I dumped the contents into a text file in the following manner:
with open(r"text.txt","w") as file: w = csv.writer(file) for row in filterfalse(lambda line: "#" in line.get('date'), log_lines): json.dump(row,file) file.write('\n')
The last line in the txt file is an empty line. I expect it to be the culprit. If so I need to find a way for it to be skipped by copy_row() somehow.
@Tiwo1991 glad you found the problem. I think that instead of finding a way to skip this line, it's best to look for a way not to write it in the first place — for example, check whether the current line is the last one and, if so, don't write the trailing \n.
Just wanted to comment w/ a shoutout on this amazingly well explained and researched example. Great work @hakib
@ededdneddyfan Thanks :)
Hi there, before my comment I will join the rest of the members congratulating @hakib. Great and useful job.
My question is more of a doubt. What is the difference between insert_execute_batch and insert_execute_batch_iterator??
The only difference I found is changing all_beers to iter_beers.
Best regards
It's a subtle difference indeed. The function insert_execute_batch
first evaluates the entire list:
all_beers = [{ ... }]
Notice how all_beers
is a list, not a generator.
The function insert_execute_batch_iterator
is using a generator instead:
iter_beers = ({ ... })
Notice that iter_beers
uses round brackets, so the iterator is not evaluated immediately, which should reduce the memory footprint.
Thank you for the quick reply. I actually missed that.
Now with your post I'm learning how to optimize insert and some Python orthography at the same time :)
This is wonderful. I am eternally grateful for the efforts you have put in.
I am however facing a problem.
I am copying a 25MB-ish CSV file from an s3 bucket to a Postgres database. Everything in the code is the same except for the part where I convert the byte
data (that comes from s3 response [StreamingResponseBody
]) to a utf-8
string data inside the _read1()
function. And the copy is extremely slow. The 25mb-ish file takes more than 2 secs to copy.
def _read1(self, n=None):
while not self._buff:
try:
self._buff = next(self._iter)
self._buff = self._buff.decode("utf-8") # <--------this line here
except StopIteration:
break
ret = self._buff[:n]
self._buff = self._buff[len(ret):]
return ret
Do you think this is causing the issue? Also is there an alternative to what I am using?
Try using TextIOWrapper.
I used the StringIteratorIO to load data from csv into postgreSQL using the copy_from method available in psycopg2. It is quite fast. With my infrastructure, I was able to load 32 million rows in less than 8 minutes. That was quite impressive. The only observation that I had with it was the buffer/cache went up and it remained high even after the program completed execution. Any suggestion to avoid it?
This is amazing. Thank you. The table parameter in copy_from does not accept 'schema.table' reference, therefore I had to use copy_expert. The code below works -- I did not test for memory and speed. The dataframe columns/data/dtypes match the table so I do not specify columns clearly.
def insert_with_string_io(df: pd.DataFrame):
buffer = io.StringIO()
df.to_csv(buffer, index=False, header=False)
buffer.seek(0)
with conn.cursor() as cursor:
try:
cursor.copy_expert(f"COPY <database>.<schema>.<table> FROM STDIN (FORMAT 'csv', HEADER false)" , buffer)
except (Exception, psycopg2.DatabaseError) as error:
print("Error: %s" % error)
Hi I'm trying to slightly modify and apply your def copy_string_iterator function, since I need to filter the generated lines. I use itertools.filterfalse in the copy_string_iterator function code block:
))) + '\n') for row in filterfalse(lambda line: "#" in line.get('date'), log_lines) )
This results in an error when running the cursor.copy_from:
QueryCanceled: COPY from stdin failed: error in .read() call CONTEXT: COPY log_table, line 112910
My test file only has 112909 lines that match the filterfalse condition. Why does it try to copy line 112910?
Thank you for any assistance.