Skip to content

Instantly share code, notes, and snippets.

@davidlu1001
Created July 15, 2025 05:50
Show Gist options
  • Save davidlu1001/78a7b6b25322503444f3db251950389d to your computer and use it in GitHub Desktop.
aks_vul_report_duckdb.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AKS Vulnerability Reporting Tool
Description:
This script queries Azure KQL for AKS cluster vulnerabilities, performs historical trend
analysis using a local DuckDB database, and sends a formatted HTML email summary
with a full CSV report as an attachment.
Setup:
1. Install required packages:
pip install -r requirements.txt
2. Set the following environment variables for email configuration:
- AKS_VUL_REPORT_SENDER: Sender's email address
- AKS_VUL_REPORT_RECIPIENT: Primary recipient's email address
- AKS_VUL_REPORT_CC: CC recipients (comma-separated)
- AKS_VUL_REPORT_BCC: BCC recipients (comma-separated)
- AKS_VUL_REPORT_SMTP: SMTP server hostname
- AKS_VUL_REPORT_SMTP_PORT: SMTP server port (default: 25)
- AKS_VUL_REPORT_SMTP_USE_TLS: Set to '1' to use STARTTLS
Usage Examples:
# Run for PROD environment and email the report
python3 aks_vul_report.py --env PROD
# Perform a dry run for the QA environment (prints email to console)
python3 aks_vul_report.py --env QA --dry-run
# Generate report with all historical periods
python3 aks_vul_report.py --env PROD --compare-period all
# See all options
python3 aks_vul_report.py --help
"""
import argparse
import json
import logging
import os
import smtplib
import ssl
import subprocess
import sys
import tempfile
from datetime import date, datetime, timezone
from email.message import EmailMessage
from typing import Dict, List, Optional, Tuple, Union

import pandas as pd
import requests
from dateutil.relativedelta import relativedelta
# --- DuckDB Setup (Conditional Import) ---
try:
import duckdb
DUCKDB_AVAILABLE = True
except ImportError:
duckdb = None
DUCKDB_AVAILABLE = False
# --- Constants and Configuration ---

# DuckDB history database lives next to this script; one table accumulates
# one batch of rows per scan date.
DUCKDB_PATH = os.path.join(os.path.dirname(__file__), "aks_vuln_scans.duckdb")
DUCKDB_TABLE = "aks_vulnerability_scans"

# Risk level color mapping for consistent styling across HTML tables/badges.
RISK_COLORS = {
    'critical': '#dc3545',  # Red
    'high': '#fd7e14',      # Orange
    'medium': '#ffc107',    # Yellow
    'low': '#198754'        # Green
}

# Request timeout settings
REQUEST_TIMEOUT = 10  # seconds, for container-registry HTTP calls
AZURE_CLI_TIMEOUT = 300  # 5 minutes, for the `az graph query` subprocess
# --- Azure Resource Graph Query Functions ---
def run_az_graph_query(kql_query: str, output_json_path: str) -> bool:
    """
    Run an Azure Resource Graph query via the `az` CLI and persist the raw JSON.

    Args:
        kql_query: KQL (Kusto Query Language) query text to execute.
        output_json_path: Destination file for the JSON query results.

    Returns:
        True on success, False on any failure. No exceptions propagate;
        every error path is logged and reported via the return value.
    """
    logging.info("πŸ” Executing Azure Resource Graph query...")
    tmp_query_path = None
    try:
        # The query is handed to `az` through a temp file (the @file syntax)
        # to avoid shell-quoting issues with complex KQL.
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.kql', encoding='utf-8') as tmp:
            tmp.write(kql_query)
            tmp_query_path = tmp.name
        logging.debug(f"Created temporary KQL file: {tmp_query_path}")

        az_cmd = ["az", "graph", "query", "-q", "@" + tmp_query_path, "--output", "json"]
        logging.debug(f"Executing command: {' '.join(az_cmd)}")

        completed = subprocess.run(
            az_cmd,
            check=True,
            capture_output=True,
            text=True,
            timeout=AZURE_CLI_TIMEOUT,
            encoding='utf-8',
        )

        with open(output_json_path, 'w', encoding='utf-8') as out_file:
            out_file.write(completed.stdout)
        logging.info(f"βœ… Successfully saved query results to {output_json_path}")
        return True
    except subprocess.CalledProcessError as exc:
        logging.error(f"❌ Azure CLI command failed with exit code {exc.returncode}")
        logging.error(f" Error output: {exc.stderr}")
        return False
    except subprocess.TimeoutExpired:
        logging.error(f"❌ Azure CLI command timed out after {AZURE_CLI_TIMEOUT} seconds")
        return False
    except FileNotFoundError:
        logging.error("❌ Azure CLI (az) command not found. Please ensure Azure CLI is installed and in PATH")
        return False
    except Exception as exc:
        logging.error(f"❌ Unexpected error executing Azure query: {exc}")
        return False
    finally:
        # Best-effort cleanup of the temp query file.
        if tmp_query_path and os.path.exists(tmp_query_path):
            try:
                os.remove(tmp_query_path)
                logging.debug(f"Cleaned up temporary KQL file: {tmp_query_path}")
            except OSError as exc:
                logging.warning(f"⚠️ Failed to cleanup temporary file {tmp_query_path}: {exc}")
def process_vulnerability_data(json_path: str) -> Optional[pd.DataFrame]:
    """
    Load Azure Resource Graph JSON output and normalize it into a DataFrame.

    Args:
        json_path: Path to the JSON file containing query results.

    Returns:
        DataFrame with a fixed column set (missing fields filled with defaults)
        plus a lowercase `riskLevel_clean` helper column, or None when the file
        cannot be read/parsed or holds no records.
    """
    logging.info("πŸ“Š Processing vulnerability data...")
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Azure Resource Graph normally wraps rows in a {"data": [...]}
        # envelope, but a bare list of records is accepted as well.
        if isinstance(data, dict):
            records = data.get('data', [])
            if 'data' not in data:
                logging.warning("JSON response missing 'data' field")
        elif isinstance(data, list):
            records = data
        else:
            logging.warning(f"Unexpected JSON structure: {type(data)}")
            return None

        if not records:
            logging.warning("⚠️ No vulnerability records found in query output")
            return None

        # json_normalize flattens nested objects into dotted column names.
        df = pd.json_normalize(records)
        logging.debug(f"Raw DataFrame shape: {df.shape}")
        logging.debug(f"Raw columns: {df.columns.tolist()}")

        # Expected output schema, with the default used when a field is absent.
        required_columns = {
            "clusterName": "Unknown-Cluster",
            "environmentRisk": "Unknown-Environment",
            "cveId": "Unknown-CVE",
            "riskLevel": "Unknown",
            "repositoryName": "Unknown-Repository",
            "imageTags": "Unknown-Tag",
            "publishedDate": None,
            "ageInDays": 0,
            "subAssessmentDescription": "No description available"
        }
        for col in (c for c in required_columns if c not in df.columns):
            df[col] = required_columns[col]
            logging.debug(f"Added missing column '{col}' with default value")

        # Keep only the expected columns, in a stable order.
        df = df[list(required_columns.keys())].copy()

        # Normalized risk level used by downstream filters and counters.
        df["riskLevel_clean"] = df["riskLevel"].astype(str).str.strip().str.lower()
        if 'publishedDate' in df.columns:
            df['publishedDate'] = pd.to_datetime(df['publishedDate'], errors='coerce')
        # Coerce ageInDays to int, treating unparseable values as 0.
        df['ageInDays'] = pd.to_numeric(df['ageInDays'], errors='coerce').fillna(0).astype(int)
        for col in ['clusterName', 'repositoryName', 'imageTags', 'subAssessmentDescription']:
            if col in df.columns:
                df[col] = df[col].astype(str).str.strip()

        logging.info(f"βœ… Processed {len(df)} vulnerability records")
        logging.info(f" Risk level distribution: {df['riskLevel_clean'].value_counts().to_dict()}")
        return df
    except (FileNotFoundError, json.JSONDecodeError) as e:
        logging.error(f"❌ Failed to load/parse JSON file {json_path}: {e}")
        return None
    except KeyError as e:
        logging.error(f"❌ Missing expected field in JSON data: {e}")
        return None
    except Exception as e:
        logging.error(f"❌ Unexpected error processing vulnerability data: {e}")
        return None
# --- DuckDB Historical Analysis Functions ---
def ensure_duckdb_table_exists(conn) -> None:
    """
    Create the DuckDB vulnerability-history table if it is not present yet.

    Args:
        conn: An open DuckDB connection.

    Raises:
        Exception: Propagated when table creation or verification fails.
    """
    ddl = f"""
CREATE TABLE IF NOT EXISTS {DUCKDB_TABLE} (
scan_date DATE NOT NULL,
clusterName VARCHAR,
environmentRisk VARCHAR,
cveId VARCHAR,
riskLevel VARCHAR,
repositoryName VARCHAR,
imageTags VARCHAR,
publishedDate TIMESTAMP,
ageInDays INTEGER,
subAssessmentDescription VARCHAR,
-- Create composite index for better query performance
PRIMARY KEY (scan_date, clusterName, cveId)
);
"""
    try:
        conn.execute(ddl)
        logging.debug("βœ… DuckDB table structure verified/created")
        # Sanity check: the table is queryable and we can report its size.
        existing_rows = conn.execute(f"SELECT COUNT(*) FROM {DUCKDB_TABLE}").fetchone()[0]
        logging.debug(f" Current table contains {existing_rows} historical records")
    except Exception as exc:
        logging.error(f"❌ Failed to create/verify DuckDB table: {exc}")
        raise
def append_scan_to_duckdb(conn, df: pd.DataFrame, scan_date: str) -> None:
    """
    Store one day's scan results in DuckDB, replacing any prior rows for that date.

    Args:
        conn: An open DuckDB connection.
        df: DataFrame of vulnerability rows to persist.
        scan_date: Scan date as 'YYYY-MM-DD'.

    Raises:
        Exception: Propagated when the delete/insert round-trip fails.
    """
    logging.info(f"πŸ’Ύ Storing scan data for {scan_date} in DuckDB...")
    try:
        # NOTE: the INSERT below references `df_to_store` by name via DuckDB's
        # pandas replacement scan, so this local variable name is load-bearing.
        df_to_store = df.copy()
        df_to_store['scan_date'] = scan_date
        # Reorder columns to match the table schema exactly.
        df_to_store = df_to_store[[
            'scan_date', 'clusterName', 'environmentRisk', 'cveId', 'riskLevel',
            'repositoryName', 'imageTags', 'publishedDate', 'ageInDays', 'subAssessmentDescription'
        ]]

        # Idempotency: drop any rows already stored for this scan date.
        deleted = conn.execute(
            f"DELETE FROM {DUCKDB_TABLE} WHERE scan_date = ?", [scan_date]
        ).fetchone()
        if deleted and deleted[0] > 0:
            logging.info(f" Removed {deleted[0]} existing records for {scan_date}")

        conn.execute(f"INSERT INTO {DUCKDB_TABLE} SELECT * FROM df_to_store")

        stored = conn.execute(
            f"SELECT COUNT(*) FROM {DUCKDB_TABLE} WHERE scan_date = ?", [scan_date]
        ).fetchone()[0]
        logging.info(f"βœ… Successfully stored {stored} records for {scan_date}")
    except Exception as e:
        logging.error(f"❌ Failed to append scan data to DuckDB: {e}")
        raise
def get_scan_df(conn, scan_date: str) -> pd.DataFrame:
    """
    Fetch all stored vulnerability rows for one scan date.

    Args:
        conn: An open DuckDB connection.
        scan_date: Scan date as 'YYYY-MM-DD'.

    Returns:
        Matching rows ordered by cluster/risk/CVE; an empty DataFrame on failure.
    """
    query = f"""
SELECT * FROM {DUCKDB_TABLE}
WHERE scan_date = ?
ORDER BY clusterName, riskLevel, cveId
"""
    try:
        rows = conn.execute(query, [scan_date]).df()
        logging.debug(f"Retrieved {len(rows)} records for {scan_date}")
        return rows
    except Exception as e:
        logging.error(f"❌ Failed to retrieve scan data for {scan_date}: {e}")
        return pd.DataFrame()
def compute_vuln_diffs(df_this: pd.DataFrame, df_last: pd.DataFrame) -> Dict[str, int]:
    """
    Compute differences between two vulnerability scans.

    Args:
        df_this: Current vulnerability scan DataFrame.
        df_last: Previous vulnerability scan DataFrame.

    Returns:
        Dict with 'new', 'fixed', 'persistent', 'total_this', 'total_last' counts.

    Note:
        Vulnerabilities are matched on the composite key
        [cveId, repositoryName, clusterName, environmentRisk]; a missing key
        column is treated as "Unknown" for every row. Fixed: the previous
        implementation added the missing columns directly to the caller's
        DataFrames (a visible side effect); inputs are no longer mutated.
    """
    if df_this.empty and df_last.empty:
        return {"new": 0, "fixed": 0, "persistent": 0, "total_this": 0, "total_last": 0}

    key_cols = ['cveId', 'repositoryName', 'clusterName', 'environmentRisk']

    def _signatures(df: pd.DataFrame) -> set:
        # Build the set of composite keys without writing to the caller's frame.
        if df.empty:
            return set()
        columns = [
            df[col] if col in df.columns else pd.Series("Unknown", index=df.index)
            for col in key_cols
        ]
        return set(zip(*(c.to_numpy() for c in columns)))

    set_this = _signatures(df_this)
    set_last = _signatures(df_last)

    return {
        "new": len(set_this - set_last),
        "fixed": len(set_last - set_this),
        "persistent": len(set_this & set_last),
        "total_this": len(set_this),
        "total_last": len(set_last)
    }
def get_period_scan_dates(conn, period: str, scan_date: date) -> Tuple[Optional[str], Optional[str]]:
    """
    Resolve the current and comparison scan dates for a reporting period.

    Args:
        conn: An open DuckDB connection.
        period: One of 'day', 'week', 'month', 'quarter', 'year' (anything
            else falls back to a one-day offset).
        scan_date: The current scan's date.

    Returns:
        (current_date_str, previous_date_str); either element is None when no
        stored scan exists for that exact date, and (None, None) on failure.
    """
    offsets = {
        'day': relativedelta(days=1),
        'week': relativedelta(weeks=1),
        'month': relativedelta(months=1),
        'quarter': relativedelta(months=3),
        'year': relativedelta(years=1)
    }
    previous_date_obj = scan_date - offsets.get(period, relativedelta(days=1))

    scan_date_str = scan_date.strftime('%Y-%m-%d')
    prev_date_str = previous_date_obj.strftime('%Y-%m-%d')
    try:
        # Only dates with a stored scan count; both endpoints must exist
        # exactly for a comparison to be possible.
        available = {
            str(row[0])
            for row in conn.execute(
                f"SELECT DISTINCT scan_date FROM {DUCKDB_TABLE} ORDER BY scan_date DESC"
            ).fetchall()
        }
        curr_date = scan_date_str if scan_date_str in available else None
        prev_date = prev_date_str if prev_date_str in available else None
        logging.debug(f"Period {period}: Current={curr_date}, Previous={prev_date}")
        logging.debug(f"Available dates: {sorted(available, reverse=True)[:5]}...")
        return curr_date, prev_date
    except Exception as e:
        logging.error(f"❌ Failed to get period scan dates: {e}")
        return None, None
def compute_periodic_diffs(conn, period_label: str, period: str, scan_date: date) -> Dict:
    """
    Build the trend-comparison record for one reporting period.

    Args:
        conn: An open DuckDB connection.
        period_label: Human-readable label for the period.
        period: Period key forwarded to get_period_scan_dates.
        scan_date: The current scan's date.

    Returns:
        Dict with the label, resolved dates, period key, and a 'diffs' entry
        (None when either endpoint has no stored scan data).
    """
    curr_date, prev_date = get_period_scan_dates(conn, period, scan_date)
    result = {
        "label": period_label,
        "curr_date": curr_date,
        "prev_date": prev_date,
        "period": period,
        "diffs": None
    }
    # Guard clause: without both endpoints there is nothing to compare.
    if not (curr_date and prev_date):
        logging.debug(f"{period_label}: Insufficient data for comparison")
        return result

    current_rows = get_scan_df(conn, curr_date)
    previous_rows = get_scan_df(conn, prev_date)
    result["diffs"] = compute_vuln_diffs(current_rows, previous_rows)
    logging.debug(f"{period_label}: {result['diffs']}")
    return result
# --- External API Functions ---
def get_latest_tags(repo_url: str, version_pattern: str, headers: Dict) -> List[str]:
    """
    Fetch and filter the latest image tags from a container registry API.

    Args:
        repo_url: API endpoint URL for the container repository.
        version_pattern: Regex pattern a tag must match (from the string start).
        headers: HTTP headers for the API request.

    Returns:
        Up to 5 latest matching tags; on failure, a single-element list holding
        a human-readable error message ("API request failed: ..." or
        "Unexpected error: ...").

    Note:
        Supports both Docker Hub ('results') and generic ('tags') responses.
    """
    import re  # kept local: only this helper uses it

    try:
        logging.debug(f"Fetching tags from {repo_url}")
        response = requests.get(repo_url, headers=headers, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        data = response.json()

        # Extract tags based on different API response structures.
        tags = []
        if 'results' in data:
            # Docker Hub format
            tags = [item.get('name', '') for item in data['results'] if isinstance(item, dict)]
        elif 'tags' in data:
            # Generic tags array format
            tags = data['tags'] if isinstance(data['tags'], list) else []
        else:
            logging.warning(f"Unknown API response structure from {repo_url}")
            return ["Unknown API response format"]

        try:
            filtered_tags = [tag for tag in tags if tag and re.match(version_pattern, tag)]
        except re.error as e:
            logging.warning(f"Invalid regex pattern '{version_pattern}': {e}")
            filtered_tags = tags[:10]  # Fallback to first 10 tags

        # Prefer semantic-version ordering; fall back to lexical sorting when
        # `packaging` is unavailable or any tag fails to parse.
        # Fixed: the original caught `(ImportError, Exception)` — redundant,
        # since ImportError already subclasses Exception.
        try:
            from packaging import version
            filtered_tags.sort(key=version.parse, reverse=True)
        except Exception:
            filtered_tags.sort(reverse=True)

        result = filtered_tags[:5]
        logging.debug(f"Found {len(result)} matching tags")
        return result
    except requests.RequestException as e:
        error_msg = f"API request failed: {str(e)[:50]}..."
        logging.warning(f"Failed to fetch tags from {repo_url}: {e}")
        return [error_msg]
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)[:50]}..."
        logging.warning(f"Unexpected error fetching tags: {e}")
        return [error_msg]
# --- HTML Email Generation Functions ---
def get_html_styles() -> str:
    """
    Return the CSS stylesheet for the HTML email template.

    Returns:
        String containing CSS styles optimized for email clients.

    Note:
        Styles use inline-compatible CSS for maximum email client
        compatibility; the stylesheet below is returned verbatim.
    """
    return """
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
margin: 0; padding: 0; background-color: #f8f9fa; line-height: 1.6;
color: #333;
}
.container {
max-width: 1200px; margin: 0 auto; padding: 20px; background-color: white;
box-shadow: 0 0 10px rgba(0,0,0,0.1); border-radius: 8px;
}
h2 {
color: #003366; border-bottom: 3px solid #003366; padding-bottom: 12px;
margin-bottom: 20px; font-size: 24px; font-weight: 600;
}
h3 {
color: #005691; margin-top: 30px; margin-bottom: 15px; font-size: 18px;
font-weight: 600;
}
h4 {
color: #005691; margin: 15px 0 8px 0; font-size: 14px; font-weight: 600;
}
.summary-note {
font-size: 14px; margin-bottom: 20px; background-color: #e7f3ff;
padding: 20px; border-radius: 8px; border-left: 4px solid #007bff;
}
.summary-note ul {
margin: 10px 0; padding-left: 20px;
}
.summary-note li {
margin: 8px 0;
}
.table-container {
margin: 25px 0; overflow-x: auto; border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1); background-color: white;
}
table {
border-collapse: collapse; width: 100%; min-width: 700px;
background-color: white; font-size: 13px;
}
th, td {
border: 1px solid #dee2e6; padding: 12px 15px; text-align: left;
vertical-align: top;
}
th {
background: linear-gradient(135deg, #003366, #004488);
color: white; font-size: 14px; font-weight: 600;
text-transform: uppercase; letter-spacing: 0.5px;
}
tr:nth-child(even) {
background-color: #f8f9fa;
}
tr:hover {
background-color: #e3f2fd;
}
caption {
caption-side: top; font-weight: bold; font-size: 16px;
padding: 15px; text-align: left; color: #003366;
background-color: #f8f9fa; border-radius: 8px 8px 0 0;
}
.footer {
font-size: 12px; color: #6c757d; margin-top: 40px;
text-align: center; padding: 25px 20px; border-top: 2px solid #dee2e6;
background-color: #f8f9fa; border-radius: 8px;
}
.footer a {
color: #007bff; text-decoration: none;
}
.footer a:hover {
text-decoration: underline;
}
.legend {
margin-top: 15px;
}
.legend span {
padding: 6px 12px; border-radius: 20px; font-weight: bold;
margin-right: 10px; font-size: 12px; display: inline-block;
margin-bottom: 6px; color: white;
}
.status-badge {
padding: 4px 10px; border-radius: 6px; font-weight: bold;
font-size: 11px; text-transform: uppercase; color: white;
display: inline-block;
}
.critical-bg { background-color: #dc3545; }
.high-bg { background-color: #fd7e14; }
.medium-bg { background-color: #ffc107; color: #000 !important; }
.low-bg { background-color: #198754; }
.age-warning { background-color: #fff3cd; color: #856404; font-weight: bold; }
.age-critical { background-color: #f8d7da; color: #721c24; font-weight: bold; }
.text-center { text-align: center; }
.text-muted { color: #6c757d; }
.mb-3 { margin-bottom: 1rem; }
.alert {
padding: 15px; border-radius: 6px; margin: 15px 0;
}
.alert-info {
background-color: #d1ecf1; color: #0c5460; border-left: 4px solid #bee5eb;
}
.alert-warning {
background-color: #fff3cd; color: #856404; border-left: 4px solid #ffeaa7;
}
.flex-container {
display: flex; flex-wrap: wrap; gap: 20px;
}
.flex-item {
flex: 1; min-width: 300px;
}
.tag-list {
margin-top: 8px; padding-left: 20px; font-size: 13px;
}
.tag-list li {
margin: 4px 0;
}
.tag-code {
background: #f8f9fa; padding: 3px 8px; border-radius: 4px;
font-family: 'Courier New', monospace; font-size: 12px;
border: 1px solid #e9ecef;
}
.trend-indicator {
font-size: 16px; margin-right: 5px;
}
"""
def render_summary_box(env: str, df: pd.DataFrame) -> str:
    """
    Render the vulnerability severity summary table for one environment.

    Args:
        env: Environment name (QA/PROD).
        df: Vulnerability data with a `riskLevel_clean` column.

    Returns:
        HTML string for the summary table.
    """
    # Count rows per severity using the normalized lowercase column.
    severity_counts = {
        level: int((df['riskLevel_clean'] == level.lower()).sum())
        for level in ('Critical', 'High', 'Medium', 'Low')
    }
    total = sum(severity_counts.values())

    # One colored cell per severity, following RISK_COLORS order.
    cells = "".join(
        f'<td class="text-center" style="background-color:{color}; color:white; font-weight:bold; font-size:14px;">{severity_counts[severity.title()]}</td>'
        for severity, color in RISK_COLORS.items()
    )
    return f"""
<div class="table-container">
<table>
<caption>πŸ›‘οΈ Vulnerability Severity Summary</caption>
<tr>
<th>Environment</th><th>Critical</th><th>High</th><th>Medium</th><th>Low</th><th>Total</th>
</tr>
<tr>
<td class="text-center" style="font-weight:bold; font-size:14px;">{env}</td>
{cells}
<td class="text-center" style="font-weight:bold; font-size:14px; background-color:#6c757d; color:white;">{total}</td>
</tr>
</table>
</div>
"""
def render_multi_period_summary(period_diffs: List[Dict]) -> str:
    """
    Render the historical trend analysis table with visual indicators.

    Args:
        period_diffs: Period comparison dicts as produced by
            compute_periodic_diffs (keys: label, curr_date, prev_date, diffs).

    Returns:
        HTML string for the trend table, or an info alert when the list is empty.
    """
    if not period_diffs:
        return '<div class="alert alert-info">πŸ“Š No historical trend data available.</div>'
    rows = ""
    for pdiff in period_diffs:
        label = pdiff.get("label", "Unknown")
        if pdiff.get("diffs"):
            d = pdiff["diffs"]
            new_count = d.get('new', 0)
            fixed_count = d.get('fixed', 0)
            persistent_count = d.get('persistent', 0)
            total_current = d.get('total_this', 0)
            total_previous = d.get('total_last', 0)
            # Up-arrow + red when new vulnerabilities appeared; neutral otherwise.
            if new_count > 0:
                new_indicator = '<span class="trend-indicator">πŸ”Ί</span>'
                new_style = 'color:#dc3545; font-weight:bold;'
            else:
                new_indicator = '<span class="trend-indicator">➑️</span>'
                new_style = 'color:#6c757d;'
            # Down-arrow + green when vulnerabilities were fixed.
            if fixed_count > 0:
                fixed_indicator = '<span class="trend-indicator">πŸ”»</span>'
                fixed_style = 'color:#198754; font-weight:bold;'
            else:
                fixed_indicator = '<span class="trend-indicator">➑️</span>'
                fixed_style = 'color:#6c757d;'
            # Percentage change vs. the previous total; omitted entirely when
            # the previous total is zero (avoids division by zero).
            if total_previous > 0:
                trend_pct = ((total_current - total_previous) / total_previous) * 100
                if trend_pct > 0:
                    trend_text = f'(+{trend_pct:.1f}%)'
                    trend_color = '#dc3545'
                elif trend_pct < 0:
                    trend_text = f'({trend_pct:.1f}%)'
                    trend_color = '#198754'
                else:
                    trend_text = '(0%)'
                    trend_color = '#6c757d'
            else:
                trend_text = ''
                trend_color = '#6c757d'
            rows += f"""
<tr>
<td style="font-weight:bold;">{label}</td>
<td class="text-center">{pdiff.get('curr_date', 'N/A')}</td>
<td class="text-center">{pdiff.get('prev_date', 'N/A')}</td>
<td class="text-center" style="{new_style}">{new_indicator}{new_count}</td>
<td class="text-center" style="{fixed_style}">{fixed_indicator}{fixed_count}</td>
<td class="text-center" style="color:#6c757d;">{persistent_count}</td>
<td class="text-center" style="font-weight:bold;">{total_current}</td>
<td class="text-center" style="color:#6c757d;">{total_previous}</td>
<td class="text-center" style="color:{trend_color}; font-size:12px; font-weight:bold;">{trend_text}</td>
</tr>
"""
        else:
            # No stored scan for one (or both) of the period's endpoint dates.
            rows += f"""
<tr>
<td style="font-weight:bold;">{label}</td>
<td colspan="8" class="text-center text-muted" style="font-style:italic;">
ℹ️ Insufficient data for comparison
</td>
</tr>
"""
    return f"""
<div class="table-container">
<table>
<caption>πŸ“ˆ Vulnerability Trend Analysis</caption>
<tr>
<th>Period</th><th>Current Date</th><th>Previous Date</th>
<th>New</th><th>Fixed</th><th>Persistent</th>
<th>Total (Now)</th><th>Total (Prev)</th><th>Trend</th>
</tr>
{rows}
</table>
</div>
"""
def render_main_table(df: pd.DataFrame) -> str:
    """
    Render the detailed vulnerability table (Critical/High rows only).

    Args:
        df: Vulnerability data with a `riskLevel_clean` column.

    Returns:
        HTML string for the table, or an info alert when nothing qualifies.

    Note:
        Only Critical and High severity vulnerabilities are displayed here;
        lower severities appear in the summary counts and CSV attachment only.
    """
    if df.empty:
        return '<div class="alert alert-info text-center">βœ… No vulnerability data available.</div>'
    # Filter for Critical and High severity vulnerabilities only.
    df_filtered = df[df['riskLevel_clean'].isin(['critical', 'high'])].copy()
    if df_filtered.empty:
        return '<div class="alert alert-info text-center">βœ… No Critical or High severity vulnerabilities found.</div>'
    logging.info(f"πŸ“‹ Rendering detailed table with {len(df_filtered)} Critical/High vulnerabilities")
    # Drop the internal helper column before display.
    df_display = df_filtered.drop(columns=["riskLevel_clean"], errors='ignore').copy()
    # Normalize dates to YYYY-MM-DD; unparseable values render as 'Unknown'.
    if 'publishedDate' in df_display.columns:
        df_display['publishedDate'] = pd.to_datetime(
            df_display['publishedDate'], errors='coerce'
        ).dt.strftime('%Y-%m-%d')
        df_display['publishedDate'] = df_display['publishedDate'].fillna('Unknown')
    # Truncate long descriptions for better table formatting.
    if 'subAssessmentDescription' in df_display.columns:
        df_display['subAssessmentDescription'] = df_display['subAssessmentDescription'].astype(str).apply(
            lambda x: (x[:120] + "...") if len(str(x)) > 120 else x
        )
    # Build the <tr> rows with per-cell styling.
    html_rows = ""
    for idx, row in df_display.iterrows():
        risk_level = str(row.get('riskLevel', '')).lower()
        # Map the risk level to its badge CSS class.
        if risk_level == 'critical':
            risk_badge_class = 'critical-bg'
        elif risk_level == 'high':
            risk_badge_class = 'high-bg'
        elif risk_level == 'medium':
            risk_badge_class = 'medium-bg'
        elif risk_level == 'low':
            risk_badge_class = 'low-bg'
        else:
            risk_badge_class = 'text-muted'
        html_rows += "<tr>"
        for col in df_display.columns:
            cell_value = str(row[col]) if pd.notna(row[col]) else "N/A"
            if col == 'riskLevel':
                # Render risk level as a colored badge.
                html_rows += f'<td class="text-center"><span class="status-badge {risk_badge_class}">{cell_value}</span></td>'
            elif col == 'ageInDays':
                # Highlight stale findings: >90 days = warning, >365 = critical.
                # NOTE(review): str.isdigit() rejects negative/float strings, so
                # such values display as 0 — confirm that is intended.
                age = int(row[col]) if pd.notna(row[col]) and str(row[col]).isdigit() else 0
                if age > 365:
                    age_class = 'age-critical'
                elif age > 90:
                    age_class = 'age-warning'
                else:
                    age_class = ''
                html_rows += f'<td class="text-center {age_class}">{age}</td>'
            elif col == 'cveId':
                # Make CVE IDs clickable links to the NIST NVD entry.
                if cell_value.startswith('CVE-'):
                    cve_url = f"https://nvd.nist.gov/vuln/detail/{cell_value}"
                    html_rows += f'<td><a href="{cve_url}" target="_blank" style="color:#007bff; text-decoration:none;">{cell_value}</a></td>'
                else:
                    html_rows += f'<td>{cell_value}</td>'
            else:
                # Standard cell
                html_rows += f'<td>{cell_value}</td>'
        html_rows += "</tr>"
    # Human-friendly header labels for each column.
    headers = ""
    for col in df_display.columns:
        display_name = col.replace('_', ' ').title()
        if col == 'cveId':
            display_name = 'CVE ID'
        elif col == 'ageInDays':
            display_name = 'Age (Days)'
        elif col == 'riskLevel':
            display_name = 'Risk Level'
        elif col == 'publishedDate':
            display_name = 'Published Date'
        elif col == 'clusterName':
            display_name = 'Cluster Name'
        elif col == 'repositoryName':
            display_name = 'Repository'
        elif col == 'imageTags':
            display_name = 'Image Tags'
        elif col == 'subAssessmentDescription':
            display_name = 'Description'
        elif col == 'environmentRisk':
            display_name = 'Environment'
        headers += f"<th>{display_name}</th>"
    return f"""
<div class="table-container">
<table>
<caption>🚨 Critical & High Severity Vulnerabilities</caption>
<tr>{headers}</tr>
{html_rows}
</table>
</div>
"""
def render_tag_list(tags: List[str], image_name: str) -> str:
    """
    Render a list of container image tags as an HTML fragment.

    Args:
        tags: Image tags, or a single-element error-message list as produced
            by get_latest_tags (e.g. "API request failed: ...").
        image_name: Name of the container image, used in the empty-list message.

    Returns:
        An <ul> of tags, or an alert <div> for empty/error input.
    """
    def _is_error_message(tag) -> bool:
        # get_latest_tags reports failures as "API request failed: ..." /
        # "Unexpected error: ...", so match case-insensitively.
        # Fixed: the old check looked only for the literal substring "Error"
        # and therefore rendered those error messages as if they were tags.
        lowered = str(tag).lower()
        return "error" in lowered or "failed" in lowered

    if not tags or any(_is_error_message(tag) for tag in tags):
        error_msg = tags[0] if tags else f"No tags found for {image_name}"
        return f'<div class="alert alert-warning">⚠️ {error_msg}</div>'

    items = "".join(f'<li><span class="tag-code">{tag}</span></li>' for tag in tags)
    return f'<ul class="tag-list">{items}</ul>'
# --- Email Composition and Sending ---
def create_and_send_email(subject: str, body_html: str, csv_df: pd.DataFrame, config: Dict, dry_run: bool = False) -> None:
    """
    Create and send the vulnerability report email with a CSV attachment.

    Args:
        subject: Email subject line.
        body_html: HTML email body content.
        csv_df: DataFrame to attach as CSV (also used for summary counts).
        config: Email configuration dict (SENDER, RECIPIENT, SMTP_SERVER,
            SMTP_PORT, env, env_display; optional CC, BCC, SMTP_USE_TLS).
        dry_run: If True, only log email details without sending.

    Raises:
        SystemExit: If email sending fails in production mode.
    """
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = config['SENDER']
    msg['To'] = config['RECIPIENT']
    # Add CC and BCC recipients if specified.
    if config.get('CC'):
        msg['Cc'] = config['CC']
    if config.get('BCC'):
        msg['Bcc'] = config['BCC']

    # Summary statistics for the plain-text fallback body.
    total_vulns = len(csv_df)
    critical_count = int((csv_df['riskLevel_clean'] == 'critical').sum()) if 'riskLevel_clean' in csv_df.columns else 0
    high_count = int((csv_df['riskLevel_clean'] == 'high').sum()) if 'riskLevel_clean' in csv_df.columns else 0
    medium_count = int((csv_df['riskLevel_clean'] == 'medium').sum()) if 'riskLevel_clean' in csv_df.columns else 0
    low_count = int((csv_df['riskLevel_clean'] == 'low').sum()) if 'riskLevel_clean' in csv_df.columns else 0

    # Fixed: the timestamp is labeled "UTC", so generate it in UTC rather than
    # the server's local timezone (was a bare datetime.now()).
    generated_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')

    # Plain-text fallback for clients that do not render HTML.
    plain_text = f"""
AKS Vulnerability Report - {config['env_display']} Environment
===========================================================
This email contains a vulnerability assessment report for AKS clusters.
Please use an HTML-compatible email client to view the formatted report.
SUMMARY:
--------
Total Vulnerabilities: {total_vulns}
- Critical: {critical_count}
- High: {high_count}
- Medium: {medium_count}
- Low: {low_count}
The complete vulnerability data is available in the attached CSV file.
Report Generated: {generated_at}
Environment: {config['env_display']}
For questions or issues, please contact: {config['SENDER']}
"""
    msg.set_content(plain_text)
    msg.add_alternative(body_html, subtype='html')

    # Attach the full report as CSV. Best-effort: a failure here is logged but
    # the email body is still sent.
    csv_filename = f"aks_vuln_results_{config['env'].lower()}_{datetime.now().strftime('%Y%m%d_%H%M')}.csv"
    try:
        csv_data = csv_df.to_csv(index=False, encoding='utf-8')
        msg.add_attachment(
            csv_data.encode('utf-8'),
            maintype='text',
            subtype='csv',
            filename=csv_filename
        )
        logging.info(f"πŸ“Ž Attached CSV report: {csv_filename} ({len(csv_df)} records)")
    except Exception as e:
        logging.error(f"❌ Failed to create CSV attachment: {e}")

    logging.info(f"πŸ“§ Preparing AKS Vulnerability Report for {config['RECIPIENT']}")
    logging.info(f" Environment: {config['env_display']}")
    logging.info(f" Vulnerabilities: {total_vulns} total ({critical_count} critical, {high_count} high)")

    # Dry-run mode: log everything that would be sent, then stop.
    if dry_run:
        logging.info("πŸ§ͺ DRY RUN MODE: Email will not be sent")
        logging.info(f" πŸ“§ Subject: {msg['Subject']}")
        logging.info(f" πŸ“¬ To: {msg['To']}")
        if config.get('CC'):
            logging.info(f" πŸ“‹ CC: {config['CC']}")
        if config.get('BCC'):
            logging.info(f" πŸ”’ BCC: {config['BCC']}")
        logging.info(f" πŸ“Ž Attachment: {csv_filename}")
        logging.info(f" 🌐 SMTP Server: {config['SMTP_SERVER']}:{config['SMTP_PORT']}")
        logging.info(f" πŸ” TLS Enabled: {config.get('SMTP_USE_TLS', False)}")
        if os.environ.get("SCRIPT_DEBUG"):
            logging.debug("πŸ“ HTML Body Preview:")
            logging.debug(body_html[:1000] + "..." if len(body_html) > 1000 else body_html)
        return

    # Production mode: deliver via SMTP, optionally upgrading to STARTTLS.
    try:
        context = ssl.create_default_context()
        with smtplib.SMTP(config['SMTP_SERVER'], config['SMTP_PORT'], timeout=30) as server:
            if os.environ.get("SCRIPT_DEBUG"):
                server.set_debuglevel(1)
            if config.get('SMTP_USE_TLS'):
                server.starttls(context=context)
                logging.debug("πŸ” STARTTLS encryption enabled")
            server.send_message(msg)
            logging.info("βœ… Email sent successfully")
    except smtplib.SMTPException as e:
        logging.error(f"❌ SMTP error occurred: {e}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"❌ Failed to send email: {e}")
        sys.exit(1)
# --- Utility Functions ---
def validate_environment(env: str) -> str:
    """
    Validates and normalizes the environment parameter.

    Args:
        env: Environment string (case-insensitive; surrounding whitespace ignored)

    Returns:
        Normalized environment string: 'QA' or 'PROD'
        ('PRODUCTION' is accepted as an alias and normalized to 'PROD')

    Raises:
        ValueError: If environment is not 'QA', 'PROD', or 'PRODUCTION'
    """
    env_upper = env.upper().strip()
    # Accept the common long form as an alias for PROD
    if env_upper == 'PRODUCTION':
        return 'PROD'
    if env_upper not in ('QA', 'PROD'):
        raise ValueError(f"Invalid environment '{env}'. Must be 'QA' or 'PROD'")
    return env_upper
def get_env_or_arg(arg_val: Optional[str], env_var: str, default: str = "") -> str:
    """
    Resolves a configuration value, preferring the CLI argument over the
    environment variable, and falling back to the supplied default.

    Args:
        arg_val: Command line argument value (wins whenever it is not None)
        env_var: Environment variable name consulted otherwise
        default: Default value if neither source provides one

    Returns:
        The chosen value with surrounding whitespace stripped
    """
    source = arg_val if arg_val is not None else os.environ.get(env_var, default)
    return source.strip()
def print_duckdb_table(db_path: str) -> None:
    """
    Prints the contents of the DuckDB vulnerability history table for debugging.

    Shows the table schema, a per-scan-date severity rollup for the 20 most
    recent scan dates, the total record count, and the covered date range.
    Degrades gracefully when DuckDB is not installed or the table is missing.

    Args:
        db_path: Path to the DuckDB database file
    """
    if not DUCKDB_AVAILABLE:
        print("❌ DuckDB is not installed. Cannot display table contents.")
        print(" Install with: pip install duckdb")
        return
    try:
        with duckdb.connect(db_path) as conn:
            # Get table schema; DESCRIBE raises if the table was never created,
            # which we treat as "nothing to show yet".
            schema_query = f"DESCRIBE {DUCKDB_TABLE}"
            try:
                schema_df = conn.execute(schema_query).df()
                print("πŸ“‹ Table Schema:")
                print(schema_df.to_string(index=False))
                print()
            except Exception:
                # Was a bare `except:`, which would also swallow
                # KeyboardInterrupt/SystemExit; Exception is wide enough here.
                print("⚠️ Table does not exist yet")
                return
            # Per-scan-date severity counts for the most recent 20 scan dates
            content_query = f"""
            SELECT scan_date, COUNT(*) as vulnerability_count,
                   COUNT(CASE WHEN riskLevel = 'Critical' THEN 1 END) as critical,
                   COUNT(CASE WHEN riskLevel = 'High' THEN 1 END) as high,
                   COUNT(CASE WHEN riskLevel = 'Medium' THEN 1 END) as medium,
                   COUNT(CASE WHEN riskLevel = 'Low' THEN 1 END) as low
            FROM {DUCKDB_TABLE}
            GROUP BY scan_date
            ORDER BY scan_date DESC
            LIMIT 20
            """
            summary_df = conn.execute(content_query).df()
            if summary_df.empty:
                print("πŸ“Š DuckDB table exists but contains no data.")
            else:
                print("πŸ“Š DuckDB Historical Summary (Last 20 scan dates):")
                print("=" * 80)
                print(summary_df.to_string(index=False))
                # Show total records
                total_records = conn.execute(f"SELECT COUNT(*) FROM {DUCKDB_TABLE}").fetchone()[0]
                print(f"\nTotal historical records: {total_records}")
                # Show date range
                date_range = conn.execute(f"SELECT MIN(scan_date), MAX(scan_date) FROM {DUCKDB_TABLE}").fetchone()
                print(f"Date range: {date_range[0]} to {date_range[1]}")
    except Exception as e:
        print(f"❌ Error reading DuckDB table: {e}")
# --- Help and Documentation ---
def print_help() -> None:
    """
    Prints comprehensive help information for the script.

    Used instead of argparse's auto-generated help (the parser is created with
    ``add_help=False``) so the output can carry richer sections: environment
    variables, worked examples, requirements, and security notes.
    """
    # NOTE: this text is user-facing output — keep it in sync with the actual
    # argparse options defined in main() when flags change.
    help_text = """
πŸ›‘οΈ AKS Vulnerability Report Generator
====================================
DESCRIPTION:
Automated tool for generating and emailing AKS vulnerability reports.
Queries Azure Resource Graph for vulnerability data, performs historical
trend analysis using DuckDB, and sends formatted HTML emails with CSV attachments.
USAGE:
python3 aks_vul_report.py --env <ENV> [options]
REQUIRED ARGUMENTS:
-e, --env Environment: QA or PROD (case-insensitive)
OPTIONAL ARGUMENTS:
--sender Email sender address (overrides AKS_VUL_REPORT_SENDER)
--recipient Main recipient email (overrides AKS_VUL_REPORT_RECIPIENT)
--cc CC recipients, comma-separated (overrides AKS_VUL_REPORT_CC)
--bcc BCC recipients, comma-separated (overrides AKS_VUL_REPORT_BCC)
--dry-run Generate report without sending email (testing mode)
--compare-period Historical comparison period:
β€’ day - Day-over-day comparison
β€’ week - Week-over-week comparison (default)
β€’ month - Month-over-month comparison
β€’ quarter - Quarter-over-quarter comparison
β€’ year - Year-over-year comparison
β€’ all - All periods in one report
--scan-date Override scan date for testing (format: YYYY-MM-DD)
--print-duckdb Display DuckDB table contents and exit
-h, --help Show this help message and exit
ENVIRONMENT VARIABLES:
AKS_VUL_REPORT_SENDER πŸ“§ Sender email address
AKS_VUL_REPORT_RECIPIENT πŸ“¬ Primary recipient email address
AKS_VUL_REPORT_CC πŸ“‹ CC recipients (comma-separated)
AKS_VUL_REPORT_BCC πŸ”’ BCC recipients (comma-separated)
AKS_VUL_REPORT_SMTP 🌐 SMTP server hostname (default: localhost)
AKS_VUL_REPORT_SMTP_PORT πŸ”Œ SMTP server port (default: 25)
AKS_VUL_REPORT_SMTP_USE_TLS πŸ” Enable STARTTLS (1/true to enable)
SCRIPT_DEBUG πŸ› Enable debug logging (any value)
EXAMPLES:
# Basic production report with weekly trends
python3 aks_vul_report.py --env PROD
# QA environment with monthly comparison, dry run mode
python3 aks_vul_report.py --env QA --compare-period month --dry-run
# Comprehensive report with all trend periods
python3 aks_vul_report.py --env PROD --compare-period all
# Test with specific historical date
python3 aks_vul_report.py --env QA --scan-date 2024-06-15 --dry-run
# Override email configuration
python3 aks_vul_report.py --env PROD \\
--sender [email protected] \\
--recipient [email protected] \\
--cc [email protected]
# Debug mode with verbose logging
SCRIPT_DEBUG=1 python3 aks_vul_report.py --env QA --dry-run
# Inspect historical vulnerability data
python3 aks_vul_report.py --env QA --print-duckdb
REQUIREMENTS:
β€’ Azure CLI (az) installed and authenticated
β€’ Python 3.8+ with required packages:
- pandas>=1.3.0
- duckdb>=0.8.0 (optional, for historical analysis)
- requests>=2.25.0
- python-dateutil>=2.8.0
FEATURES:
βœ… Azure Resource Graph integration
βœ… Historical trend analysis with DuckDB
βœ… Modern HTML email templates
βœ… CSV data exports
βœ… Upstream image reference tracking
βœ… Comprehensive error handling
βœ… Dry-run testing mode
βœ… Flexible scheduling support
NOTES:
β€’ The 'ageInDays' field represents days since CVE publication, not detection time
β€’ Critical and High severity vulnerabilities are highlighted in email reports
β€’ All vulnerabilities (including Medium/Low) are included in CSV attachments
β€’ DuckDB is optional; script runs without historical analysis if not installed
β€’ Historical data accumulates over time for better trend analysis
SECURITY:
β€’ All external API calls use proper timeout handling
β€’ Email content is properly encoded to prevent injection attacks
β€’ Sensitive configuration is read from environment variables
β€’ Debug mode excludes sensitive data from logs
For technical support or feature requests, contact the DevOps team.
"""
    print(help_text)
# --- Main Application Logic ---
def main() -> None:
    """
    Main application entry point that orchestrates the vulnerability report generation.

    This function:
    1. Configures logging and parses command line arguments
    2. Validates configuration and environment settings
    3. Executes Azure Resource Graph queries for vulnerability data
    4. Performs historical trend analysis using DuckDB
    5. Fetches upstream container image references
    6. Generates and sends HTML email reports with CSV attachments
    7. Handles cleanup and error reporting
    """
    # Configure logging based on environment (SCRIPT_DEBUG enables DEBUG level)
    log_level = logging.DEBUG if os.environ.get("SCRIPT_DEBUG") else logging.INFO
    log_format = '[%(asctime)s] [%(levelname)s] %(message)s'
    logging.basicConfig(
        level=log_level,
        format=log_format,
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    logging.info("πŸš€ Starting AKS Vulnerability Report Generator v4.0")
    # Parse command line arguments.
    # add_help=False: -h/--help is re-declared below as a plain flag so the
    # richer print_help() output can be shown instead of argparse's default.
    # NOTE(review): because --env is required=True, invoking the script with
    # only -h/--help makes parse_args() error out before the custom help
    # handler runs — confirm whether `--env` should be required here.
    parser = argparse.ArgumentParser(
        description="AKS Vulnerability Report Generator",
        add_help=False
    )
    parser.add_argument('-e', '--env', type=str, required=True,
                        help='Environment: QA or PROD')
    parser.add_argument('--sender', type=str,
                        help='Email sender address')
    parser.add_argument('--recipient', type=str,
                        help='Main recipient email address')
    parser.add_argument('--cc', type=str,
                        help='CC email addresses (comma-separated)')
    parser.add_argument('--bcc', type=str,
                        help='BCC email addresses (comma-separated)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Generate report without sending email')
    parser.add_argument('--compare-period', type=str,
                        choices=['day', 'week', 'month', 'quarter', 'year', 'all'],
                        default='week', help='Historical comparison period')
    parser.add_argument('--scan-date', type=str,
                        help='Override scan date (YYYY-MM-DD)')
    parser.add_argument('--print-duckdb', action='store_true',
                        help='Print DuckDB table contents and exit')
    parser.add_argument('-h', '--help', action='store_true',
                        help='Show detailed help message')
    args = parser.parse_args()
    # Handle help request
    if args.help:
        print_help()
        sys.exit(0)
    # Handle DuckDB table inspection (debug utility; exits immediately after)
    if args.print_duckdb:
        print_duckdb_table(DUCKDB_PATH)
        sys.exit(0)
    # Validate and normalize environment ('PRODUCTION' is accepted as 'PROD')
    try:
        env = validate_environment(args.env)
        logging.info(f"🎯 Target environment: {env}")
    except ValueError as e:
        logging.error(f"❌ {e}")
        sys.exit(1)
    # Load and validate email configuration.
    # CLI arguments take precedence over environment variables (get_env_or_arg);
    # SMTP settings come from environment variables only.
    config = {
        "env": env,
        "env_display": env,
        "SENDER": get_env_or_arg(args.sender, 'AKS_VUL_REPORT_SENDER'),
        "RECIPIENT": get_env_or_arg(args.recipient, 'AKS_VUL_REPORT_RECIPIENT'),
        "CC": get_env_or_arg(args.cc, 'AKS_VUL_REPORT_CC'),
        "BCC": get_env_or_arg(args.bcc, 'AKS_VUL_REPORT_BCC'),
        "SMTP_SERVER": os.environ.get('AKS_VUL_REPORT_SMTP', 'localhost'),
        "SMTP_PORT": int(os.environ.get('AKS_VUL_REPORT_SMTP_PORT', 25)),
        "SMTP_USE_TLS": os.environ.get('AKS_VUL_REPORT_SMTP_USE_TLS', '0').lower() in ['1', 'true', 'yes']
    }
    # Validate required email configuration (sender + primary recipient)
    if not config['SENDER'] or not config['RECIPIENT']:
        logging.error("❌ Missing required email configuration")
        logging.error(" Set AKS_VUL_REPORT_SENDER and AKS_VUL_REPORT_RECIPIENT environment variables")
        logging.error(" Or use --sender and --recipient command line arguments")
        sys.exit(1)
    # Parse and validate scan date (--scan-date allows replaying past dates)
    try:
        if args.scan_date:
            scan_date = datetime.strptime(args.scan_date, '%Y-%m-%d').date()
            logging.info(f"πŸ“… Using custom scan date: {scan_date}")
        else:
            scan_date = datetime.now().date()
            logging.info(f"πŸ“… Using current date: {scan_date}")
    except ValueError:
        logging.error(f"❌ Invalid scan date format: {args.scan_date}")
        logging.error(" Use YYYY-MM-DD format (e.g., 2024-06-15)")
        sys.exit(1)
    # Display configuration summary
    logging.info(f"πŸ“Š Configuration summary:")
    logging.info(f" Environment: {env}")
    logging.info(f" Comparison period: {args.compare_period}")
    logging.info(f" Sender: {config['SENDER']}")
    logging.info(f" Recipient: {config['RECIPIENT']}")
    logging.info(f" SMTP: {config['SMTP_SERVER']}:{config['SMTP_PORT']} (TLS: {config['SMTP_USE_TLS']})")
    logging.info(f" Dry run: {args.dry_run}")
    # Define KQL query for Azure Resource Graph.
    # Note: This is a comprehensive query for AKS vulnerability assessment.
    # NOTE(review) — several fields look suspect, confirm against the KQL docs
    # and the assessment schema before relying on them:
    #   β€’ datetime_diff('day', publishedDate, now()) computes
    #     publishedDate - now(), which is negative for past dates; the
    #     intended "age in days" likely needs the arguments swapped.
    #   β€’ imageTags is populated from additionalData.imageDigest (a digest,
    #     not tags) — field name and content appear mismatched.
    #   β€’ publishedDate is taken from properties.timeGenerated, which may be
    #     the assessment generation time rather than the CVE publication date.
    kql_query = f"""
    // AKS Container Vulnerability Assessment Query
    // This query retrieves vulnerability data from Azure Security Center assessments
    SecurityResources
    | where type == "microsoft.security/assessments/subassessments"
    | extend assessmentKey = extract(".*assessments/(.+?)/.*", 1, id)
    | extend resourceId = tolower(extract(".*assessments/.+?/(.+)", 1, id))
    | where assessmentKey == "dbd0cb49-b563-45e7-9724-889e799fa648"
    // Container registry images should be scanned for vulnerabilities
    | extend additionalData = parse_json(tostring(properties.additionalData))
    | extend repositoryName = tostring(additionalData.repositoryName)
    | extend imageTags = tostring(additionalData.imageDigest)
    | extend cveId = tostring(properties.id)
    | extend riskLevel = tostring(properties.status.severity)
    | extend publishedDate = todatetime(properties.timeGenerated)
    | extend ageInDays = datetime_diff('day', publishedDate, now())
    | extend subAssessmentDescription = tostring(properties.description)
    | extend clusterName = extract("resourceGroups/(.+?)/providers", 1, resourceId)
    | extend environmentRisk = "{env}"
    | project
    clusterName,
    environmentRisk,
    cveId,
    riskLevel,
    repositoryName,
    imageTags,
    publishedDate,
    ageInDays,
    subAssessmentDescription
    | where isnotempty(cveId)
    | order by riskLevel desc, ageInDays desc
    """
    logging.info("πŸ” Executing Azure Resource Graph query...")
    # Execute Azure Resource Graph query; results land in a dated JSON file
    # that is removed in the cleanup step at the end of this function.
    json_filename = f"aks_vuln_results_{env.lower()}_{scan_date.strftime('%Y%m%d')}.json"
    if not run_az_graph_query(kql_query, json_filename):
        logging.error("❌ Failed to execute Azure Resource Graph query")
        logging.error(" Please ensure Azure CLI is installed, authenticated, and has proper permissions")
        sys.exit(1)
    # Process vulnerability data
    df = process_vulnerability_data(json_filename)
    if df is None or df.empty:
        logging.warning("⚠️ No vulnerability data found")
        # Send a "no vulnerabilities" notification if not in dry-run mode
        if not args.dry_run:
            no_vuln_subject = f"βœ… AKS Vulnerability Report ({env}) - No Issues Found - {scan_date.strftime('%Y-%m-%d')}"
            # NOTE(review): datetime.now() below is host-local time but the
            # rendered label says "UTC" — confirm the host runs in UTC or
            # switch to an explicitly UTC-aware timestamp.
            no_vuln_html = f"""
            <!DOCTYPE html>
            <html lang="en">
            <head>
            <meta charset="UTF-8">
            <style>{get_html_styles()}</style>
            </head>
            <body>
            <div class="container">
            <h2>βœ… AKS Vulnerability Report ({env})</h2>
            <div class="alert alert-info">
            <h3>πŸŽ‰ Good News!</h3>
            <p>No vulnerabilities were found in the {env} environment during the scan on {scan_date.strftime('%Y-%m-%d')}.</p>
            <p>This could mean:</p>
            <ul>
            <li>All container images are up to date with security patches</li>
            <li>No new vulnerabilities have been published</li>
            <li>The scanning system may be experiencing issues (please verify)</li>
            </ul>
            </div>
            <div class="footer">
            <p>Report generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}</p>
            <p>Contact: <a href="mailto:{config['SENDER']}">{config['SENDER']}</a></p>
            </div>
            </div>
            </body>
            </html>
            """
            # Create minimal DataFrame for the no-vulnerabilities case;
            # riskLevel_clean is added because downstream email code reads it.
            empty_df = pd.DataFrame(columns=['clusterName', 'environmentRisk', 'cveId', 'riskLevel'])
            empty_df['riskLevel_clean'] = []
            try:
                create_and_send_email(no_vuln_subject, no_vuln_html, empty_df, config, False)
                logging.info("βœ… Sent 'no vulnerabilities found' notification")
            except Exception as e:
                logging.error(f"❌ Failed to send no-vulnerabilities notification: {e}")
        logging.info("βœ… Process completed - no vulnerabilities to report")
        return
    logging.info(f"πŸ“‹ Processing {len(df)} vulnerability records")
    # Display vulnerability summary (counts per normalized severity level)
    risk_summary = df['riskLevel_clean'].value_counts().to_dict()
    for risk, count in sorted(risk_summary.items()):
        logging.info(f" {risk.title()}: {count}")
    # Perform DuckDB historical analysis (optional — skipped when DuckDB is
    # not installed; failures degrade to a warning banner in the email).
    duckdb_summary_html = ""
    if DUCKDB_AVAILABLE:
        logging.info("πŸ“ˆ Performing historical trend analysis...")
        try:
            with duckdb.connect(DUCKDB_PATH) as conn:
                # Ensure table exists and store current scan data
                ensure_duckdb_table_exists(conn)
                append_scan_to_duckdb(conn, df, scan_date.strftime('%Y-%m-%d'))
                # Define comparison periods (period key -> display label)
                periods_config = {
                    "day": "Day-over-Day",
                    "week": "Week-over-Week",
                    "month": "Month-over-Month",
                    "quarter": "Quarter-over-Quarter",
                    "year": "Year-over-Year"
                }
                # Generate period comparison data
                period_diffs_data = []
                if args.compare_period == 'all':
                    # Include all comparison periods
                    for period, label in periods_config.items():
                        period_diffs_data.append(compute_periodic_diffs(conn, label, period, scan_date))
                else:
                    # Single period comparison
                    label = periods_config.get(args.compare_period, "Week-over-Week")
                    period_diffs_data.append(compute_periodic_diffs(conn, label, args.compare_period, scan_date))
                # Generate HTML summary
                duckdb_summary_html = render_multi_period_summary(period_diffs_data)
                logging.info("βœ… Historical trend analysis completed")
        except Exception as e:
            logging.warning(f"⚠️ DuckDB historical analysis failed: {e}")
            # Error message is truncated to 100 chars to keep the email tidy
            duckdb_summary_html = f"""
            <div class="alert alert-warning">
            ⚠️ Historical trend analysis is temporarily unavailable due to a technical issue: {str(e)[:100]}
            </div>
            """
    else:
        logging.info("ℹ️ DuckDB not available - skipping historical analysis")
        duckdb_summary_html = """
        <div class="alert alert-info">
        ℹ️ Historical trend analysis is disabled. Install DuckDB with: <code>pip install duckdb</code>
        </div>
        """
    # Fetch upstream container image references (best-effort: on failure the
    # email shows a placeholder error string instead of tags).
    logging.info("πŸ”— Fetching upstream container image references...")
    upstream_images: Dict[str, List[str]] = {}
    try:
        # Fetch nginx-ingress tags (UBI builds)
        upstream_images['nginx_ingress'] = get_latest_tags(
            "https://hub.docker.com/v2/repositories/nginx/nginx-ingress/tags?page_size=50&ordering=-last_updated",
            r'.*-ubi\d*$',  # Match UBI-based tags
            {
                'Accept': 'application/json',
                'User-Agent': 'AKS-VulnerabilityReport/4.0'
            }
        )
        # Fetch Azure CLI tags
        upstream_images['azure_cli'] = get_latest_tags(
            "https://mcr.microsoft.com/v2/azure-cli/tags/list",
            r'^\d+\.\d+\.\d+$',  # Match semantic version tags
            {
                'Accept': 'application/json',
                'User-Agent': 'AKS-VulnerabilityReport/4.0'
            }
        )
        logging.info("βœ… Successfully fetched upstream image references")
    except Exception as e:
        logging.warning(f"⚠️ Failed to fetch some upstream image references: {e}")
        # NOTE: if the first fetch succeeded, its result is overwritten by the
        # error placeholder here — both entries are reset on any failure.
        upstream_images['nginx_ingress'] = ["Error fetching nginx-ingress tags"]
        upstream_images['azure_cli'] = ["Error fetching azure-cli tags"]
    # Generate HTML email content
    logging.info("🎨 Generating HTML email content...")
    # Calculate statistics for email content
    total_vulns = len(df)
    critical_count = int((df['riskLevel_clean'] == 'critical').sum())
    high_count = int((df['riskLevel_clean'] == 'high').sum())
    medium_count = int((df['riskLevel_clean'] == 'medium').sum())
    low_count = int((df['riskLevel_clean'] == 'low').sum())
    critical_high_count = critical_count + high_count
    # Create informational notes section
    notes_section = f"""
    <div class="summary-note">
    <h3 style="margin-top:0; color:#003366;">πŸ“‹ Report Information</h3>
    <ul style="margin: 8px 0; padding-left: 20px; font-size:14px;">
    <li><strong>Environment:</strong> {config['env_display']} AKS clusters</li>
    <li><strong>Scan Date:</strong> {scan_date.strftime('%B %d, %Y')}</li>
    <li><strong>Scope:</strong> Container image vulnerabilities from Azure Security Center</li>
    <li><strong>Detailed Table:</strong> Shows <strong>Critical and High</strong> severity vulnerabilities only</li>
    <li><strong>Vulnerability Age:</strong> Days since CVE publication date (not detection time)</li>
    <li><strong>Complete Data:</strong> See attached CSV file for all {total_vulns} vulnerabilities</li>
    <li class="legend"><strong>Severity Legend:</strong>
    <span style="background-color:#dc3545; color:white;">Critical</span>
    <span style="background-color:#fd7e14; color:white;">High</span>
    <span style="background-color:#ffc107; color:#000;">Medium</span>
    <span style="background-color:#198754; color:white;">Low</span>
    </li>
    </ul>
    </div>
    """
    # Create upstream image references section
    upstream_section = f"""
    <h3>πŸ”— Upstream Container Image References</h3>
    <p style="color:#6c757d; font-size:14px; margin-bottom:20px;">
    Latest available versions for commonly used base images (reference for remediation planning):
    </p>
    <div class="flex-container">
    <div class="flex-item">
    <h4>πŸ“¦ nginx/nginx-ingress (UBI)</h4>
    <p style="margin:0; font-size:12px; color:#6c757d;">
    Source: <a href="https://hub.docker.com/r/nginx/nginx-ingress/tags" target="_blank">Docker Hub</a>
    </p>
    {render_tag_list(upstream_images['nginx_ingress'], "nginx-ingress")}
    </div>
    <div class="flex-item">
    <h4>⚑ microsoft/azure-cli</h4>
    <p style="margin:0; font-size:12px; color:#6c757d;">
    Source: <a href="https://mcr.microsoft.com/en-us/product/azure-cli/tags" target="_blank">Microsoft Container Registry</a>
    </p>
    {render_tag_list(upstream_images['azure_cli'], "azure-cli")}
    </div>
    </div>
    """
    # Create footer section.
    # NOTE(review): datetime.now() is host-local time but the label says
    # "UTC" — confirm the host TZ, or use a timezone-aware UTC timestamp.
    footer = f"""
    <div class="footer">
    <p><strong>πŸ›‘οΈ AKS Vulnerability Monitoring System</strong></p>
    <p>Generated on {datetime.now().strftime('%Y-%m-%d at %H:%M:%S UTC')} | Environment: {config['env_display']}</p>
    <p>
    πŸ“§ Questions: <a href="mailto:{config['SENDER']}">{config['SENDER']}</a> |
    πŸ”§ DevOps Team | πŸ›‘οΈ Security Operations
    </p>
    <p style="font-size:11px; color:#868e96; margin-top:15px;">
    This automated report helps maintain security compliance across AKS environments.<br>
    For urgent security issues, please follow your organization's incident response procedures.<br>
    Report generated by AKS Vulnerability Monitor v4.0
    </p>
    </div>
    """
    # Compose complete HTML email body
    body_html = f"""
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>AKS Vulnerability Report - {config['env_display']} - {scan_date.strftime('%Y-%m-%d')}</title>
    <style>{get_html_styles()}</style>
    </head>
    <body>
    <div class="container">
    <h2>πŸ›‘οΈ AKS Vulnerability Report - {config['env_display']}</h2>
    {notes_section}
    {render_summary_box(config['env_display'], df)}
    {duckdb_summary_html}
    {render_main_table(df)}
    <div class="alert alert-info text-center" style="margin:25px 0;">
    πŸ“Ž <strong>Complete Vulnerability Data:</strong>
    See attached CSV file for all {total_vulns} vulnerabilities<br>
    <small>Critical: {critical_count} | High: {high_count} | Medium: {medium_count} | Low: {low_count}</small>
    </div>
    {upstream_section}
    {footer}
    </div>
    </body>
    </html>
    """
    # Create email subject with priority indicator (🚨 critical > ⚠️ high > ℹ️)
    priority_indicator = "🚨" if critical_count > 0 else "⚠️" if high_count > 0 else "ℹ️"
    email_subject = f"{priority_indicator} AKS Vulnerability Report ({config['env_display']}) - {scan_date.strftime('%Y-%m-%d')}"
    # Send email report (or print a preview when --dry-run is set)
    logging.info("πŸ“€ Sending vulnerability report...")
    try:
        create_and_send_email(email_subject, body_html, df, config, args.dry_run)
        if args.dry_run:
            logging.info("πŸ§ͺ Dry run completed successfully")
        else:
            logging.info("βœ… Email report sent successfully")
    except Exception as e:
        logging.error(f"❌ Failed to send email report: {e}")
        sys.exit(1)
    # Cleanup temporary files (the raw query-result JSON)
    try:
        if os.path.exists(json_filename):
            os.remove(json_filename)
            logging.debug(f"🧹 Cleaned up temporary file: {json_filename}")
    except OSError as e:
        logging.warning(f"⚠️ Could not remove temporary file {json_filename}: {e}")
    # Final summary
    logging.info("πŸŽ‰ AKS Vulnerability Report generation completed successfully!")
    logging.info(f" πŸ“Š Total vulnerabilities processed: {total_vulns}")
    logging.info(f" 🚨 Critical/High severity issues: {critical_high_count}")
    logging.info(f" πŸ“… Scan date: {scan_date}")
    logging.info(f" 🌐 Environment: {config['env_display']}")
    if critical_high_count > 0:
        logging.warning(f"⚠️ Action required: {critical_high_count} critical/high severity vulnerabilities found")
    else:
        logging.info("βœ… No critical or high severity vulnerabilities found")
if __name__ == "__main__":
    # Script entry point with comprehensive error handling:
    #   - Ctrl+C (KeyboardInterrupt) exits with the conventional SIGINT code 130
    #   - any other exception is logged, a user hint is printed, and we exit 1
    #   - SCRIPT_DEBUG=1 additionally logs the full traceback
    try:
        main()
    except KeyboardInterrupt:
        logging.info("⚠️ Script interrupted by user (Ctrl+C)")
        print("\nπŸ›‘ Operation cancelled by user")
        sys.exit(130)  # Standard exit code for SIGINT
    except Exception as err:
        logging.error(f"πŸ’₯ Unexpected error occurred: {err}")
        if os.environ.get("SCRIPT_DEBUG"):
            # Full traceback only in debug mode, to keep normal logs clean
            import traceback
            logging.debug("πŸ› Full traceback:")
            logging.debug(traceback.format_exc())
        print(f"\n❌ Error: {err}")
        print("πŸ’‘ Try running with --help for usage information")
        print("πŸ› Set SCRIPT_DEBUG=1 for detailed error information")
        sys.exit(1)
# (Removed trailing GitHub page-footer text — scrape artifact, not part of the script.)