# https://hakibenita.com/fast-load-data-python-postgresql
from typing import Iterator, Dict, Any, Optional
from urllib.parse import urlencode
import datetime
#------------------------ Profile
import time
from functools import wraps
from memory_profiler import memory_usage
def profile(fn):
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\n{fn.__name__}({fn_kwargs_str})')

        # Measure time
        t = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - t
        print(f'Time {elapsed:0.4}')

        # Measure memory
        mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)
        print(f'Memory {max(mem) - min(mem)}')

        return retval
    return inner

#------------------------ Data
import requests
def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
    session = requests.Session()
    page = 1
    while True:
        response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
            'page': page,
            'per_page': page_size,
        }))
        response.raise_for_status()

        data = response.json()
        if not data:
            break

        for beer in data:
            yield beer

        page += 1

def iter_beers_from_file(path: str) -> Iterator[Dict[str, Any]]:
    import json
    with open(path, 'r') as f:
        data = json.load(f)
    for beer in data:
        yield beer

#------------------------ Load
def create_staging_table(cursor):
    cursor.execute("""
        DROP TABLE IF EXISTS staging_beers;
        CREATE TABLE staging_beers (
            id                  INTEGER,
            name                TEXT,
            tagline             TEXT,
            first_brewed        DATE,
            description         TEXT,
            image_url           TEXT,
            abv                 DECIMAL,
            ibu                 DECIMAL,
            target_fg           DECIMAL,
            target_og           DECIMAL,
            ebc                 DECIMAL,
            srm                 DECIMAL,
            ph                  DECIMAL,
            attenuation_level   DECIMAL,
            brewers_tips        TEXT,
            contributed_by      TEXT,
            volume              INTEGER
        );
    """)

def parse_first_brewed(text: str) -> datetime.date:
    # "first_brewed" comes as either "%m/%Y" (e.g. "09/2007") or "%Y" (e.g. "2007").
    parts = text.split('/')
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    elif len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    else:
        assert False, 'Unknown date format'

@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            cursor.execute("""
                INSERT INTO staging_beers VALUES (
                    %(id)s,
                    %(name)s,
                    %(tagline)s,
                    %(first_brewed)s,
                    %(description)s,
                    %(image_url)s,
                    %(abv)s,
                    %(ibu)s,
                    %(target_fg)s,
                    %(target_og)s,
                    %(ebc)s,
                    %(srm)s,
                    %(ph)s,
                    %(attenuation_level)s,
                    %(brewers_tips)s,
                    %(contributed_by)s,
                    %(volume)s
                );
            """, {
                **beer,
                'first_brewed': parse_first_brewed(beer['first_brewed']),
                'volume': beer['volume']['value'],
            })

# http://initd.org/psycopg/docs/cursor.html#cursor.executemany
@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers)

@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers))

# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_batch
import psycopg2.extras

@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers, page_size=page_size)

@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        iter_beers = ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers)

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, iter_beers, page_size=page_size)

# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values
@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, [(
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers])

@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, ((
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers), page_size=page_size)

# http://initd.org/psycopg/docs/cursor.html#cursor.copy_from
# https://docs.python.org/3.7/library/io.html?io.StringIO#io.StringIO
import io

def clean_csv_value(value: Optional[Any]) -> str:
    if value is None:
        return r'\N'
    return str(value).replace('\n', '\\n')

@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            # Values must appear in the same order as the columns of
            # staging_beers (brewers_tips before contributed_by).
            csv_file_like_object.write('|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n')
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')

class StringIteratorIO(io.TextIOBase):
    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        self._buff = ''

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        ret = self._buff[:n]
        self._buff = self._buff[len(ret):]
        return ret

    def read(self, n: Optional[int] = None) -> str:
        line = []
        if n is None or n < 0:
            while True:
                m = self._read1()
                if not m:
                    break
                line.append(m)
        else:
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                line.append(m)
        return ''.join(line)

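# A minimal usage sketch of StringIteratorIO: it presents a lazily
# evaluated iterator of strings as a read-only file-like object, so
# cursor.copy_from() can stream rows from a generator without first
# materializing the entire payload in memory:
#
#     >>> f = StringIteratorIO(iter(['first|row\n', 'second|row\n']))
#     >>> f.read(5)
#     'first'
#     >>> f.read()
#     '|row\nsecond|row\n'
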
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']).isoformat(),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)

#------------------------ Benchmark
connection = psycopg2.connect(
    host='localhost',
    database='testload',
    user='haki',
    password=None,
)
connection.set_session(autocommit=True)
def test(connection, n: int):
    # Make sure the data was loaded.
    with connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor:
        # Test number of rows.
        cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers')
        record = cursor.fetchone()
        assert record.cnt == n, f'Expected {n} rows, got {record.cnt} rows!'

        # Test that the data was loaded, and that transformations were applied correctly.
        cursor.execute("""
            SELECT DISTINCT ON (id)
                *
            FROM
                staging_beers
            WHERE
                id IN (1, 235)
            ORDER BY
                id;
        """)
        beer_1 = cursor.fetchone()
        assert beer_1.name == 'Buzz'
        assert beer_1.first_brewed == datetime.date(2007, 9, 1)
        assert beer_1.volume == 20

        beer_235 = cursor.fetchone()
        assert beer_235.name == 'Mango And Chili Barley Wine'
        assert beer_235.first_brewed == datetime.date(2016, 1, 1)
        assert beer_235.volume == 20

beers = list(iter_beers_from_api()) * 100
insert_one_by_one(connection, beers)
test(connection, len(beers))
insert_executemany(connection, beers)
test(connection, len(beers))
insert_executemany_iterator(connection, beers)
test(connection, len(beers))
insert_execute_batch(connection, beers)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
insert_execute_values(connection, beers)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
copy_stringio(connection, beers)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 8)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 16)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 64)
test(connection, len(beers))
@Pkoiralap

This is wonderful. I am eternally grateful for the efforts you have put in.

I am, however, facing a problem.

I am copying a 25 MB-ish CSV file from an S3 bucket to a Postgres database. Everything in the code is the same except for the part where I convert the byte data (which comes from the S3 response [StreamingResponseBody]) to a UTF-8 string inside the _read1() function. The copy is extremely slow: the 25 MB-ish file takes more than 2 seconds to copy.

def _read1(self, n=None):
    while not self._buff:
        try:
            self._buff = next(self._iter)
            self._buff = self._buff.decode("utf-8")  #  <--------this line here
        except StopIteration:
            break
    ret = self._buff[:n]
    self._buff = self._buff[len(ret):]
    return ret

Do you think this is causing the issue? Also, is there an alternative to what I am using?

@hakib (Author) commented Jan 22, 2021

Try using TextIOWrapper.
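
For example, a minimal sketch (the names here are illustrative, and it assumes the S3 response body is, or can be treated as, a binary file-like object, as recent botocore StreamingBody versions are):

    import io

    def copy_from_binary_stream(connection, binary_body) -> None:
        # `binary_body` is a hypothetical stand-in for any binary file-like
        # object, e.g. the body of an S3 GetObject response. TextIOWrapper
        # decodes it to text incrementally, so no per-chunk .decode("utf-8")
        # is needed inside _read1().
        text_body = io.TextIOWrapper(binary_body, encoding='utf-8')
        with connection.cursor() as cursor:
            cursor.copy_from(text_body, 'staging_beers', sep='|')
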

@anandrajakrishnan

I used StringIteratorIO to load data from CSV into PostgreSQL using the copy_from method available in psycopg2, and it is quite fast: with my infrastructure, I was able to load 32 million rows in less than 8 minutes, which is quite impressive. The only observation I had was that the buffer/cache usage went up and remained high even after the program completed execution. Any suggestion to avoid this?

@hale4029 commented May 5, 2022
This is amazing, thank you. The table parameter in copy_from does not accept a 'schema.table' reference, so I had to use copy_expert instead. The code below works, although I did not test it for memory or speed. The dataframe columns/data/dtypes match the table, so I do not specify the columns explicitly.

def insert_with_string_io(df: pd.DataFrame):
    buffer = io.StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)
    with conn.cursor() as cursor:
        try:
            cursor.copy_expert("COPY <database>.<schema>.<table> FROM STDIN (FORMAT 'csv', HEADER false)", buffer)
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
