# https://hakibenita.com/fast-load-data-python-postgresql

from typing import Iterator, Dict, Any, Optional
from urllib.parse import urlencode
import datetime


#------------------------ Profile

import time
from functools import wraps
from memory_profiler import memory_usage


def profile(fn):
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\n{fn.__name__}({fn_kwargs_str})')

        # Measure time
        t = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - t
        print(f'Time {elapsed:0.4}')

        # Measure memory
        mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)
        print(f'Memory {max(mem) - min(mem)}')

        return retval
    return inner
#------------------------ Data

import requests


def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
    session = requests.Session()
    page = 1
    while True:
        response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
            'page': page,
            'per_page': page_size
        }))
        response.raise_for_status()

        data = response.json()
        if not data:
            break

        for beer in data:
            yield beer

        page += 1


def iter_beers_from_file(path: str) -> Iterator[Dict[str, Any]]:
    import json
    with open(path, 'r') as f:
        data = json.load(f)
    for beer in data:
        yield beer
#------------------------ Load

def create_staging_table(cursor):
    cursor.execute("""
        DROP TABLE IF EXISTS staging_beers;
        CREATE TABLE staging_beers (
            id INTEGER,
            name TEXT,
            tagline TEXT,
            first_brewed DATE,
            description TEXT,
            image_url TEXT,
            abv DECIMAL,
            ibu DECIMAL,
            target_fg DECIMAL,
            target_og DECIMAL,
            ebc DECIMAL,
            srm DECIMAL,
            ph DECIMAL,
            attenuation_level DECIMAL,
            brewers_tips TEXT,
            contributed_by TEXT,
            volume INTEGER
        );
    """)


def parse_first_brewed(text: str) -> datetime.date:
    parts = text.split('/')
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    elif len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    else:
        assert False, 'Unknown date format'


@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            cursor.execute("""
                INSERT INTO staging_beers VALUES (
                    %(id)s,
                    %(name)s,
                    %(tagline)s,
                    %(first_brewed)s,
                    %(description)s,
                    %(image_url)s,
                    %(abv)s,
                    %(ibu)s,
                    %(target_fg)s,
                    %(target_og)s,
                    %(ebc)s,
                    %(srm)s,
                    %(ph)s,
                    %(attenuation_level)s,
                    %(brewers_tips)s,
                    %(contributed_by)s,
                    %(volume)s
                );
            """, {
                **beer,
                'first_brewed': parse_first_brewed(beer['first_brewed']),
                'volume': beer['volume']['value'],
            })


# http://initd.org/psycopg/docs/cursor.html#cursor.executemany
@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers)


@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers))
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_batch
import psycopg2.extras


@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers, page_size=page_size)


@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        iter_beers = ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers)

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, iter_beers, page_size=page_size)
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values
import psycopg2.extras


@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, [(
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers])


@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, ((
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers), page_size=page_size)
# http://initd.org/psycopg/docs/cursor.html#cursor.copy_from
# https://docs.python.org/3.7/library/io.html?io.StringIO#io.StringIO
import io


def clean_csv_value(value: Optional[Any]) -> str:
    if value is None:
        return r'\N'
    return str(value).replace('\n', '\\n')


@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            csv_file_like_object.write('|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n')
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')


class StringIteratorIO(io.TextIOBase):
    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        self._buff = ''

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        ret = self._buff[:n]
        self._buff = self._buff[len(ret):]
        return ret

    def read(self, n: Optional[int] = None) -> str:
        line = []
        if n is None or n < 0:
            while True:
                m = self._read1()
                if not m:
                    break
                line.append(m)
        else:
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                line.append(m)
        return ''.join(line)


@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']).isoformat(),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)
#------------------------ Benchmark

connection = psycopg2.connect(
    host='localhost',
    database='testload',
    user='haki',
    password=None,
)
connection.set_session(autocommit=True)

import psycopg2.extras


def test(connection, n: int):
    # Make sure the data was loaded
    with connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor:

        # Test number of rows.
        cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers')
        record = cursor.fetchone()
        assert record.cnt == n, f'Expected {n} rows, got {record.cnt} rows!'

        # Test that the data was loaded, and that transformations were applied correctly.
        cursor.execute("""
            SELECT DISTINCT ON (id)
                *
            FROM
                staging_beers
            WHERE
                id IN (1, 235)
            ORDER BY
                id;
        """)

        beer_1 = cursor.fetchone()
        assert beer_1.name == 'Buzz'
        assert beer_1.first_brewed == datetime.date(2007, 9, 1)
        assert beer_1.volume == 20

        beer_235 = cursor.fetchone()
        assert beer_235.name == 'Mango And Chili Barley Wine'
        assert beer_235.first_brewed == datetime.date(2016, 1, 1)
        assert beer_235.volume == 20


beers = list(iter_beers_from_api()) * 100

insert_one_by_one(connection, beers)
test(connection, len(beers))
insert_executemany(connection, beers)
test(connection, len(beers))
insert_executemany_iterator(connection, beers)
test(connection, len(beers))
insert_execute_batch(connection, beers)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
insert_execute_values(connection, beers)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
copy_stringio(connection, beers)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 8)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 16)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 64)
test(connection, len(beers))
Pure GOLD. What about ingesting JSON data?
Hey @franz101,
Thanks ;)
The benchmark is ingesting JSON data. It does that by converting it to CSV on the fly and importing it into the database using PostgreSQL's COPY command.
The article explains it.
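For the sake of illustration, here is a minimal sketch of that conversion for a single record, reusing clean_csv_value and parse_first_brewed from the file above (the beer dict here is trimmed to a few fields, so it does not match the full staging table):

# Hypothetical, trimmed-down record for illustration only.
beer = {'id': 1, 'name': 'Buzz', 'first_brewed': '09/2007', 'volume': {'value': 20}}

# One JSON object becomes one pipe-delimited text line, just like the
# generator expression inside copy_string_iterator produces for every row.
line = '|'.join(map(clean_csv_value, (
    beer['id'],
    beer['name'],
    parse_first_brewed(beer['first_brewed']).isoformat(),
    beer['volume']['value'],
))) + '\n'
print(line)  # 1|Buzz|2007-09-01|20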
Hello,
First of all, thanks for the tutorial, it looks very interesting.
I would like to fetch paginated data from MongoDB and insert it into PostgreSQL.
When I try the code below, nothing happens.
Does anyone know what I'm doing wrong?
Thanks in advance
Hey @jaumevalls, it's hard to understand from your comment what's not working. However, you can start by making sure print(list(iter_beers_from_api())) produces valid "CSV" text.
Nothing, there is nothing printed on the console. Attached you can find the code. Maybe you could give me a hand.
https://gist.github.com/jaumevalls/90281e5476cb1101acee5de67edddb77
Thanks
I'm not familiar with the MongoDB client, but are you sure that data is a generator? Try yield from data; there is a difference between yield and yield from.
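For anyone following along, a minimal sketch of the difference (the names here are made up for illustration):

def documents():
    # Stand-in for a MongoDB cursor or any other iterable of documents.
    return iter([{'id': 1}, {'id': 2}])

def with_yield():
    data = documents()
    yield data           # yields the iterator itself, as a single item

def with_yield_from():
    data = documents()
    yield from data      # yields each document, one at a time

print(list(with_yield()))       # [<list_iterator object at ...>]
print(list(with_yield_from()))  # [{'id': 1}, {'id': 2}]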
No, data is a cursor; I think this is where I have the problem. I'm new to Python. Previously I used an ETL tool to move data from MongoDB to PostgreSQL, but the performance was very low.
I assume I need to change the way I fetch the data.
I solved the issue. Thank you very much for your help, much appreciated.
That's great @jaumevalls, how is the performance and memory usage of this compared to your existing ETL process?
Hey! I did some tests this weekend and importing 800,000 rows takes roughly 3 minutes. I'm not able to get the profiler to capture the full memory load.
My main function looks like this:

mess_chunk_iter = iterate_by_chunks(col, 1000, 0, query={'tim': {'$gt': '1571041103'}}, projection={})
for docs in mess_chunk_iter:
    copy_string_iterator(connection_string, docs, size=1024)

Where should I put the @profile decorator in order to measure?
You might want to pass the generator directly to copy_string_iterator(connection_string, mess_chunk_iter).
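A rough sketch of how the pieces might fit together, if that helps (iterate_by_chunks, col and the query come from the comment above; the load_messages wrapper and itertools.chain.from_iterable are assumptions, not part of the article):

from itertools import chain

@profile
def load_messages(connection_string):
    # Flatten the chunked MongoDB cursor into one stream of documents and make a
    # single call to copy_string_iterator, so @profile measures the whole load
    # (time and memory) end to end.
    mess_chunk_iter = iterate_by_chunks(col, 1000, 0, query={'tim': {'$gt': '1571041103'}}, projection={})
    docs = chain.from_iterable(mess_chunk_iter)
    copy_string_iterator(connection_string, docs, size=1024)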
Question: line 486, beers = list(iter_beers_from_api()) * 100, will load everything into memory, right? Then it really isn't "streaming".
Read this section about how I ran the benchmark.
Right, but if you don't use list(iter), the code does not load anything into the table; if you pass iter(beers) with beers being the original type.
Sorry, but I'm not sure what you mean.
At the bottom of the file there is a test for every method I've used. You can see how every method is being used, and make sure that it's loading data into the database.
If you remove line 486, so beers comes straight from the generator, and then execute copy_string_iterator(connection, iter(beers), size=16384), it does not load anything into the Postgres table.
Hi, I'm trying to slightly modify and apply your copy_string_iterator function, since I need to filter the generated lines. I use itertools.filterfalse in the copy_string_iterator code block:

            ))) + '\n'
            for row in filterfalse(lambda line: "#" in line.get('date'), log_lines)
        ))
This results in an error when running the cursor.copy_from:
QueryCanceled: COPY from stdin failed: error in .read() call CONTEXT: COPY log_table, line 112910
My test file only has 112909 lines that match the filterfalse condition. Why does it try to copy line 112910?
Thank you for any assistance.
Hey @Tiwo1991, hard to tell.
I think the best way to check this is to dump the contents of your "clean" data into an actual file, then inspect it and even try to load it into the database. This way you'll be able to see exactly where the problem is.
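A minimal sketch of that kind of check, assuming the log_lines iterable and the filter from the comments above (make_csv_line is a hypothetical helper that turns one row into a pipe-delimited string ending with '\n'):

from itertools import filterfalse

# Dump the generated lines to a real file so it can be inspected by eye,
# or loaded manually with COPY ... FROM '/tmp/clean.csv'.
with open('/tmp/clean.csv', 'w') as f:
    for row in filterfalse(lambda line: '#' in line.get('date'), log_lines):
        f.write(make_csv_line(row))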
@hakib, Thank you for the quick response. I dumped the contents into a text file in the following manner:

with open(r"text.txt", "w") as file:
    w = csv.writer(file)
    for row in filterfalse(lambda line: "#" in line.get('date'), log_lines):
        json.dump(row, file)
        file.write('\n')

The last line in the txt file is an empty line. I expect it to be the culprit. If so, I need to find a way for it to be skipped by copy_from() somehow.
@Tiwo1991 glad you found the problem. I think that instead of finding a way to skip this line, it's best to look for a way to not write it in the first place (check if the current line is the last line and don't write the \n?).
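One way to do that without buffering everything is to write the newline before every row except the first, so the output never ends with an empty last line. A sketch, reusing the names from the comments above (make_csv_line is hypothetical and returns one row without a trailing newline):

from itertools import filterfalse

with open('/tmp/clean.csv', 'w') as f:
    first = True
    for row in filterfalse(lambda line: '#' in line.get('date'), log_lines):
        if not first:
            f.write('\n')            # newline only *between* rows
        f.write(make_csv_line(row))  # row text without a trailing newline
        first = False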
Just wanted to comment w/ a shoutout on this amazingly well explained and researched example. Great work @hakib
@ededdneddyfan Thanks :)
Hi there, before my comment I will join the rest of the members in congratulating @hakib. Great and useful job.
My question is more of a doubt: what is the difference between insert_execute_batch and insert_execute_batch_iterator?
The only difference I found is the change from all_beers to iter_beers.
Best regards
It's a subtle difference indeed. The function insert_execute_batch first evaluates the entire list:

all_beers = [{ ... }]

Notice how all_beers is a list, not a generator.
The function insert_execute_batch_iterator uses a generator expression instead:

iter_beers = ({ ... })

Notice that iter_beers uses round brackets, so the items are not evaluated immediately, which should reduce the memory footprint.
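A tiny illustration of why the brackets matter (the exact sys.getsizeof numbers vary by platform; the order of magnitude is the point):

import sys

numbers_list = [n * 2 for n in range(1_000_000)]  # square brackets: built up front, all items in memory
numbers_gen = (n * 2 for n in range(1_000_000))   # round brackets: lazy, items produced one at a time

print(sys.getsizeof(numbers_list))  # several megabytes
print(sys.getsizeof(numbers_gen))   # ~100 bytes, no matter how many items it will yield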
Thank you for the quick reply. I actually missed that.
Now with your post I'm learning how to optimize insert and some Python orthography at the same time :)
This is wonderful. I am eternally grateful for the efforts you have put in.
I am however facing a problem.
I am copying a 25 MB-ish CSV file from an S3 bucket to a Postgres database. Everything in the code is the same except for the part where I convert the byte data (that comes from the S3 response [StreamingResponseBody]) to a utf-8 string inside the _read1() function. And the copy is extremely slow; the 25 MB-ish file takes more than 2 secs to copy.
def _read1(self, n=None):
    while not self._buff:
        try:
            self._buff = next(self._iter)
            self._buff = self._buff.decode("utf-8")  # <-------- this line here
        except StopIteration:
            break
    ret = self._buff[:n]
    self._buff = self._buff[len(ret):]
    return ret
Do you think this is causing the issue? Also is there an alternative to what I am using?
Try using TextIOWrapper.
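A possible sketch of that approach (the file name and table are placeholders; whether the S3 response body can be wrapped directly depends on the client library, so it may need a buffered wrapper first):

import io

# Let TextIOWrapper do the utf-8 decoding in buffered chunks instead of
# decoding each yielded piece inside _read1().
with open('beers.csv', 'rb') as binary_file:  # stand-in for the S3 byte stream
    text_stream = io.TextIOWrapper(binary_file, encoding='utf-8', newline='')
    with connection.cursor() as cursor:
        cursor.copy_from(text_stream, 'staging_beers', sep='|')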
I used the StringIteratorIO to load data from CSV into PostgreSQL using the copy_from method available in psycopg2. It is quite fast. With my infrastructure, I was able to load 32 million rows in less than 8 minutes, which was quite impressive. The only observation I had was that the buffer/cache went up and remained high even after the program completed execution. Any suggestion to avoid it?
This is amazing, thank you. The table parameter in copy_from does not accept a 'schema.table' reference, so I had to use copy_expert instead. The code below works -- I did not test for memory and speed. The dataframe columns/data/dtypes match the table, so I do not specify the columns explicitly.
def insert_with_string_io(df: pd.DataFrame):
    buffer = io.StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)
    with conn.cursor() as cursor:
        try:
            cursor.copy_expert(f"COPY <database>.<schema>.<table> FROM STDIN (FORMAT 'csv', HEADER false)", buffer)
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
@jgentil itertools.count(1) is actually a pretty great idea and I'm going to use it in my own projects. Thanks :)