Skip to content

Instantly share code, notes, and snippets.

@hakib
Last active September 10, 2024 19:47
Show Gist options
  • Save hakib/7e723d2c113b947f7920bf55737e4d16 to your computer and use it in GitHub Desktop.
Save hakib/7e723d2c113b947f7920bf55737e4d16 to your computer and use it in GitHub Desktop.
# https://hakibenita.com/fast-load-data-python-postgresql
from typing import Iterator, Dict, Any, Optional
from urllib.parse import urlencode
import datetime
#------------------------ Profile
import time
from functools import wraps
from memory_profiler import memory_usage
def profile(fn):
@wraps(fn)
def inner(*args, **kwargs):
fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
print(f'\n{fn.__name__}({fn_kwargs_str})')
# Measure time
t = time.perf_counter()
retval = fn(*args, **kwargs)
elapsed = time.perf_counter() - t
print(f'Time {elapsed:0.4}')
# Measure memory
mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)
print(f'Memory {max(mem) - min(mem)}')
return retval
return inner
#------------------------ Data
import requests
def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
session = requests.Session()
page = 1
while True:
response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
'page': page,
'per_page': page_size
}))
response.raise_for_status()
data = response.json()
if not data:
break
for beer in data:
yield beer
page += 1
def iter_beers_from_file(path: str) -> Iterator[Dict[str, Any]]:
import json
with open(path, 'r') as f:
data = json.load(f)
for beer in data:
yield beer
#------------------------ Load
def create_staging_table(cursor):
cursor.execute("""
DROP TABLE IF EXISTS staging_beers;
CREATE TABLE staging_beers (
id INTEGER,
name TEXT,
tagline TEXT,
first_brewed DATE,
description TEXT,
image_url TEXT,
abv DECIMAL,
ibu DECIMAL,
target_fg DECIMAL,
target_og DECIMAL,
ebc DECIMAL,
srm DECIMAL,
ph DECIMAL,
attenuation_level DECIMAL,
brewers_tips TEXT,
contributed_by TEXT,
volume INTEGER
);
""")
def parse_first_brewed(text: str) -> datetime.date:
parts = text.split('/')
if len(parts) == 2:
return datetime.date(int(parts[1]), int(parts[0]), 1)
elif len(parts) == 1:
return datetime.date(int(parts[0]), 1, 1)
else:
assert False, 'Unknown date format'
@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
for beer in beers:
cursor.execute("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", {
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
})
# http://initd.org/psycopg/docs/cursor.html#cursor.executemany
@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
all_beers = [{
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers]
cursor.executemany("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", all_beers)
@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
cursor.executemany("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", ({
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers))
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_batch
import psycopg2.extras
@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
all_beers = [{
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers]
psycopg2.extras.execute_batch(cursor, """
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", all_beers, page_size=page_size)
@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
iter_beers = ({
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers)
psycopg2.extras.execute_batch(cursor, """
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", iter_beers, page_size=page_size)
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values
import psycopg2.extras
@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
psycopg2.extras.execute_values(cursor, """
INSERT INTO staging_beers VALUES %s;
""", [(
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
) for beer in beers])
@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
psycopg2.extras.execute_values(cursor, """
INSERT INTO staging_beers VALUES %s;
""", ((
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
) for beer in beers), page_size=page_size)
# http://initd.org/psycopg/docs/cursor.html#cursor.copy_from
# https://docs.python.org/3.7/library/io.html?io.StringIO#io.StringIO
import io
def clean_csv_value(value: Optional[Any]) -> str:
if value is None:
return r'\N'
return str(value).replace('\n', '\\n')
@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
csv_file_like_object = io.StringIO()
for beer in beers:
csv_file_like_object.write('|'.join(map(clean_csv_value, (
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['contributed_by'],
beer['brewers_tips'],
beer['volume']['value'],
))) + '\n')
csv_file_like_object.seek(0)
cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
class StringIteratorIO(io.TextIOBase):
def __init__(self, iter: Iterator[str]):
self._iter = iter
self._buff = ''
def readable(self) -> bool:
return True
def _read1(self, n: Optional[int] = None) -> str:
while not self._buff:
try:
self._buff = next(self._iter)
except StopIteration:
break
ret = self._buff[:n]
self._buff = self._buff[len(ret):]
return ret
def read(self, n: Optional[int] = None) -> str:
line = []
if n is None or n < 0:
while True:
m = self._read1()
if not m:
break
line.append(m)
else:
while n > 0:
m = self._read1(n)
if not m:
break
n -= len(m)
line.append(m)
return ''.join(line)
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
beers_string_iterator = StringIteratorIO((
'|'.join(map(clean_csv_value, (
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']).isoformat(),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
))) + '\n'
for beer in beers
))
cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)
#------------------------ Benchmark
connection = psycopg2.connect(
host='localhost',
database='testload',
user='haki',
password=None,
)
connection.set_session(autocommit=True)
import psycopg2.extras
def test(connection, n: int):
# Make sure the data was loaded
with connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor:
# Test number of rows.
cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers')
record = cursor.fetchone()
assert record.cnt == n, f'Expected {n} rows, got {record.cnt} rows!'
# Test that the data was loaded, and that transformations were applied correctly.
cursor.execute("""
SELECT DISTINCT ON (id)
*
FROM
staging_beers
WHERE
id IN (1, 235)
ORDER BY
id;
""")
beer_1 = cursor.fetchone()
assert beer_1.name == 'Buzz'
assert beer_1.first_brewed == datetime.date(2007, 9, 1)
assert beer_1.volume == 20
beer_235 = cursor.fetchone()
assert beer_235.name == 'Mango And Chili Barley Wine'
assert beer_235.first_brewed == datetime.date(2016, 1, 1)
assert beer_235.volume == 20
beers = list(iter_beers_from_api()) * 100
insert_one_by_one(connection, beers)
test(connection, len(beers))
insert_executemany(connection, beers)
test(connection, len(beers))
insert_executemany_iterator(connection, beers)
test(connection, len(beers))
insert_execute_batch(connection, beers)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
insert_execute_values(connection, beers)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
copy_stringio(connection, beers)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 8)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 16)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 64)
test(connection, len(beers))
@hakib
Copy link
Author

hakib commented Jul 28, 2019

@jgentil itertools.count(1) is actually a pretty great idea and I'm going to use in my own projects. Thanks :)

@franz101
Copy link

franz101 commented Sep 7, 2019

Pure GOLD. What about ingesting JSON-Data?

@hakib
Copy link
Author

hakib commented Sep 8, 2019

Hey @franz101,
Thanks ;)

The benchmark is ingesting JSON data. It does that but converting it to a CSV on the fly, and importing into the database using PostygreSQL's COPY command.

The article explains it.

@jaumevalls
Copy link

jaumevalls commented Oct 10, 2019

Hello,

First of all thanks for the tutorial, looks very interesting.

I would like to fetch data from mongo paginated and insert them to Postgres sql.

When I try the code below nothing happen.

Does anyone knows what Im doing wrong?

Thanks in advance

@hakib
Copy link
Author

hakib commented Oct 11, 2019

Hey @jaumevalls, it's hard to understand from your comment what's not working. However, you can start by making sure print(list(iter_beers_from_api())) produces valid "CSV" text.

@jaumevalls
Copy link

Nothing, there is nothing printed on the console. Attached you could find the code. Maybe you could give me a hand.

https://gist.github.com/jaumevalls/90281e5476cb1101acee5de67edddb77

Thanks

@hakib
Copy link
Author

hakib commented Oct 11, 2019

I'm not familiar with mongodb client, but are you sure that data is a generator?

    yield from data  

There is a difference between yield and yield from.

@jaumevalls
Copy link

No, data is a cursor I think here is where I have the problem. Im new to python, previously I've used a ETL to parse data from mongo to Postgres but the performance is very low.

I assume I need to change the way I fetch the data.

@jaumevalls
Copy link

I solved the issue, thank you very much for your help much appreciated

@hakib
Copy link
Author

hakib commented Oct 13, 2019

That's great @jaumevalls, how is the performance and memory usage of this compared to your existing ETL process?

@jaumevalls
Copy link

Hey! I did some test this weekend and importing 800.000 rows take 3 min more or less. I'm not able to use the profile to catch all memory load.

My main function looks like this

mess_chunk_iter = iterate_by_chunks(col, 1000, 0, query={'tim':{'$gt': '1571041103'}}, projection={}) for docs in mess_chunk_iter: copy_string_iterator(connection_string, docs, size=1024)

Where I should put the @Profile in order to measure?

@hakib
Copy link
Author

hakib commented Oct 14, 2019

You might want to pass the generator directly to copy_string_iterator(connection_string, mess_chunk_iter).

@heatherzhang
Copy link

q, line 486, beers = list(iter_beers_from_api()) * 100, this will load everything into memory, right? then it really does not "streaming"

@hakib
Copy link
Author

hakib commented Jan 8, 2020

Read this section about how I ran the benchmark.

@heatherzhang
Copy link

Read this section about how I ran the benchmark.

Right, but if you dont use list(iter), the code does not load anything to table, if you pass iter(beers) with beers being original type

@hakib
Copy link
Author

hakib commented Jan 9, 2020

Sorry, but I'm not sure what you mean.

At the bottom of the file there is a test for every method I've used. You can see how every method is being used, and make sure that it's loading data into the database.

@heatherzhang
Copy link

Sorry, but I'm not sure what you mean.

At the bottom of the file there is a test for every method I've used. You can see how every method is being used, and make sure that it's loading data into the database.

if you remove line 486, so beers is just from generator, then excecute
copy_string_iterator(connection, iter(beers), size=16384), this does not load anything to pg table.

@Tiwo1991
Copy link

Hi I'm trying to slightly modify and apply your def copy_string_iterator function, since I need to filter the generated lines. I use itertools.filterfalse in the copy_string_iterator function code block:

))) + '\n') for row in filterfalse(lambda line: "#" in line.get('date'), log_lines) )

This results in an error when running the cursor.copy_from:

QueryCanceled: COPY from stdin failed: error in .read() call CONTEXT: COPY log_table, line 112910

My test file only has 112909 lines that match the filterfalse condition. Why does it try to copy line 112910?

Thank you for any assistance.

@hakib
Copy link
Author

hakib commented Mar 17, 2020

Hey @Tiwo1991, hard to tell.

I think the best way to check this is to dump the contents of your "clean" file into an actual file, and then try to inspect it, even to load it into the database. This way you'll be able to see exactly where the problem is.

@Tiwo1991
Copy link

@hakib, Thank you for the quick response. I dumped the contents into a text file in the following manner:

with open(r"text.txt","w") as file: w = csv.writer(file) for row in filterfalse(lambda line: "#" in line.get('date'), log_lines): json.dump(row,file) file.write('\n')

The last line in the txt file is an empty line. I expect it to be the culprit. If so I need to find a way for it to be skipped by copy_row() somehow.

@hakib
Copy link
Author

hakib commented Mar 17, 2020

@Tiwo1991 glad you found the problem. I think that instead of finding a way to skip this line it's best to look for a way to not write it in the first place (check if current line is last line and not write \n?)

@ededdneddyfan
Copy link

Just wanted to comment w/ a shoutout on this amazingly well explained and researched example. Great work @hakib

@hakib
Copy link
Author

hakib commented May 7, 2020

@ededdneddyfan Thanks :)

@onofre-marco
Copy link

Hi there, before my comment I will join the rest of the members congratulating @hakib. Great and useful job.

My question is more of a doubt. What is the difference between insert_execute_batch and insert_execute_batch_iterator??
The only difference I found is changing all_beers to iter_beers.

Best regards

@hakib
Copy link
Author

hakib commented Jan 20, 2021

It's subtle difference indeed. The function insert_execute_batch first evaluates the entire list:

 all_beers = [{ ... }]

Notice how all_beers is a list, not a generator.

The function insert_execute_batch_iterator is using a generator instead:

iter_beers = ({ ... })

Notice that iter_beers is using a round brackets, so the iterator is not evaluated immediately which should reduce the memory footprint.

@onofre-marco
Copy link

Thank you for the quick reply. I actually missed that.
Now with your post I'm learning how to optimize insert and some Python orthography at the same time :)

@Pkoiralap
Copy link

This is wonderful. I am eternally grateful for the efforts you have put in.

I am however facing a problem.

I am copying a 25MB-ish CSV file from an s3 bucket to a Postgres database. Everything in the code is the same except for the part where I convert the byte data (that comes from s3 response [StreamingResponseBody]) to a utf-8 string data inside the _read1() function. And the copy is extremely slow. The 25mb-ish file takes more than 2 secs to copy.

def _read1(self, n=None):
    while not self._buff:
        try:
            self._buff = next(self._iter)
            self._buff = self._buff.decode("utf-8")  #  <--------this line here
        except StopIteration:
            break
    ret = self._buff[:n]
    self._buff = self._buff[len(ret):]
    return ret

Do you think this is causing the issue? Also is there an alternative to what I am using?

@hakib
Copy link
Author

hakib commented Jan 22, 2021

Try using TextIOWrapper.

@anandrajakrishnan
Copy link

I used the StringIteratorIO to load data from csv into postgreSQL using the copy_from method available in psycopg2. It is quite fast. With my infrastructure, I was able to load 32 million rows in less than 8 minutes. That was quite impressive. The only observation that I had with it was the buffer/cache went up and it remained high even after the program completed execution. Any suggestion to avoid it?

@hale4029
Copy link

hale4029 commented May 5, 2022

This is amazing. Thank you. The table parameter in copy_from does not accept 'schema.table' reference, therefore I had to use copy_expert. The code below works -- I did not test for memory and speed. The dataframe columns/data/dtypes match the table so I do not specify columns clearly.

def insert_with_string_io(df: pd.DataFrame):
        buffer = io.StringIO()
        df.to_csv(buffer, index=False, header=False)
        buffer.seek(0)
        with conn.cursor() as cursor:
            try:
                cursor.copy_expert(f"COPY <database>.<schema>.<table> FROM STDIN (FORMAT 'csv', HEADER false)" , buffer)
            except (Exception, psycopg2.DatabaseError) as error:
                print("Error: %s" % error)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment