import psycopg2
import pandas as pd
from psycopg2 import sql
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# Database connection details
DB_CONFIG = {
    "host": "localhost",
    "dbname": "your_database",
    "user": "your_username",
    "password": "your_password",
    "port": 5432
}

SCHEMA_NAME = "your_schema_name"  # Replace with your schema name
HRE_CALC_TABLE = "hre_calc"       # Replace with the actual name of the error table
BATCH_SIZE = 10                   # Number of columns to process per query batch
MAX_WORKERS = 4                   # Number of threads for concurrent processing

# Thread-safe progress counter
progress_lock = threading.Lock()
progress = 0
def get_numeric_columns(conn, schema_name):
    """Fetch all numeric columns in the specified schema."""
    query = """
        SELECT table_name, column_name
        FROM information_schema.columns
        WHERE table_schema = %s
          AND data_type IN ('double precision', 'real', 'numeric');
    """
    with conn.cursor() as cur:
        cur.execute(query, (schema_name,))
        return pd.DataFrame(cur.fetchall(), columns=["table_name", "column_name"])
def analyze_table(conn_params, table_name, numeric_columns):
    """Analyze a table for NaN/Infinity values and error percentages."""
    conn = psycopg2.connect(**conn_params)
    try:
        results = []
        num_columns = len(numeric_columns)
        for i in range(0, num_columns, BATCH_SIZE):
            column_batch = numeric_columns[i:i + BATCH_SIZE]
            # Column names come from information_schema, so interpolating them here is safe.
            column_filters = [
                f"COUNT(*) FILTER (WHERE \"{col}\" = 'NaN' OR \"{col}\" = 'Infinity' OR \"{col}\" = '-Infinity') AS \"{col}_nan_inf_count\""
                for col in column_batch
            ]
            query = sql.SQL("""
                WITH stats AS (
                    SELECT
                        COUNT(*) AS total_rows,
                        COUNT(*) FILTER (WHERE h.error IS NOT NULL) AS error_count,
                        {columns}
                    FROM {schema}.{table} t
                    LEFT JOIN {schema}.{hre_calc} h
                        ON t.calc_dt = h.calc_dt AND t.calc_id = h.calc_id
                )
                SELECT
                    {column_percentages},
                    (error_count * 100.0 / NULLIF(total_rows, 0)) AS error_percentage
                FROM stats;
            """).format(
                columns=sql.SQL(", ").join(sql.SQL(c) for c in column_filters),
                schema=sql.Identifier(SCHEMA_NAME),
                table=sql.Identifier(table_name),
                hre_calc=sql.Identifier(HRE_CALC_TABLE),
                # Convert each raw NaN/Infinity count into a percentage of all rows
                column_percentages=sql.SQL(", ").join(
                    sql.SQL("({} * 100.0 / NULLIF(total_rows, 0)) AS {}").format(
                        sql.Identifier(f"{col}_nan_inf_count"),
                        sql.Identifier(f"{col}_nan_inf_pct"),
                    )
                    for col in column_batch
                )
            )
            with conn.cursor() as cur:
                cur.execute(query)
                result = cur.fetchone()
            result_dict = {
                "table_name": table_name,
                "columns": column_batch,
                "results": result[:-1],        # per-column NaN/Infinity percentages
                "error_percentage": result[-1]
            }
            results.append(result_dict)
    finally:
        conn.close()
    return results
def update_progress(total, completed):
    """Print progress percentage. Callers must already hold progress_lock."""
    percentage = (completed / total) * 100
    print(f"Progress: {percentage:.2f}% ({completed}/{total})")

def process_table(conn_params, table_name, columns, total_tables):
    """Process a single table and update progress."""
    global progress
    result = analyze_table(conn_params, table_name, columns)
    with progress_lock:
        progress += 1
        update_progress(total_tables, progress)
    return result
def main():
    # Connect to the database
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # Get numeric columns in the schema
        numeric_columns_df = get_numeric_columns(conn, SCHEMA_NAME)

        # Group columns by table
        tables = numeric_columns_df.groupby("table_name")["column_name"].apply(list)
        total_tables = len(tables)

        all_results = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = [
                executor.submit(process_table, DB_CONFIG, table_name, columns, total_tables)
                for table_name, columns in tables.items()
            ]
            for future in as_completed(futures):
                all_results.extend(future.result())

        # Collect and flatten the results: one row per (table, column)
        sorted_results = []
        for result in all_results:
            for col, pct in zip(result["columns"], result["results"]):
                sorted_results.append({
                    "table_name": result["table_name"],
                    "column_name": col,
                    # NULLIF in the query yields NULL for empty tables; treat that as 0%
                    "nan_inf_percentage": float(pct) if pct is not None else 0.0,
                    "error_percentage": float(result["error_percentage"]) if result["error_percentage"] is not None else 0.0
                })

        # Sort columns by the highest NaN/Infinity percentage
        sorted_results = sorted(sorted_results, key=lambda x: x["nan_inf_percentage"], reverse=True)

        # Print the results
        for res in sorted_results:
            print(f"Table: {res['table_name']}, Column: {res['column_name']}, "
                  f"NaN/Infinity %: {res['nan_inf_percentage']:.2f}, "
                  f"Error %: {res['error_percentage']:.2f}")
    finally:
        conn.close()

if __name__ == "__main__":
    main()
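pandas is imported above but only used to shape the catalog query result. If a file is more convenient than stdout, a minimal optional sketch like the one below could write the same report to disk; the helper name write_report and the path nan_inf_report.csv are illustrative, not part of the original script.

import pandas as pd

def write_report(sorted_results, path="nan_inf_report.csv"):
    """Write the per-column report built in main() to a CSV file."""
    # sorted_results is a list of dicts, so it maps directly onto a DataFrame.
    pd.DataFrame(sorted_results).to_csv(path, index=False)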
DO $$
DECLARE
    table_rec      RECORD;
    result_rec     RECORD;
    query          TEXT;
    count_exprs    TEXT;
    values_list    TEXT;
    schema_name    TEXT := 'your_schema_name'; -- Replace with your schema name
    calc_dt_column TEXT := 'calc_dt';
    calc_id_column TEXT := 'calc_id';
BEGIN
    -- Loop through all tables in the schema
    FOR table_rec IN
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = schema_name
    LOOP
        -- For each numeric column, build a counting expression for the stats CTE
        -- and a VALUES entry that pairs the column name with that count.
        SELECT
            string_agg(format(
                'COUNT(*) FILTER (WHERE t.%I = ''NaN'' OR t.%I = ''Infinity'' OR t.%I = ''-Infinity'') AS %I',
                c.column_name, c.column_name, c.column_name, 'cnt_' || c.ordinal_position
            ), ', '),
            string_agg(format('(%L, %I)', c.column_name, 'cnt_' || c.ordinal_position), ', ')
        INTO count_exprs, values_list
        FROM information_schema.columns c
        WHERE c.table_schema = schema_name
          AND c.table_name = table_rec.tablename
          AND c.data_type IN ('double precision', 'real', 'numeric');

        -- Skip tables that have no numeric columns
        CONTINUE WHEN count_exprs IS NULL;

        -- Construct a query that scans the table once and calculates all statistics
        query := format(
            $fmt$
            WITH stats AS (
                SELECT
                    COUNT(*) AS total_rows,
                    COUNT(*) FILTER (WHERE h.error IS NOT NULL) AS error_count,
                    %s
                FROM %I.%I t
                LEFT JOIN %I.hre_calc h
                    ON t.%I = h.calc_dt AND t.%I = h.calc_id
            )
            SELECT
                v.column_name,
                (v.nan_inf_count * 100.0 / NULLIF(total_rows, 0)) AS nan_or_inf_percentage,
                (error_count * 100.0 / NULLIF(total_rows, 0)) AS error_percentage
            FROM stats,
                 LATERAL (VALUES %s) AS v(column_name, nan_inf_count)
            $fmt$,
            count_exprs,
            schema_name,
            table_rec.tablename,
            schema_name,
            calc_dt_column,
            calc_id_column,
            values_list
        );

        -- Execute the query and report one line per column
        FOR result_rec IN EXECUTE query LOOP
            RAISE NOTICE 'table=%, column=%, nan_or_inf%%=%, error%%=%',
                table_rec.tablename,
                result_rec.column_name,
                result_rec.nan_or_inf_percentage,
                result_rec.error_percentage;
        END LOOP;
    END LOOP;
END $$;
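Both scripts decide which columns to scan from the catalog. A quick way to preview that list before running either one is the same information_schema query used in get_numeric_columns (a sketch; substitute your schema name):

-- Preview the numeric columns the scans above will touch.
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'your_schema_name'
  AND data_type IN ('double precision', 'real', 'numeric')
ORDER BY table_name, column_name;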