#!/usr/bin/env python3
"""
Advanced Data Processing and Analytics Pipeline
Features: ETL operations, data validation, statistical analysis
"""
import logging
import warnings
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

# Broad suppression keeps demo output clean; narrow this in production code.
warnings.filterwarnings('ignore')

@dataclass
class DataQualityReport:
    dataset_name: str
    total_records: int
    missing_values: Dict[str, int]
    duplicates: int
    data_types: Dict[str, str]
    quality_score: float
    recommendations: List[str]

class DataProcessor:
    def __init__(self, log_level: str = "INFO"):
        self.logger = self._setup_logger(log_level)
        self.processing_history: List[Dict[str, Any]] = []

    def _setup_logger(self, level: str) -> logging.Logger:
        """Set up logging configuration"""
        logger = logging.getLogger('DataProcessor')
        logger.setLevel(getattr(logging, level))
        # Guard against attaching duplicate handlers when several
        # DataProcessor instances share the same named logger
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def validate_data_quality(self, df: pd.DataFrame, dataset_name: str) -> DataQualityReport:
        """Comprehensive data quality assessment"""
        self.logger.info(f"Validating data quality for {dataset_name}")

        # Calculate missing values
        missing_values = df.isnull().sum().to_dict()
        missing_percentage = (df.isnull().sum() / len(df) * 100).to_dict()

        # Check for duplicates (cast numpy int64 to a plain int for the report)
        duplicates = int(df.duplicated().sum())

        # Data types
        data_types = df.dtypes.astype(str).to_dict()

        # Calculate quality score
        quality_score = self._calculate_quality_score(df, missing_percentage, duplicates)

        # Generate recommendations
        recommendations = self._generate_recommendations(df, missing_percentage, duplicates)

        report = DataQualityReport(
            dataset_name=dataset_name,
            total_records=len(df),
            missing_values=missing_values,
            duplicates=duplicates,
            data_types=data_types,
            quality_score=quality_score,
            recommendations=recommendations
        )
        self.logger.info(f"Data quality score: {quality_score:.2f}/100")
        return report

    def _calculate_quality_score(self, df: pd.DataFrame, missing_pct: Dict[str, float],
                                 duplicates: int) -> float:
        """Calculate overall data quality score"""
        # Base score
        score = 100.0

        # Penalize missing values
        avg_missing = np.mean(list(missing_pct.values()))
        score -= avg_missing * 2

        # Penalize duplicates
        duplicate_pct = (duplicates / len(df)) * 100
        score -= duplicate_pct * 3

        # Penalizing inconsistent data types would require more
        # sophisticated type checking and is not implemented here
        return max(0, min(100, score))
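
    # Worked example of the scoring above: with an average of 5% missing
    # values and 2% duplicate rows, the score is 100 - (5 * 2) - (2 * 3) = 84.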

    def _generate_recommendations(self, df: pd.DataFrame, missing_pct: Dict[str, float],
                                  duplicates: int) -> List[str]:
        """Generate data quality recommendations"""
        recommendations = []

        # Missing values recommendations
        for col, pct in missing_pct.items():
            if pct > 50:
                recommendations.append(f"Consider removing column '{col}' due to {pct:.1f}% missing values")
            elif pct > 10:
                recommendations.append(f"Investigate and handle missing values in '{col}' ({pct:.1f}%)")

        # Duplicates recommendations
        if duplicates > 0:
            recommendations.append(f"Remove {duplicates} duplicate records")

        # Data type recommendations: flag object columns whose values all parse
        # as numbers. pd.to_numeric is used instead of .str.isnumeric(), which
        # raises on non-string values and misses negatives and decimals.
        for col in df.select_dtypes(include=['object']).columns:
            non_null = df[col].dropna()
            if len(non_null) > 0 and pd.to_numeric(non_null, errors='coerce').notna().all():
                recommendations.append(f"Convert '{col}' to numeric type")
        return recommendations

    def clean_data(self, df: pd.DataFrame, config: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
        """Clean data based on configuration"""
        self.logger.info("Starting data cleaning process")
        if config is None:
            config = {
                'remove_duplicates': True,
                'handle_missing': 'drop',
                'standardize_columns': True,
                'remove_outliers': False
            }
        df_cleaned = df.copy()

        # Remove duplicates
        if config.get('remove_duplicates', True):
            initial_count = len(df_cleaned)
            df_cleaned = df_cleaned.drop_duplicates()
            removed = initial_count - len(df_cleaned)
            self.logger.info(f"Removed {removed} duplicate records")

        # Handle missing values (.ffill()/.bfill() replace the
        # fillna(method=...) form deprecated in recent pandas versions)
        missing_strategy = config.get('handle_missing', 'drop')
        if missing_strategy == 'drop':
            df_cleaned = df_cleaned.dropna()
        elif missing_strategy == 'forward_fill':
            df_cleaned = df_cleaned.ffill()
        elif missing_strategy == 'backward_fill':
            df_cleaned = df_cleaned.bfill()
        elif missing_strategy == 'mean':
            numeric_cols = df_cleaned.select_dtypes(include=[np.number]).columns
            df_cleaned[numeric_cols] = df_cleaned[numeric_cols].fillna(df_cleaned[numeric_cols].mean())

        # Standardize column names
        if config.get('standardize_columns', True):
            df_cleaned.columns = df_cleaned.columns.str.lower().str.replace(' ', '_')

        # Remove outliers (using IQR method)
        if config.get('remove_outliers', False):
            df_cleaned = self._remove_outliers(df_cleaned)

        self.logger.info(f"Cleaning complete. Records: {len(df)} -> {len(df_cleaned)}")
        return df_cleaned

    def _remove_outliers(self, df: pd.DataFrame, method: str = 'iqr') -> pd.DataFrame:
        """Remove outliers; only the IQR method is currently implemented"""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
        return df
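
    # Worked example of the IQR fences above: with Q1 = 10 and Q3 = 20,
    # IQR = 10, so rows outside [10 - 15, 20 + 15] = [-5, 35] are dropped.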

    def generate_statistical_summary(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Generate comprehensive statistical summary"""
        self.logger.info("Generating statistical summary")
        summary = {
            'basic_stats': df.describe().to_dict(),
            'shape': df.shape,
            # Cast dtypes to strings, matching validate_data_quality
            'data_types': df.dtypes.astype(str).to_dict(),
            'missing_values': df.isnull().sum().to_dict(),
            'memory_usage': df.memory_usage(deep=True).to_dict()
        }

        # Correlation analysis for numeric columns
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 1:
            summary['correlation_matrix'] = df[numeric_cols].corr().to_dict()

        # Categorical analysis
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns
        if len(categorical_cols) > 0:
            summary['categorical_stats'] = {}
            for col in categorical_cols:
                summary['categorical_stats'][col] = {
                    'unique_values': df[col].nunique(),
                    'most_frequent': df[col].mode().iloc[0] if not df[col].mode().empty else None,
                    'value_counts': df[col].value_counts().head(10).to_dict()
                }
        return summary

    def create_time_series_features(self, df: pd.DataFrame, date_column: str) -> pd.DataFrame:
        """Create time series features from date column"""
        self.logger.info(f"Creating time series features from {date_column}")
        df = df.copy()
        df[date_column] = pd.to_datetime(df[date_column])

        # Extract time components
        df['year'] = df[date_column].dt.year
        df['month'] = df[date_column].dt.month
        df['day'] = df[date_column].dt.day
        df['weekday'] = df[date_column].dt.weekday
        df['quarter'] = df[date_column].dt.quarter
        # isocalendar().week returns a nullable UInt32; cast for downstream use
        df['week_of_year'] = df[date_column].dt.isocalendar().week.astype(int)
        df['is_weekend'] = df['weekday'].isin([5, 6]).astype(int)

        # Create lag features (sort first so shifts respect time order)
        df = df.sort_values(date_column)
        time_components = ['year', 'month', 'day', 'weekday', 'quarter', 'week_of_year', 'is_weekend']
        for lag in [1, 7, 30]:
            for col in df.select_dtypes(include=[np.number]).columns:
                if col not in time_components:
                    df[f'{col}_lag_{lag}'] = df[col].shift(lag)
        return df
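

# A minimal end-to-end sketch tying the pieces together. `run_pipeline` is a
# hypothetical convenience wrapper, not part of the original API; it simply
# chains the methods defined above.
def run_pipeline(df: pd.DataFrame, name: str,
                 date_column: Optional[str] = None) -> Tuple[DataQualityReport, pd.DataFrame]:
    """Validate, clean, and optionally feature-engineer a DataFrame."""
    processor = DataProcessor()
    report = processor.validate_data_quality(df, name)
    cleaned = processor.clean_data(df)
    if date_column is not None:
        cleaned = processor.create_time_series_features(cleaned, date_column)
    return report, cleaned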


# Example usage
if __name__ == "__main__":
    # Initialize processor
    processor = DataProcessor(log_level="INFO")

    # Create sample data
    np.random.seed(42)
    sample_data = pd.DataFrame({
        'date': pd.date_range('2023-01-01', periods=1000, freq='D'),
        'sales': np.random.normal(1000, 200, 1000),
        'customers': np.random.randint(50, 200, 1000),
        'category': np.random.choice(['A', 'B', 'C'], 1000),
        'region': np.random.choice(['North', 'South', 'East', 'West'], 1000)
    })

    # Add some missing values and duplicates for testing
    sample_data.loc[sample_data.index[:50], 'sales'] = np.nan
    sample_data = pd.concat([sample_data, sample_data.head(10)])

    # Data quality assessment
    quality_report = processor.validate_data_quality(sample_data, "Sales Data")
    print("Data Quality Report:")
    print(f"Quality Score: {quality_report.quality_score:.2f}/100")
    print(f"Recommendations: {quality_report.recommendations}")

    # Clean data (default config: drop duplicates and NaN rows)
    cleaned_data = processor.clean_data(sample_data)
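
    # An alternative cleaning strategy (illustrative sketch): impute numeric
    # NaNs with the column mean and drop IQR outliers instead of dropping rows.
    imputed_data = processor.clean_data(sample_data, config={
        'remove_duplicates': True,
        'handle_missing': 'mean',
        'standardize_columns': True,
        'remove_outliers': True
    })
    print(f"Mean-imputed variant shape: {imputed_data.shape}")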

    # Generate statistical summary
    stats = processor.generate_statistical_summary(cleaned_data)
    print(f"\nDataset shape: {stats['shape']}")
    print(f"Missing values: {stats['missing_values']}")

    # Create time series features
    ts_data = processor.create_time_series_features(cleaned_data, 'date')
    print(f"\nTime series features added. New shape: {ts_data.shape}")