Created
July 7, 2025 19:40
-
-
Save ernestprovo23/2b31e39b2090537aae06f7c4e91774af to your computer and use it in GitHub Desktop.
Advanced Data Processing and Analytics Pipeline
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Advanced Data Processing and Analytics Pipeline
Features: ETL operations, data validation, statistical analysis
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
import logging
from datetime import datetime, timedelta
import warnings
# NOTE(review): this suppresses ALL warnings process-wide (including pandas
# deprecation warnings that signal future breakage) — consider scoping the
# filter to specific categories or using warnings.catch_warnings() instead.
warnings.filterwarnings('ignore')
@dataclass
class DataQualityReport:
    """Result of a data-quality assessment produced by
    DataProcessor.validate_data_quality for a single dataset."""
    dataset_name: str                 # label supplied by the caller, used in logs
    total_records: int                # row count of the assessed DataFrame
    missing_values: Dict[str, int]    # column name -> count of null cells
    duplicates: int                   # number of fully duplicated rows
    data_types: Dict[str, str]        # column name -> pandas dtype as string
    quality_score: float              # 0-100 composite score (100 = clean)
    recommendations: List[str]        # human-readable remediation suggestions
| class DataProcessor: | |
| def __init__(self, log_level: str = "INFO"): | |
| self.logger = self._setup_logger(log_level) | |
| self.processing_history: List[Dict[str, Any]] = [] | |
| def _setup_logger(self, level: str) -> logging.Logger: | |
| """Set up logging configuration""" | |
| logger = logging.getLogger('DataProcessor') | |
| logger.setLevel(getattr(logging, level)) | |
| handler = logging.StreamHandler() | |
| formatter = logging.Formatter( | |
| '%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| handler.setFormatter(formatter) | |
| logger.addHandler(handler) | |
| return logger | |
| def validate_data_quality(self, df: pd.DataFrame, dataset_name: str) -> DataQualityReport: | |
| """Comprehensive data quality assessment""" | |
| self.logger.info(f"Validating data quality for {dataset_name}") | |
| # Calculate missing values | |
| missing_values = df.isnull().sum().to_dict() | |
| missing_percentage = (df.isnull().sum() / len(df) * 100).to_dict() | |
| # Check for duplicates | |
| duplicates = df.duplicated().sum() | |
| # Data types | |
| data_types = df.dtypes.astype(str).to_dict() | |
| # Calculate quality score | |
| quality_score = self._calculate_quality_score(df, missing_percentage, duplicates) | |
| # Generate recommendations | |
| recommendations = self._generate_recommendations(df, missing_percentage, duplicates) | |
| report = DataQualityReport( | |
| dataset_name=dataset_name, | |
| total_records=len(df), | |
| missing_values=missing_values, | |
| duplicates=duplicates, | |
| data_types=data_types, | |
| quality_score=quality_score, | |
| recommendations=recommendations | |
| ) | |
| self.logger.info(f"Data quality score: {quality_score:.2f}/100") | |
| return report | |
| def _calculate_quality_score(self, df: pd.DataFrame, missing_pct: Dict[str, float], | |
| duplicates: int) -> float: | |
| """Calculate overall data quality score""" | |
| # Base score | |
| score = 100.0 | |
| # Penalize missing values | |
| avg_missing = np.mean(list(missing_pct.values())) | |
| score -= avg_missing * 2 | |
| # Penalize duplicates | |
| duplicate_pct = (duplicates / len(df)) * 100 | |
| score -= duplicate_pct * 3 | |
| # Penalize inconsistent data types | |
| # (This would require more sophisticated type checking) | |
| return max(0, min(100, score)) | |
| def _generate_recommendations(self, df: pd.DataFrame, missing_pct: Dict[str, float], | |
| duplicates: int) -> List[str]: | |
| """Generate data quality recommendations""" | |
| recommendations = [] | |
| # Missing values recommendations | |
| for col, pct in missing_pct.items(): | |
| if pct > 50: | |
| recommendations.append(f"Consider removing column '{col}' due to {pct:.1f}% missing values") | |
| elif pct > 10: | |
| recommendations.append(f"Investigate and handle missing values in '{col}' ({pct:.1f}%)") | |
| # Duplicates recommendations | |
| if duplicates > 0: | |
| recommendations.append(f"Remove {duplicates} duplicate records") | |
| # Data type recommendations | |
| for col in df.columns: | |
| if df[col].dtype == 'object': | |
| if df[col].str.isnumeric().all(): | |
| recommendations.append(f"Convert '{col}' to numeric type") | |
| return recommendations | |
| def clean_data(self, df: pd.DataFrame, config: Dict[str, Any] = None) -> pd.DataFrame: | |
| """Clean data based on configuration""" | |
| self.logger.info("Starting data cleaning process") | |
| if config is None: | |
| config = { | |
| 'remove_duplicates': True, | |
| 'handle_missing': 'drop', | |
| 'standardize_columns': True, | |
| 'remove_outliers': False | |
| } | |
| df_cleaned = df.copy() | |
| # Remove duplicates | |
| if config.get('remove_duplicates', True): | |
| initial_count = len(df_cleaned) | |
| df_cleaned = df_cleaned.drop_duplicates() | |
| removed = initial_count - len(df_cleaned) | |
| self.logger.info(f"Removed {removed} duplicate records") | |
| # Handle missing values | |
| missing_strategy = config.get('handle_missing', 'drop') | |
| if missing_strategy == 'drop': | |
| df_cleaned = df_cleaned.dropna() | |
| elif missing_strategy == 'forward_fill': | |
| df_cleaned = df_cleaned.fillna(method='ffill') | |
| elif missing_strategy == 'backward_fill': | |
| df_cleaned = df_cleaned.fillna(method='bfill') | |
| elif missing_strategy == 'mean': | |
| numeric_cols = df_cleaned.select_dtypes(include=[np.number]).columns | |
| df_cleaned[numeric_cols] = df_cleaned[numeric_cols].fillna(df_cleaned[numeric_cols].mean()) | |
| # Standardize column names | |
| if config.get('standardize_columns', True): | |
| df_cleaned.columns = df_cleaned.columns.str.lower().str.replace(' ', '_') | |
| # Remove outliers (using IQR method) | |
| if config.get('remove_outliers', False): | |
| df_cleaned = self._remove_outliers(df_cleaned) | |
| self.logger.info(f"Cleaning complete. Records: {len(df)} -> {len(df_cleaned)}") | |
| return df_cleaned | |
| def _remove_outliers(self, df: pd.DataFrame, method: str = 'iqr') -> pd.DataFrame: | |
| """Remove outliers using specified method""" | |
| numeric_cols = df.select_dtypes(include=[np.number]).columns | |
| for col in numeric_cols: | |
| Q1 = df[col].quantile(0.25) | |
| Q3 = df[col].quantile(0.75) | |
| IQR = Q3 - Q1 | |
| lower_bound = Q1 - 1.5 * IQR | |
| upper_bound = Q3 + 1.5 * IQR | |
| df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)] | |
| return df | |
| def generate_statistical_summary(self, df: pd.DataFrame) -> Dict[str, Any]: | |
| """Generate comprehensive statistical summary""" | |
| self.logger.info("Generating statistical summary") | |
| summary = { | |
| 'basic_stats': df.describe().to_dict(), | |
| 'shape': df.shape, | |
| 'data_types': df.dtypes.to_dict(), | |
| 'missing_values': df.isnull().sum().to_dict(), | |
| 'memory_usage': df.memory_usage(deep=True).to_dict() | |
| } | |
| # Correlation analysis for numeric columns | |
| numeric_cols = df.select_dtypes(include=[np.number]).columns | |
| if len(numeric_cols) > 1: | |
| summary['correlation_matrix'] = df[numeric_cols].corr().to_dict() | |
| # Categorical analysis | |
| categorical_cols = df.select_dtypes(include=['object', 'category']).columns | |
| if len(categorical_cols) > 0: | |
| summary['categorical_stats'] = {} | |
| for col in categorical_cols: | |
| summary['categorical_stats'][col] = { | |
| 'unique_values': df[col].nunique(), | |
| 'most_frequent': df[col].mode().iloc[0] if not df[col].mode().empty else None, | |
| 'value_counts': df[col].value_counts().head(10).to_dict() | |
| } | |
| return summary | |
| def create_time_series_features(self, df: pd.DataFrame, date_column: str) -> pd.DataFrame: | |
| """Create time series features from date column""" | |
| self.logger.info(f"Creating time series features from {date_column}") | |
| df = df.copy() | |
| df[date_column] = pd.to_datetime(df[date_column]) | |
| # Extract time components | |
| df['year'] = df[date_column].dt.year | |
| df['month'] = df[date_column].dt.month | |
| df['day'] = df[date_column].dt.day | |
| df['weekday'] = df[date_column].dt.weekday | |
| df['quarter'] = df[date_column].dt.quarter | |
| df['week_of_year'] = df[date_column].dt.isocalendar().week | |
| df['is_weekend'] = df['weekday'].isin([5, 6]).astype(int) | |
| # Create lag features | |
| df = df.sort_values(date_column) | |
| for lag in [1, 7, 30]: | |
| for col in df.select_dtypes(include=[np.number]).columns: | |
| if col not in ['year', 'month', 'day', 'weekday', 'quarter', 'week_of_year', 'is_weekend']: | |
| df[f'{col}_lag_{lag}'] = df[col].shift(lag) | |
| return df | |
# Example usage
if __name__ == "__main__":
    processor = DataProcessor(log_level="INFO")

    # Build a reproducible synthetic sales dataset.
    np.random.seed(42)
    n_rows = 1000
    frame = pd.DataFrame({
        'date': pd.date_range('2023-01-01', periods=n_rows, freq='D'),
        'sales': np.random.normal(1000, 200, n_rows),
        'customers': np.random.randint(50, 200, n_rows),
        'category': np.random.choice(['A', 'B', 'C'], n_rows),
        'region': np.random.choice(['North', 'South', 'East', 'West'], n_rows),
    })

    # Inject missing values and duplicate rows so the pipeline has work to do.
    frame.loc[frame.index[:50], 'sales'] = np.nan
    frame = pd.concat([frame, frame.head(10)])

    # 1) Quality assessment
    quality_report = processor.validate_data_quality(frame, "Sales Data")
    print("Data Quality Report:")
    print(f"Quality Score: {quality_report.quality_score:.2f}/100")
    print(f"Recommendations: {quality_report.recommendations}")

    # 2) Cleaning
    cleaned = processor.clean_data(frame)

    # 3) Statistical summary
    stats = processor.generate_statistical_summary(cleaned)
    print(f"\nDataset shape: {stats['shape']}")
    print(f"Missing values: {stats['missing_values']}")

    # 4) Time-series feature engineering
    ts_frame = processor.create_time_series_features(cleaned, 'date')
    print(f"\nTime series features added. New shape: {ts_frame.shape}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment