Last active
February 24, 2025 11:24
-
-
Save rvaidya/c78f34b3c29ff087d8700297c0ca3b5b to your computer and use it in GitHub Desktop.
Dump a database table to a parquet file using SQLAlchemy and fastparquet. This is useful for loading large tables into pandas / Dask, since read_sql_table will hammer the server with queries when the number of partitions/chunks is high. With this, you write a temporary parquet file, then use read_parquet to load the data into a DataFrame.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fastparquet
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, schema, Table
# Copied from pandas with modifications
def __get_dtype(column, sqltype):
    """Map a SQLAlchemy column type to a dtype usable with ``DataFrame.astype``.

    Parameters
    ----------
    column : sqlalchemy.Column
        The reflected column; only ``column.nullable`` is consulted.
    sqltype : sqlalchemy.types.TypeEngine
        The column's SQL type (normally ``column.type``).

    Returns
    -------
    A numpy/pandas dtype or Python type accepted by ``DataFrame.astype``.
    Types without a specific mapping fall back to ``object`` so the values
    are kept exactly as the DB driver returned them.
    """
    from sqlalchemy.types import (Integer, Float, Boolean, DateTime,
                                  Date, TIMESTAMP)
    # Import the dialect type explicitly: ``import sqlalchemy.dialects`` does
    # not load the ``mssql`` submodule, so ``sqlalchemy.dialects.mssql`` may
    # not exist as an attribute unless something else already imported it.
    from sqlalchemy.dialects.mssql import BIT

    if isinstance(sqltype, Float):
        return float
    if isinstance(sqltype, Integer):
        # numpy int64 cannot hold NULL, so nullable ints become floats
        # (NULL -> NaN).
        if column.nullable:
            return float
        # TODO: Refine integer size.
        return np.dtype('int64')
    if isinstance(sqltype, TIMESTAMP):
        # TIMESTAMP is a DateTime subclass, so it must be checked first.
        if not sqltype.timezone:
            return np.dtype('datetime64[ns]')
        # ``astype`` needs a concrete timezone, not the bare DatetimeTZDtype
        # class. Normalise timezone-aware values to UTC.
        # NOTE(review): assumes the driver returns tz-aware datetimes here.
        return pd.DatetimeTZDtype(tz='UTC')
    if isinstance(sqltype, DateTime):
        return np.dtype('datetime64[ns]')
    if isinstance(sqltype, Date):
        # numpy has no ``date`` scalar type (``np.date`` does not exist).
        # Keep the driver's date objects in an object column.
        return object
    if isinstance(sqltype, Boolean):
        # astype(bool) would silently turn NULL into False; keep nullable
        # booleans as objects so None survives.
        return object if column.nullable else bool
    if isinstance(sqltype, BIT):
        # Handling database provider specific types
        return np.dtype('u1')
    # Catch all type - handle provider specific types in another branch
    return object
def __write_parquet(output_path: str, batch_array, column_dict, write_index: bool,
                    compression: str, append: bool, index_start: int = 0):
    """Write one batch of rows to ``output_path`` with fastparquet.

    Parameters
    ----------
    output_path : str
        Destination parquet file.
    batch_array : sequence of rows
        Rows as returned by ``fetchmany`` (tuple-like, in column order).
    column_dict : dict
        Ordered mapping of column name -> dtype passed to ``DataFrame.astype``.
    write_index : bool
        Passed through to ``fastparquet.write``.
    compression : str or None
        Passed through to ``fastparquet.write``.
    append : bool
        ``False`` for the first batch (creates the file), ``True`` afterwards.
    index_start : int, optional
        First index label for this batch, so the written index keeps counting
        across batches instead of restarting at 0 for every row group.
    """
    # fastparquet requires exact ``str`` column names. SQLAlchemy reflection
    # yields ``quoted_name`` (a str subclass), which fastparquet rejects with
    # "Expected unicode, got quoted_name".
    dtypes = {str(name): dtype for name, dtype in column_dict.items()}
    index = pd.RangeIndex(index_start, index_start + len(batch_array))
    # Create the DataFrame to hold the batch array contents
    b_df = pd.DataFrame(batch_array, columns=list(dtypes), index=index)
    # Cast the DataFrame columns to the sqlalchemy column analogues
    b_df = b_df.astype(dtype=dtypes)
    # Write to the parquet file (first write needs append=False)
    fastparquet.write(output_path, b_df, write_index=write_index,
                      compression=compression, append=append)


def table_to_parquet(output_path: str, table_name: str, con,
                     batch_size: int = 10000, write_index: bool = True,
                     compression: str = None):
    """Dump a database table to a parquet file, ``batch_size`` rows at a time.

    Parameters
    ----------
    output_path : str
        Destination parquet file (overwritten by the first batch).
    table_name : str
        Name of the table to reflect and dump.
    con : str, sqlalchemy URL, or sqlalchemy Engine
        Connection target. A URL/string creates a private engine that is
        disposed on return. An Engine is used as-is and left open.
    batch_size : int
        Rows fetched and written per parquet row group.
    write_index : bool
        Whether to store a running 0..N-1 row index in the file.
    compression : str or None
        Compression codec passed to ``fastparquet.write``.

    Notes
    -----
    If the table is empty, no batch is written, so no file is created.
    """
    from sqlalchemy.engine import Engine

    # Reuse a caller-supplied Engine; otherwise build (and later dispose) one.
    owns_engine = not isinstance(con, Engine)
    db_engine = create_engine(con) if owns_engine else con
    try:
        # ``MetaData(bind=...)`` / ``autoload=True`` were removed in
        # SQLAlchemy 2.0; ``autoload_with`` works on both 1.x and 2.x.
        db_metadata = schema.MetaData()
        db_table = Table(table_name, db_metadata, autoload_with=db_engine)

        # Get the columns (as plain str names) for the parquet file
        column_dict = {str(column.name): __get_dtype(column, column.type)
                       for column in db_table.columns}

        with db_engine.connect() as connection:
            # Request a server-side cursor where the dialect supports it, so
            # fetchmany() streams rather than buffering the whole table.
            streaming = connection.execution_options(stream_results=True)
            result = streaming.execute(db_table.select())
            append = False
            rows_written = 0
            while True:
                row_batch = result.fetchmany(batch_size)
                if not row_batch:
                    break
                __write_parquet(output_path, row_batch, column_dict,
                                write_index, compression, append,
                                index_start=rows_written)
                append = True
                rows_written += len(row_batch)
    finally:
        if owns_engine:
            db_engine.dispose()
Replacing:
column_dict[column.name] = dtype
with
column_dict[str(column.name)] = dtype
resolved the issue. It seems column.name is not a plain string: SQLAlchemy returns a quoted_name (a str subclass), which fastparquet rejects.
Replacing:
column_dict[column.name] = dtype
with
column_dict[str(column.name)] = dtype
resolved the issue. It seems column.name is not really a string.
@Upgwades THANK YOU!!!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've been using this code for a while. After migrating it to a new machine, it started throwing the error below. I'm no Python expert by any means, but I've been trying to troubleshoot it for several days.
File "C:\appl\python\Code\PythonTest\venv\lib\site-packages\fastparquet\writer.py", line 1499, in write_thrift
return f.write(obj.to_bytes())
TypeError: Expected unicode, got quoted_name
TypeError: Expected unicode, got quoted_name
Exception ignored in: 'fastparquet.cencoding.write_list'