Postgres to Parquet
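Export PostgreSQL tables to Parquet with bounded memory use: each table in the ref schema is paged with LIMIT/OFFSET, each page is written to its own Parquet chunk file, and the chunk files are then merged into a single Parquet file per table.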
import glob

import pandas as pd
import psycopg2
import pyarrow as pa
import pyarrow.parquet as pq

# PostgreSQL connection
conn_pg = psycopg2.connect(
    host="db_host",
    database="drumwave",
    user="postgres",
    password="password"
)

# Load a table in chunks and write each chunk to its own Parquet file
def fetch_and_save_in_chunks(tbl, chunk_size=10000):
    # Get the total number of rows in the table
    count_query = f"SELECT COUNT(*) FROM ref.{tbl}"
    total_rows = pd.read_sql(count_query, conn_pg).iloc[0, 0]
    print(f"Total rows in {tbl}: {total_rows}")

    # Iterate over the rows in chunks
    for start in range(0, total_rows, chunk_size):
        # LIMIT/OFFSET paging is only deterministic with an ORDER BY;
        # without one, Postgres may return rows in a different order on
        # each query, so chunks could skip or duplicate rows
        query = f"""
            SELECT * FROM ref.{tbl}
            ORDER BY 1
            LIMIT {chunk_size} OFFSET {start}
        """

        # Fetch the chunk of data
        df = pd.read_sql(query, conn_pg)

        # Write the chunk to a separate Parquet file
        parquet_file = f'./{tbl}_chunk_{start}.parquet'
        df.to_parquet(parquet_file, engine='pyarrow', index=False)
        print(f"Written chunk {start} to {parquet_file}")

# Merge the Parquet chunk files into one single file
def merge_parquet_files(tbl):
    # List all chunk files, sorted by their numeric offset so the merged
    # file preserves the source row order
    files = sorted(
        glob.glob(f'./{tbl}_chunk_*.parquet'),
        key=lambda f: int(f.rsplit('_', 1)[1].split('.')[0])
    )

    # Read all chunk files and combine them into a single table
    tables = [pq.read_table(f) for f in files]
    combined_table = pa.concat_tables(tables)

    # Write the combined table into a single Parquet file
    pq.write_table(combined_table, f'./{tbl}.parquet')
    print(f"Merged and written {tbl}.parquet")

# Export tables to Parquet in chunks, then merge them into one file
for tbl in ['dataprev', 'cc_fatura', 'customer3', 'lookup']:
    fetch_and_save_in_chunks(tbl)   # Step 1: write chunks
    merge_parquet_files(tbl)        # Step 2: merge chunks into one file
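A quick way to sanity-check the export is to compare row counts between each source table and its merged Parquet file. The following is a minimal sketch, reusing conn_pg and the table list from the script above; pq.ParquetFile reads only the file's footer metadata, so it does not load the data itself.

# Sanity check: confirm each merged Parquet file has the same row count
# as its source table (reuses conn_pg and the table list defined above)
for tbl in ['dataprev', 'cc_fatura', 'customer3', 'lookup']:
    pg_rows = pd.read_sql(f"SELECT COUNT(*) FROM ref.{tbl}", conn_pg).iloc[0, 0]
    # num_rows comes from the Parquet footer, so this is cheap even for
    # large files
    pq_rows = pq.ParquetFile(f'./{tbl}.parquet').metadata.num_rows
    status = "OK" if pg_rows == pq_rows else "MISMATCH"
    print(f"{tbl}: postgres={pg_rows}, parquet={pq_rows} [{status}]")

If the counts match, the per-table chunk files can safely be deleted.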