import psycopg2
import pandas as pd
from psycopg2 import sql
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# Database connection details
DB_CONFIG = {
    "host": "localhost",
    "dbname": "your_database",
    "user": "your_username",
    "password": "your_password",
    "port": 5432,
}
SCHEMA_NAME = "your_schema_name" # Replace with your schema name
HRE_CALC_TABLE = "hre_calc" # Replace with the actual name of the error table
BATCH_SIZE = 10 # Number of columns to process per query batch
MAX_WORKERS = 4 # Number of threads for concurrent processing
# Thread-safe progress counter
progress_lock = threading.Lock()
progress = 0

def get_numeric_columns(conn, schema_name):
    """Fetch all numeric columns in the specified schema."""
    query = """
        SELECT table_name, column_name
        FROM information_schema.columns
        WHERE table_schema = %s
          AND data_type IN ('double precision', 'real', 'numeric');
    """
    with conn.cursor() as cur:
        cur.execute(query, (schema_name,))
        return pd.DataFrame(cur.fetchall(), columns=["table_name", "column_name"])

def analyze_table(conn_params, table_name, numeric_columns):
    """Analyze a table for NaN/Infinity percentages and its error percentage."""
    conn = psycopg2.connect(**conn_params)
    try:
        results = []
        for i in range(0, len(numeric_columns), BATCH_SIZE):
            column_batch = numeric_columns[i:i + BATCH_SIZE]
            # One count per column; identifiers are composed with sql.Identifier
            # so unusual column names are quoted safely.
            column_filters = sql.SQL(", ").join(
                sql.SQL(
                    "COUNT(*) FILTER (WHERE t.{col} = 'NaN' OR t.{col} = 'Infinity' OR t.{col} = '-Infinity') AS {alias}"
                ).format(col=sql.Identifier(col), alias=sql.Identifier(f"{col}_nan_inf_count"))
                for col in column_batch
            )
            # Turn the raw counts into percentages; NULLIF avoids division by
            # zero on empty tables, and COALESCE maps that case to 0.
            percentage_exprs = sql.SQL(", ").join(
                sql.SQL("COALESCE({cnt} * 100.0 / NULLIF(total_rows, 0), 0)").format(
                    cnt=sql.Identifier(f"{col}_nan_inf_count")
                )
                for col in column_batch
            )
            query = sql.SQL("""
                WITH stats AS (
                    SELECT
                        COUNT(*) AS total_rows,
                        COUNT(*) FILTER (WHERE h.error IS NOT NULL) AS error_count,
                        {columns}
                    FROM {schema}.{table} t
                    LEFT JOIN {schema}.{hre_calc} h
                        ON t.calc_dt = h.calc_dt AND t.calc_id = h.calc_id
                )
                SELECT
                    {percentages},
                    COALESCE(error_count * 100.0 / NULLIF(total_rows, 0), 0) AS error_percentage
                FROM stats;
            """).format(
                columns=column_filters,
                schema=sql.Identifier(SCHEMA_NAME),
                table=sql.Identifier(table_name),
                hre_calc=sql.Identifier(HRE_CALC_TABLE),
                percentages=percentage_exprs,
            )
            with conn.cursor() as cur:
                cur.execute(query)
                result = cur.fetchone()
            results.append({
                "table_name": table_name,
                "columns": column_batch,
                "results": result[:-1],          # per-column NaN/Infinity percentages
                "error_percentage": result[-1],  # share of rows with a non-NULL error
            })
    finally:
        conn.close()
    return results

def update_progress(total, completed):
    """Print progress percentage. The caller is expected to hold progress_lock."""
    percentage = (completed / total) * 100
    print(f"Progress: {percentage:.2f}% ({completed}/{total})")

def process_table(conn_params, table_name, columns, total_tables):
    """Process a single table and update progress."""
    global progress
    result = analyze_table(conn_params, table_name, columns)
    # progress_lock is not reentrant, so update_progress must not try to
    # acquire it again; it is called while the lock is already held.
    with progress_lock:
        progress += 1
        update_progress(total_tables, progress)
    return result

def main():
    # Connect to the database
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # Get numeric columns in the schema, grouped by table
        numeric_columns_df = get_numeric_columns(conn, SCHEMA_NAME)
        tables = numeric_columns_df.groupby("table_name")["column_name"].apply(list)
        total_tables = len(tables)
        all_results = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = [
                executor.submit(process_table, DB_CONFIG, table_name, columns, total_tables)
                for table_name, columns in tables.items()
            ]
            for future in as_completed(futures):
                all_results.extend(future.result())
        # Flatten the per-batch results into one record per column
        sorted_results = []
        for result in all_results:
            for col, pct in zip(result["columns"], result["results"]):
                sorted_results.append({
                    "table_name": result["table_name"],
                    "column_name": col,
                    "nan_inf_percentage": pct,
                    "error_percentage": result["error_percentage"],
                })
        # Sort columns by the highest NaN/Infinity percentage
        sorted_results.sort(key=lambda x: x["nan_inf_percentage"], reverse=True)
        # Print the results
        for res in sorted_results:
            print(f"Table: {res['table_name']}, Column: {res['column_name']}, "
                  f"NaN/Infinity %: {res['nan_inf_percentage']:.2f}, "
                  f"Error %: {res['error_percentage']:.2f}")
    finally:
        conn.close()

if __name__ == "__main__":
    main()
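
To keep the report around rather than scrolling past it, the per-column records can be written out with pandas, which the script already imports. A minimal sketch, assuming the sorted_results list built in main and a hypothetical output path nan_inf_report.csv:

import pandas as pd

def write_report(sorted_results, path="nan_inf_report.csv"):
    """Persist the per-column statistics as CSV, preserving the sort order."""
    # Columns: table_name, column_name, nan_inf_percentage, error_percentage
    df = pd.DataFrame(sorted_results)
    df.to_csv(path, index=False)

The PL/pgSQL block below performs the same single-scan analysis entirely server-side, emitting one NOTICE per numeric column.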
DO $do$
DECLARE
    table_rec RECORD;
    result_rec RECORD;
    count_exprs TEXT;
    values_pairs TEXT;
    query TEXT;
    v_schema TEXT := 'your_schema_name';  -- Replace with your schema name
    calc_dt_column TEXT := 'calc_dt';
    calc_id_column TEXT := 'calc_id';
BEGIN
    -- Loop through all tables in the schema
    FOR table_rec IN
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = v_schema
    LOOP
        -- Build one COUNT(*) FILTER expression per numeric column, plus a
        -- matching (column_name, count_alias) pair used to pivot the single
        -- stats row back into one row per column.
        SELECT
            string_agg(format(
                $f$COUNT(*) FILTER (WHERE t.%I = 'NaN' OR t.%I = 'Infinity' OR t.%I = '-Infinity') AS %I$f$,
                column_name, column_name, column_name, 'cnt_' || ordinal_position
            ), ', '),
            string_agg(format('(%L, s.%I)', column_name, 'cnt_' || ordinal_position), ', ')
        INTO count_exprs, values_pairs
        FROM information_schema.columns
        WHERE table_schema = v_schema
          AND table_name = table_rec.tablename
          AND data_type IN ('double precision', 'real', 'numeric');

        -- Skip tables that have no numeric columns
        CONTINUE WHEN count_exprs IS NULL;

        -- Scan the table once, then pivot the counts into per-column rows
        query := format(
            $q$
            WITH stats AS (
                SELECT
                    COUNT(*) AS total_rows,
                    COUNT(*) FILTER (WHERE h.error IS NOT NULL) AS error_count,
                    %s
                FROM %I.%I t
                LEFT JOIN %I.hre_calc h
                    ON t.%I = h.calc_dt AND t.%I = h.calc_id
            )
            SELECT
                v.column_name,
                COALESCE(v.nan_inf_count * 100.0 / NULLIF(s.total_rows, 0), 0) AS nan_or_inf_percentage,
                COALESCE(s.error_count * 100.0 / NULLIF(s.total_rows, 0), 0) AS error_percentage
            FROM stats s,
                 LATERAL (VALUES %s) AS v(column_name, nan_inf_count)
            $q$,
            count_exprs,
            v_schema, table_rec.tablename,
            v_schema,
            calc_dt_column, calc_id_column,
            values_pairs
        );

        -- Execute the query and report each column's statistics
        FOR result_rec IN EXECUTE query LOOP
            RAISE NOTICE 'Table: %.%, Column: %, NaN/Infinity %%: %, Error %%: %',
                v_schema, table_rec.tablename,
                result_rec.column_name,
                round(result_rec.nan_or_inf_percentage, 2),
                round(result_rec.error_percentage, 2);
        END LOOP;
    END LOOP;
END
$do$;
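
Because the DO block reports through RAISE NOTICE, its output is easiest to capture from a client. A minimal sketch using psycopg2, which collects server notices on the connection object; the file name scan_nan_inf.sql is a placeholder for wherever the block above is saved:

import psycopg2

def run_scan(conn_params, sql_path="scan_nan_inf.sql"):
    """Run the DO block and print every NOTICE it raises."""
    conn = psycopg2.connect(**conn_params)
    try:
        with open(sql_path) as f, conn.cursor() as cur:
            cur.execute(f.read())
        conn.commit()
        for notice in conn.notices:  # psycopg2 appends NOTICE messages here
            print(notice.strip())
    finally:
        conn.close()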