aks_vul_report_duckdb.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AKS Vulnerability Reporting Tool

Description:
    This script queries Azure Resource Graph (KQL) for AKS cluster vulnerabilities,
    performs historical trend analysis using a local DuckDB database, and sends a
    formatted HTML email summary with a full CSV report as an attachment.

Setup:
    1. Install required packages:
       pip install -r requirements.txt
    2. Set the following environment variables for email configuration:
       - AKS_VUL_REPORT_SENDER: Sender's email address
       - AKS_VUL_REPORT_RECIPIENT: Primary recipient's email address
       - AKS_VUL_REPORT_CC: CC recipients (comma-separated)
       - AKS_VUL_REPORT_BCC: BCC recipients (comma-separated)
       - AKS_VUL_REPORT_SMTP: SMTP server hostname
       - AKS_VUL_REPORT_SMTP_PORT: SMTP server port (default: 25)
       - AKS_VUL_REPORT_SMTP_USE_TLS: Set to '1' to use STARTTLS

Usage Examples:
    # Run for PROD environment and email the report
    python3 aks_vul_report_duckdb.py --env PROD

    # Perform a dry run for the QA environment (prints email to console)
    python3 aks_vul_report_duckdb.py --env QA --dry-run

    # Generate report with all historical periods
    python3 aks_vul_report_duckdb.py --env PROD --compare-period all

    # See all options
    python3 aks_vul_report_duckdb.py --help
"""
import argparse
import json
import logging
import os
import re
import smtplib
import ssl
import subprocess
import sys
import tempfile
from datetime import datetime, date, timezone
from email.message import EmailMessage
from typing import Dict, List, Optional, Tuple

import pandas as pd
import requests
from dateutil.relativedelta import relativedelta
# --- DuckDB Setup (Conditional Import) ---
try:
    import duckdb
    DUCKDB_AVAILABLE = True
except ImportError:
    duckdb = None
    DUCKDB_AVAILABLE = False

# --- Constants and Configuration ---

# DuckDB database configuration
DUCKDB_PATH = os.path.join(os.path.dirname(__file__), "aks_vuln_scans.duckdb")
DUCKDB_TABLE = "aks_vulnerability_scans"

# Risk level color mapping for consistent styling
RISK_COLORS = {
    'critical': '#dc3545',  # Red
    'high': '#fd7e14',      # Orange
    'medium': '#ffc107',    # Yellow
    'low': '#198754'        # Green
}

# Request timeout settings
REQUEST_TIMEOUT = 10      # seconds
AZURE_CLI_TIMEOUT = 300   # 5 minutes
# --- Azure Resource Graph Query Functions ---
def run_az_graph_query(kql_query: str, output_json_path: str) -> bool:
    """
    Executes an Azure Resource Graph query using the 'az' CLI tool.

    Args:
        kql_query: The KQL (Kusto Query Language) query string to execute
        output_json_path: The file path to save the JSON query results

    Returns:
        bool: True if the query executed successfully, False otherwise

    Raises:
        No exceptions are raised; errors are logged and False is returned
    """
    logging.info("🔍 Executing Azure Resource Graph query...")
    kql_file_path = None
    try:
        # Create temporary file for KQL query
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.kql', encoding='utf-8') as kql_file:
            kql_file.write(kql_query)
            kql_file_path = kql_file.name
        logging.debug(f"Created temporary KQL file: {kql_file_path}")

        # Build Azure CLI command
        az_cmd = [
            "az", "graph", "query",
            "-q", f"@{kql_file_path}",
            "--output", "json"
        ]
        logging.debug(f"Executing command: {' '.join(az_cmd)}")

        # Execute Azure CLI command with timeout
        result = subprocess.run(
            az_cmd,
            check=True,
            capture_output=True,
            text=True,
            timeout=AZURE_CLI_TIMEOUT,
            encoding='utf-8'
        )

        # Save query results to JSON file
        with open(output_json_path, 'w', encoding='utf-8') as f:
            f.write(result.stdout)
        logging.info(f"✅ Successfully saved query results to {output_json_path}")
        return True
    except subprocess.CalledProcessError as e:
        logging.error(f"❌ Azure CLI command failed with exit code {e.returncode}")
        logging.error(f"   Error output: {e.stderr}")
        return False
    except subprocess.TimeoutExpired:
        logging.error(f"❌ Azure CLI command timed out after {AZURE_CLI_TIMEOUT} seconds")
        return False
    except FileNotFoundError:
        logging.error("❌ Azure CLI (az) command not found. Please ensure Azure CLI is installed and in PATH")
        return False
    except Exception as e:
        logging.error(f"❌ Unexpected error executing Azure query: {e}")
        return False
    finally:
        # Clean up temporary KQL file
        if kql_file_path and os.path.exists(kql_file_path):
            try:
                os.remove(kql_file_path)
                logging.debug(f"Cleaned up temporary KQL file: {kql_file_path}")
            except OSError as e:
                logging.warning(f"⚠️ Failed to cleanup temporary file {kql_file_path}: {e}")
def process_vulnerability_data(json_path: str) -> Optional[pd.DataFrame]:
    """
    Loads and processes vulnerability data from Azure Resource Graph JSON output.

    Args:
        json_path: Path to the JSON file containing query results

    Returns:
        pd.DataFrame: Processed DataFrame with vulnerability data, or None if no data

    Note:
        The function normalizes column names and ensures all required columns exist
        with appropriate default values.
    """
    logging.info("📊 Processing vulnerability data...")
    try:
        # Load JSON data
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Handle different JSON response structures from Azure Resource Graph
        if isinstance(data, dict):
            # Standard Azure Resource Graph response format
            records = data.get('data', [])
            if 'data' not in data:
                logging.warning("JSON response missing 'data' field")
        elif isinstance(data, list):
            # Direct list format
            records = data
        else:
            logging.warning(f"Unexpected JSON structure: {type(data)}")
            return None

        if not records:
            logging.warning("⚠️ No vulnerability records found in query output")
            return None

        # Convert to DataFrame using pandas json_normalize for nested objects
        df = pd.json_normalize(records)
        logging.debug(f"Raw DataFrame shape: {df.shape}")
        logging.debug(f"Raw columns: {df.columns.tolist()}")

        # Define required columns with their default values
        required_columns = {
            "clusterName": "Unknown-Cluster",
            "environmentRisk": "Unknown-Environment",
            "cveId": "Unknown-CVE",
            "riskLevel": "Unknown",
            "repositoryName": "Unknown-Repository",
            "imageTags": "Unknown-Tag",
            "publishedDate": None,
            "ageInDays": 0,
            "subAssessmentDescription": "No description available"
        }

        # Ensure all required columns exist, add missing ones with defaults
        for col, default_val in required_columns.items():
            if col not in df.columns:
                df[col] = default_val
                logging.debug(f"Added missing column '{col}' with default value")

        # Select only required columns in correct order
        df = df[list(required_columns.keys())].copy()

        # Data cleaning and standardization
        # Clean and standardize risk levels for consistent processing
        df["riskLevel_clean"] = df["riskLevel"].astype(str).str.strip().str.lower()

        # Handle date conversion with error handling
        if 'publishedDate' in df.columns:
            df['publishedDate'] = pd.to_datetime(df['publishedDate'], errors='coerce')

        # Ensure ageInDays is numeric, replace invalid values with 0
        df['ageInDays'] = pd.to_numeric(df['ageInDays'], errors='coerce').fillna(0).astype(int)

        # Clean text fields
        for col in ['clusterName', 'repositoryName', 'imageTags', 'subAssessmentDescription']:
            if col in df.columns:
                df[col] = df[col].astype(str).str.strip()

        logging.info(f"✅ Processed {len(df)} vulnerability records")
        logging.info(f"   Risk level distribution: {df['riskLevel_clean'].value_counts().to_dict()}")
        return df
    except (FileNotFoundError, json.JSONDecodeError) as e:
        logging.error(f"❌ Failed to load/parse JSON file {json_path}: {e}")
        return None
    except KeyError as e:
        logging.error(f"❌ Missing expected field in JSON data: {e}")
        return None
    except Exception as e:
        logging.error(f"❌ Unexpected error processing vulnerability data: {e}")
        return None
# --- DuckDB Historical Analysis Functions ---
def ensure_duckdb_table_exists(conn) -> None:
    """
    Creates the DuckDB table for storing vulnerability scan history if it doesn't exist.

    Args:
        conn: DuckDB connection object

    Raises:
        Exception: If table creation fails
    """
    try:
        create_table_sql = f"""
            CREATE TABLE IF NOT EXISTS {DUCKDB_TABLE} (
                scan_date DATE NOT NULL,
                clusterName VARCHAR,
                environmentRisk VARCHAR,
                cveId VARCHAR,
                riskLevel VARCHAR,
                repositoryName VARCHAR,
                imageTags VARCHAR,
                publishedDate TIMESTAMP,
                ageInDays INTEGER,
                subAssessmentDescription VARCHAR,
                -- Composite primary key prevents duplicate rows per scan date
                PRIMARY KEY (scan_date, clusterName, cveId)
            );
        """
        conn.execute(create_table_sql)
        logging.debug("✅ DuckDB table structure verified/created")

        # Verify table exists and get row count
        row_count = conn.execute(f"SELECT COUNT(*) FROM {DUCKDB_TABLE}").fetchone()[0]
        logging.debug(f"   Current table contains {row_count} historical records")
    except Exception as e:
        logging.error(f"❌ Failed to create/verify DuckDB table: {e}")
        raise
def append_scan_to_duckdb(conn, df: pd.DataFrame, scan_date: str) -> None:
    """
    Appends the latest vulnerability scan data to DuckDB, preventing duplicates.

    Args:
        conn: DuckDB connection object
        df: DataFrame containing vulnerability data
        scan_date: Date string in YYYY-MM-DD format

    Raises:
        Exception: If data insertion fails
    """
    logging.info(f"💾 Storing scan data for {scan_date} in DuckDB...")
    try:
        # Prepare data for storage
        df_to_store = df.copy()
        df_to_store['scan_date'] = scan_date

        # Ensure correct column order matching table schema
        cols_order = [
            'scan_date', 'clusterName', 'environmentRisk', 'cveId', 'riskLevel',
            'repositoryName', 'imageTags', 'publishedDate', 'ageInDays', 'subAssessmentDescription'
        ]
        df_to_store = df_to_store[cols_order]

        # Remove any existing data for this scan date to prevent duplicates
        delete_count = conn.execute(
            f"DELETE FROM {DUCKDB_TABLE} WHERE scan_date = ?",
            [scan_date]
        ).fetchone()
        if delete_count and delete_count[0] > 0:
            logging.info(f"   Removed {delete_count[0]} existing records for {scan_date}")

        # Insert new data (DuckDB resolves 'df_to_store' to the local pandas
        # DataFrame via its replacement-scan mechanism)
        conn.execute(f"INSERT INTO {DUCKDB_TABLE} SELECT * FROM df_to_store")

        # Verify insertion
        row_count = conn.execute(
            f"SELECT COUNT(*) FROM {DUCKDB_TABLE} WHERE scan_date = ?",
            [scan_date]
        ).fetchone()[0]
        logging.info(f"✅ Successfully stored {row_count} records for {scan_date}")
    except Exception as e:
        logging.error(f"❌ Failed to append scan data to DuckDB: {e}")
        raise
def get_scan_df(conn, scan_date: str) -> pd.DataFrame:
    """
    Retrieves vulnerability scan data from DuckDB for a specific date.

    Args:
        conn: DuckDB connection object
        scan_date: Date string in YYYY-MM-DD format

    Returns:
        pd.DataFrame: Vulnerability data for the specified date, empty if not found
    """
    try:
        query = f"""
            SELECT * FROM {DUCKDB_TABLE}
            WHERE scan_date = ?
            ORDER BY clusterName, riskLevel, cveId
        """
        result_df = conn.execute(query, [scan_date]).df()
        logging.debug(f"Retrieved {len(result_df)} records for {scan_date}")
        return result_df
    except Exception as e:
        logging.error(f"❌ Failed to retrieve scan data for {scan_date}: {e}")
        return pd.DataFrame()
def compute_vuln_diffs(df_this: pd.DataFrame, df_last: pd.DataFrame) -> Dict[str, int]:
    """
    Computes differences between two vulnerability scans.

    Args:
        df_this: Current vulnerability scan DataFrame
        df_last: Previous vulnerability scan DataFrame

    Returns:
        Dict containing counts of new, fixed, persistent, and total vulnerabilities

    Note:
        Vulnerabilities are compared using a composite key of:
        [cveId, repositoryName, clusterName, environmentRisk]
    """
    # Handle empty DataFrames
    if df_this.empty and df_last.empty:
        return {"new": 0, "fixed": 0, "persistent": 0, "total_this": 0, "total_last": 0}

    # Define key columns for vulnerability comparison
    key_cols = ['cveId', 'repositoryName', 'clusterName', 'environmentRisk']

    # Ensure all key columns exist in both DataFrames
    for col in key_cols:
        if col not in df_this.columns:
            df_this[col] = "Unknown"
        if col not in df_last.columns:
            df_last[col] = "Unknown"

    # Create sets of vulnerability signatures for comparison
    set_this = set(tuple(row) for row in df_this[key_cols].to_numpy())
    set_last = set(tuple(row) for row in df_last[key_cols].to_numpy())

    # Calculate differences
    new_vulns = set_this - set_last
    fixed_vulns = set_last - set_this
    persistent_vulns = set_this & set_last

    return {
        "new": len(new_vulns),
        "fixed": len(fixed_vulns),
        "persistent": len(persistent_vulns),
        "total_this": len(set_this),
        "total_last": len(set_last)
    }
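# Illustrative sketch (hypothetical data): with one shared finding and one unique
# finding per scan, the set-difference logic above reports 1 new, 1 fixed,
# 1 persistent:
#
#     prev = pd.DataFrame([
#         {"cveId": "CVE-2024-0001", "repositoryName": "app", "clusterName": "aks1", "environmentRisk": "PROD"},
#         {"cveId": "CVE-2024-0002", "repositoryName": "app", "clusterName": "aks1", "environmentRisk": "PROD"},
#     ])
#     curr = pd.DataFrame([
#         {"cveId": "CVE-2024-0002", "repositoryName": "app", "clusterName": "aks1", "environmentRisk": "PROD"},
#         {"cveId": "CVE-2024-0003", "repositoryName": "app", "clusterName": "aks1", "environmentRisk": "PROD"},
#     ])
#     compute_vuln_diffs(curr, prev)
#     # -> {'new': 1, 'fixed': 1, 'persistent': 1, 'total_this': 2, 'total_last': 2}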
def get_period_scan_dates(conn, period: str, scan_date: date) -> Tuple[Optional[str], Optional[str]]:
    """
    Determines current and previous scan dates based on the comparison period.

    Args:
        conn: DuckDB connection object
        period: Comparison period ('day', 'week', 'month', 'quarter', 'year')
        scan_date: Current scan date

    Returns:
        Tuple of (current_date_str, previous_date_str) or (None, None) if not found
    """
    # Define period mappings
    period_mapping = {
        'day': relativedelta(days=1),
        'week': relativedelta(weeks=1),
        'month': relativedelta(months=1),
        'quarter': relativedelta(months=3),
        'year': relativedelta(years=1)
    }

    # Calculate previous date
    delta = period_mapping.get(period, relativedelta(days=1))
    prev_date_obj = scan_date - delta

    # Convert to string format
    scan_date_str = scan_date.strftime('%Y-%m-%d')
    prev_date_str = prev_date_obj.strftime('%Y-%m-%d')

    try:
        # Get all available scan dates from database
        available_dates_query = f"SELECT DISTINCT scan_date FROM {DUCKDB_TABLE} ORDER BY scan_date DESC"
        all_scan_dates = {str(r[0]) for r in conn.execute(available_dates_query).fetchall()}

        # Check if requested dates exist
        curr_date = scan_date_str if scan_date_str in all_scan_dates else None
        prev_date = prev_date_str if prev_date_str in all_scan_dates else None

        logging.debug(f"Period {period}: Current={curr_date}, Previous={prev_date}")
        logging.debug(f"Available dates: {sorted(all_scan_dates, reverse=True)[:5]}...")
        return curr_date, prev_date
    except Exception as e:
        logging.error(f"❌ Failed to get period scan dates: {e}")
        return None, None
def compute_periodic_diffs(conn, period_label: str, period: str, scan_date: date) -> Dict:
    """
    Computes vulnerability differences for a specific time period.

    Args:
        conn: DuckDB connection object
        period_label: Human-readable period description
        period: Period type ('day', 'week', etc.)
        scan_date: Current scan date

    Returns:
        Dictionary containing period comparison results
    """
    curr_date, prev_date = get_period_scan_dates(conn, period, scan_date)
    result = {
        "label": period_label,
        "curr_date": curr_date,
        "prev_date": prev_date,
        "period": period,
        "diffs": None
    }
    if curr_date and prev_date:
        # Get data for both dates
        df_this = get_scan_df(conn, curr_date)
        df_last = get_scan_df(conn, prev_date)

        # Compute differences
        diffs = compute_vuln_diffs(df_this, df_last)
        result["diffs"] = diffs
        logging.debug(f"{period_label}: {diffs}")
    else:
        logging.debug(f"{period_label}: Insufficient data for comparison")
    return result
# --- External API Functions ---
def get_latest_tags(repo_url: str, version_pattern: str, headers: Dict) -> List[str]:
    """
    Fetches and filters the latest image tags from a container registry API.

    Args:
        repo_url: API endpoint URL for the container repository
        version_pattern: Regex pattern to filter tags
        headers: HTTP headers for the API request

    Returns:
        List of up to 5 latest matching tags, or error messages if failed

    Note:
        This function supports both Docker Hub and Microsoft Container Registry APIs
    """
    try:
        logging.debug(f"Fetching tags from {repo_url}")

        # Make API request with timeout
        response = requests.get(repo_url, headers=headers, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        data = response.json()

        # Extract tags based on different API response structures
        tags = []
        if 'results' in data:
            # Docker Hub format
            tags = [item.get('name', '') for item in data['results'] if isinstance(item, dict)]
        elif 'tags' in data:
            # Generic tags array format
            tags = data['tags'] if isinstance(data['tags'], list) else []
        else:
            logging.warning(f"Unknown API response structure from {repo_url}")
            return ["Error: unknown API response format"]

        # Filter tags using the supplied regex pattern
        try:
            filtered_tags = [tag for tag in tags if tag and re.match(version_pattern, tag)]
        except re.error as e:
            logging.warning(f"Invalid regex pattern '{version_pattern}': {e}")
            filtered_tags = tags[:10]  # Fallback to first 10 tags

        # Sort tags using version-aware sorting if possible
        try:
            from packaging import version
            filtered_tags.sort(key=version.parse, reverse=True)
        except Exception:
            # packaging not installed or tags not parseable: fall back to lexical sorting
            filtered_tags.sort(reverse=True)

        result = filtered_tags[:5]
        logging.debug(f"Found {len(result)} matching tags")
        return result
    except requests.RequestException as e:
        error_msg = f"Error: API request failed: {str(e)[:50]}..."
        logging.warning(f"Failed to fetch tags from {repo_url}: {e}")
        return [error_msg]
    except Exception as e:
        error_msg = f"Error: unexpected failure: {str(e)[:50]}..."
        logging.warning(f"Unexpected error fetching tags: {e}")
        return [error_msg]
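# Example call (mirrors the Docker Hub usage in main(); output depends on the
# live registry, so treat the URL, pattern, and headers as illustrative):
#
#     tags = get_latest_tags(
#         "https://hub.docker.com/v2/repositories/nginx/nginx-ingress/tags?page_size=50",
#         r'.*-ubi\d*$',
#         {"Accept": "application/json"},
#     )
#     # -> up to 5 version-sorted UBI tags, or a single ["Error: ..."] entry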
# --- HTML Email Generation Functions ---
def get_html_styles() -> str:
    """
    Returns modern CSS styles for the HTML email template.

    Returns:
        String containing CSS styles optimized for email clients

    Note:
        Styles use inline-compatible CSS for maximum email client compatibility
    """
    return """
        body {
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            margin: 0; padding: 0; background-color: #f8f9fa; line-height: 1.6;
            color: #333;
        }
        .container {
            max-width: 1200px; margin: 0 auto; padding: 20px; background-color: white;
            box-shadow: 0 0 10px rgba(0,0,0,0.1); border-radius: 8px;
        }
        h2 {
            color: #003366; border-bottom: 3px solid #003366; padding-bottom: 12px;
            margin-bottom: 20px; font-size: 24px; font-weight: 600;
        }
        h3 {
            color: #005691; margin-top: 30px; margin-bottom: 15px; font-size: 18px;
            font-weight: 600;
        }
        h4 {
            color: #005691; margin: 15px 0 8px 0; font-size: 14px; font-weight: 600;
        }
        .summary-note {
            font-size: 14px; margin-bottom: 20px; background-color: #e7f3ff;
            padding: 20px; border-radius: 8px; border-left: 4px solid #007bff;
        }
        .summary-note ul {
            margin: 10px 0; padding-left: 20px;
        }
        .summary-note li {
            margin: 8px 0;
        }
        .table-container {
            margin: 25px 0; overflow-x: auto; border-radius: 8px;
            box-shadow: 0 2px 8px rgba(0,0,0,0.1); background-color: white;
        }
        table {
            border-collapse: collapse; width: 100%; min-width: 700px;
            background-color: white; font-size: 13px;
        }
        th, td {
            border: 1px solid #dee2e6; padding: 12px 15px; text-align: left;
            vertical-align: top;
        }
        th {
            background: linear-gradient(135deg, #003366, #004488);
            color: white; font-size: 14px; font-weight: 600;
            text-transform: uppercase; letter-spacing: 0.5px;
        }
        tr:nth-child(even) {
            background-color: #f8f9fa;
        }
        tr:hover {
            background-color: #e3f2fd;
        }
        caption {
            caption-side: top; font-weight: bold; font-size: 16px;
            padding: 15px; text-align: left; color: #003366;
            background-color: #f8f9fa; border-radius: 8px 8px 0 0;
        }
        .footer {
            font-size: 12px; color: #6c757d; margin-top: 40px;
            text-align: center; padding: 25px 20px; border-top: 2px solid #dee2e6;
            background-color: #f8f9fa; border-radius: 8px;
        }
        .footer a {
            color: #007bff; text-decoration: none;
        }
        .footer a:hover {
            text-decoration: underline;
        }
        .legend {
            margin-top: 15px;
        }
        .legend span {
            padding: 6px 12px; border-radius: 20px; font-weight: bold;
            margin-right: 10px; font-size: 12px; display: inline-block;
            margin-bottom: 6px; color: white;
        }
        .status-badge {
            padding: 4px 10px; border-radius: 6px; font-weight: bold;
            font-size: 11px; text-transform: uppercase; color: white;
            display: inline-block;
        }
        .critical-bg { background-color: #dc3545; }
        .high-bg { background-color: #fd7e14; }
        .medium-bg { background-color: #ffc107; color: #000 !important; }
        .low-bg { background-color: #198754; }
        .age-warning { background-color: #fff3cd; color: #856404; font-weight: bold; }
        .age-critical { background-color: #f8d7da; color: #721c24; font-weight: bold; }
        .text-center { text-align: center; }
        .text-muted { color: #6c757d; }
        .mb-3 { margin-bottom: 1rem; }
        .alert {
            padding: 15px; border-radius: 6px; margin: 15px 0;
        }
        .alert-info {
            background-color: #d1ecf1; color: #0c5460; border-left: 4px solid #bee5eb;
        }
        .alert-warning {
            background-color: #fff3cd; color: #856404; border-left: 4px solid #ffeaa7;
        }
        .flex-container {
            display: flex; flex-wrap: wrap; gap: 20px;
        }
        .flex-item {
            flex: 1; min-width: 300px;
        }
        .tag-list {
            margin-top: 8px; padding-left: 20px; font-size: 13px;
        }
        .tag-list li {
            margin: 4px 0;
        }
        .tag-code {
            background: #f8f9fa; padding: 3px 8px; border-radius: 4px;
            font-family: 'Courier New', monospace; font-size: 12px;
            border: 1px solid #e9ecef;
        }
        .trend-indicator {
            font-size: 16px; margin-right: 5px;
        }
    """
def render_summary_box(env: str, df: pd.DataFrame) -> str:
    """
    Renders an enhanced vulnerability severity summary table.

    Args:
        env: Environment name (QA/PROD)
        df: DataFrame containing vulnerability data

    Returns:
        HTML string for the summary table
    """
    # Calculate vulnerability counts by severity
    counts = {
        'Critical': int((df['riskLevel_clean'] == 'critical').sum()),
        'High': int((df['riskLevel_clean'] == 'high').sum()),
        'Medium': int((df['riskLevel_clean'] == 'medium').sum()),
        'Low': int((df['riskLevel_clean'] == 'low').sum()),
    }
    total = sum(counts.values())

    # Generate colored cells for each severity level
    cells = ""
    for severity, color in RISK_COLORS.items():
        count = counts[severity.title()]
        cells += f'<td class="text-center" style="background-color:{color}; color:white; font-weight:bold; font-size:14px;">{count}</td>'

    return f"""
    <div class="table-container">
        <table>
            <caption>🛡️ Vulnerability Severity Summary</caption>
            <tr>
                <th>Environment</th><th>Critical</th><th>High</th><th>Medium</th><th>Low</th><th>Total</th>
            </tr>
            <tr>
                <td class="text-center" style="font-weight:bold; font-size:14px;">{env}</td>
                {cells}
                <td class="text-center" style="font-weight:bold; font-size:14px; background-color:#6c757d; color:white;">{total}</td>
            </tr>
        </table>
    </div>
    """
def render_multi_period_summary(period_diffs: List[Dict]) -> str:
    """
    Renders historical trend analysis table with visual indicators.

    Args:
        period_diffs: List of period comparison dictionaries

    Returns:
        HTML string for the trend analysis table
    """
    if not period_diffs:
        return '<div class="alert alert-info">📊 No historical trend data available.</div>'

    rows = ""
    for pdiff in period_diffs:
        label = pdiff.get("label", "Unknown")
        if pdiff.get("diffs"):
            d = pdiff["diffs"]
            new_count = d.get('new', 0)
            fixed_count = d.get('fixed', 0)
            persistent_count = d.get('persistent', 0)
            total_current = d.get('total_this', 0)
            total_previous = d.get('total_last', 0)

            # Add trend indicators and styling
            if new_count > 0:
                new_indicator = '<span class="trend-indicator">🔺</span>'
                new_style = 'color:#dc3545; font-weight:bold;'
            else:
                new_indicator = '<span class="trend-indicator">➡️</span>'
                new_style = 'color:#6c757d;'

            if fixed_count > 0:
                fixed_indicator = '<span class="trend-indicator">🔻</span>'
                fixed_style = 'color:#198754; font-weight:bold;'
            else:
                fixed_indicator = '<span class="trend-indicator">➡️</span>'
                fixed_style = 'color:#6c757d;'

            # Calculate trend percentage
            if total_previous > 0:
                trend_pct = ((total_current - total_previous) / total_previous) * 100
                if trend_pct > 0:
                    trend_text = f'(+{trend_pct:.1f}%)'
                    trend_color = '#dc3545'
                elif trend_pct < 0:
                    trend_text = f'({trend_pct:.1f}%)'
                    trend_color = '#198754'
                else:
                    trend_text = '(0%)'
                    trend_color = '#6c757d'
            else:
                trend_text = ''
                trend_color = '#6c757d'

            rows += f"""
            <tr>
                <td style="font-weight:bold;">{label}</td>
                <td class="text-center">{pdiff.get('curr_date', 'N/A')}</td>
                <td class="text-center">{pdiff.get('prev_date', 'N/A')}</td>
                <td class="text-center" style="{new_style}">{new_indicator}{new_count}</td>
                <td class="text-center" style="{fixed_style}">{fixed_indicator}{fixed_count}</td>
                <td class="text-center" style="color:#6c757d;">{persistent_count}</td>
                <td class="text-center" style="font-weight:bold;">{total_current}</td>
                <td class="text-center" style="color:#6c757d;">{total_previous}</td>
                <td class="text-center" style="color:{trend_color}; font-size:12px; font-weight:bold;">{trend_text}</td>
            </tr>
            """
        else:
            rows += f"""
            <tr>
                <td style="font-weight:bold;">{label}</td>
                <td colspan="8" class="text-center text-muted" style="font-style:italic;">
                    ℹ️ Insufficient data for comparison
                </td>
            </tr>
            """

    return f"""
    <div class="table-container">
        <table>
            <caption>📈 Vulnerability Trend Analysis</caption>
            <tr>
                <th>Period</th><th>Current Date</th><th>Previous Date</th>
                <th>New</th><th>Fixed</th><th>Persistent</th>
                <th>Total (Now)</th><th>Total (Prev)</th><th>Trend</th>
            </tr>
            {rows}
        </table>
    </div>
    """
def render_main_table(df: pd.DataFrame) -> str:
    """
    Renders the main vulnerability data table with enhanced styling and filtering.

    Args:
        df: DataFrame containing vulnerability data

    Returns:
        HTML string for the detailed vulnerability table

    Note:
        Only displays Critical and High severity vulnerabilities in the main table
    """
    if df.empty:
        return '<div class="alert alert-info text-center">✅ No vulnerability data available.</div>'

    # Filter for Critical and High severity vulnerabilities only
    df_filtered = df[df['riskLevel_clean'].isin(['critical', 'high'])].copy()
    if df_filtered.empty:
        return '<div class="alert alert-info text-center">✅ No Critical or High severity vulnerabilities found.</div>'

    logging.info(f"📋 Rendering detailed table with {len(df_filtered)} Critical/High vulnerabilities")

    # Prepare data for display
    df_display = df_filtered.drop(columns=["riskLevel_clean"], errors='ignore').copy()

    # Format published dates
    if 'publishedDate' in df_display.columns:
        df_display['publishedDate'] = pd.to_datetime(
            df_display['publishedDate'], errors='coerce'
        ).dt.strftime('%Y-%m-%d')
        df_display['publishedDate'] = df_display['publishedDate'].fillna('Unknown')

    # Truncate long descriptions for better table formatting
    if 'subAssessmentDescription' in df_display.columns:
        df_display['subAssessmentDescription'] = df_display['subAssessmentDescription'].astype(str).apply(
            lambda x: (x[:120] + "...") if len(str(x)) > 120 else x
        )

    # Generate HTML table rows with enhanced styling
    html_rows = ""
    for idx, row in df_display.iterrows():
        risk_level = str(row.get('riskLevel', '')).lower()

        # Apply risk-based styling
        if risk_level == 'critical':
            risk_badge_class = 'critical-bg'
        elif risk_level == 'high':
            risk_badge_class = 'high-bg'
        elif risk_level == 'medium':
            risk_badge_class = 'medium-bg'
        elif risk_level == 'low':
            risk_badge_class = 'low-bg'
        else:
            risk_badge_class = 'text-muted'

        html_rows += "<tr>"
        for col in df_display.columns:
            cell_value = str(row[col]) if pd.notna(row[col]) else "N/A"
            if col == 'riskLevel':
                # Render risk level as colored badge
                html_rows += f'<td class="text-center"><span class="status-badge {risk_badge_class}">{cell_value}</span></td>'
            elif col == 'ageInDays':
                # Apply age-based styling
                age = int(row[col]) if pd.notna(row[col]) and str(row[col]).isdigit() else 0
                if age > 365:
                    age_class = 'age-critical'
                elif age > 90:
                    age_class = 'age-warning'
                else:
                    age_class = ''
                html_rows += f'<td class="text-center {age_class}">{age}</td>'
            elif col == 'cveId':
                # Make CVE IDs clickable links to the NIST NVD database
                if cell_value.startswith('CVE-'):
                    cve_url = f"https://nvd.nist.gov/vuln/detail/{cell_value}"
                    html_rows += f'<td><a href="{cve_url}" target="_blank" style="color:#007bff; text-decoration:none;">{cell_value}</a></td>'
                else:
                    html_rows += f'<td>{cell_value}</td>'
            else:
                # Standard cell
                html_rows += f'<td>{cell_value}</td>'
        html_rows += "</tr>"

    # Generate table headers with proper formatting
    headers = ""
    for col in df_display.columns:
        # Format column names for display
        display_name = col.replace('_', ' ').title()
        if col == 'cveId':
            display_name = 'CVE ID'
        elif col == 'ageInDays':
            display_name = 'Age (Days)'
        elif col == 'riskLevel':
            display_name = 'Risk Level'
        elif col == 'publishedDate':
            display_name = 'Published Date'
        elif col == 'clusterName':
            display_name = 'Cluster Name'
        elif col == 'repositoryName':
            display_name = 'Repository'
        elif col == 'imageTags':
            display_name = 'Image Tags'
        elif col == 'subAssessmentDescription':
            display_name = 'Description'
        elif col == 'environmentRisk':
            display_name = 'Environment'
        headers += f"<th>{display_name}</th>"

    return f"""
    <div class="table-container">
        <table>
            <caption>🚨 Critical & High Severity Vulnerabilities</caption>
            <tr>{headers}</tr>
            {html_rows}
        </table>
    </div>
    """
def render_tag_list(tags: List[str], image_name: str) -> str:
    """
    Renders a formatted list of container image tags.

    Args:
        tags: List of image tags
        image_name: Name of the container image

    Returns:
        HTML string for the tag list
    """
    # get_latest_tags() signals failure with entries that start with "Error"
    if not tags or any("Error" in str(tag) for tag in tags):
        error_msg = tags[0] if tags else f"No tags found for {image_name}"
        return f'<div class="alert alert-warning">⚠️ {error_msg}</div>'

    # Generate list items with enhanced styling
    items = ""
    for tag in tags:
        items += f'<li><span class="tag-code">{tag}</span></li>'
    return f'<ul class="tag-list">{items}</ul>'
# --- Email Composition and Sending ---
def create_and_send_email(subject: str, body_html: str, csv_df: pd.DataFrame, config: Dict, dry_run: bool = False) -> None:
    """
    Creates and sends the vulnerability report email with attachments.

    Args:
        subject: Email subject line
        body_html: HTML email body content
        csv_df: DataFrame to attach as CSV
        config: Email configuration dictionary
        dry_run: If True, only log email details without sending

    Raises:
        SystemExit: If email sending fails in production mode
    """
    # Create email message object
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = config['SENDER']
    msg['To'] = config['RECIPIENT']

    # Add CC and BCC recipients if specified
    if config.get('CC'):
        msg['Cc'] = config['CC']
    if config.get('BCC'):
        msg['Bcc'] = config['BCC']

    # Calculate summary statistics for plain text version
    total_vulns = len(csv_df)
    critical_count = int((csv_df['riskLevel_clean'] == 'critical').sum()) if 'riskLevel_clean' in csv_df.columns else 0
    high_count = int((csv_df['riskLevel_clean'] == 'high').sum()) if 'riskLevel_clean' in csv_df.columns else 0
    medium_count = int((csv_df['riskLevel_clean'] == 'medium').sum()) if 'riskLevel_clean' in csv_df.columns else 0
    low_count = int((csv_df['riskLevel_clean'] == 'low').sum()) if 'riskLevel_clean' in csv_df.columns else 0

    # Create plain text fallback for email clients that don't support HTML
    plain_text = f"""
AKS Vulnerability Report - {config['env_display']} Environment
===========================================================

This email contains a vulnerability assessment report for AKS clusters.
Please use an HTML-compatible email client to view the formatted report.

SUMMARY:
--------
Total Vulnerabilities: {total_vulns}
- Critical: {critical_count}
- High: {high_count}
- Medium: {medium_count}
- Low: {low_count}

The complete vulnerability data is available in the attached CSV file.

Report Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}
Environment: {config['env_display']}

For questions or issues, please contact: {config['SENDER']}
"""

    # Set email content
    msg.set_content(plain_text)
    msg.add_alternative(body_html, subtype='html')

    # Create and attach CSV file
    csv_filename = f"aks_vuln_results_{config['env'].lower()}_{datetime.now().strftime('%Y%m%d_%H%M')}.csv"
    try:
        # Generate CSV data and attach it with explicit UTF-8 encoding
        csv_data = csv_df.to_csv(index=False)
        msg.add_attachment(
            csv_data.encode('utf-8'),
            maintype='text',
            subtype='csv',
            filename=csv_filename
        )
        logging.info(f"📎 Attached CSV report: {csv_filename} ({len(csv_df)} records)")
    except Exception as e:
        logging.error(f"❌ Failed to create CSV attachment: {e}")

    # Log email preparation details
    logging.info(f"📧 Preparing AKS Vulnerability Report for {config['RECIPIENT']}")
    logging.info(f"   Environment: {config['env_display']}")
    logging.info(f"   Vulnerabilities: {total_vulns} total ({critical_count} critical, {high_count} high)")

    # Handle dry run mode
    if dry_run:
        logging.info("🧪 DRY RUN MODE: Email will not be sent")
        logging.info(f"   📧 Subject: {msg['Subject']}")
        logging.info(f"   📬 To: {msg['To']}")
        if config.get('CC'):
            logging.info(f"   📋 CC: {config['CC']}")
        if config.get('BCC'):
            logging.info(f"   📋 BCC: {config['BCC']}")
        logging.info(f"   📎 Attachment: {csv_filename}")
        logging.info(f"   🌐 SMTP Server: {config['SMTP_SERVER']}:{config['SMTP_PORT']}")
        logging.info(f"   🔒 TLS Enabled: {config.get('SMTP_USE_TLS', False)}")

        # Show HTML preview in debug mode
        if os.environ.get("SCRIPT_DEBUG"):
            logging.debug("📄 HTML Body Preview:")
            logging.debug(body_html[:1000] + "..." if len(body_html) > 1000 else body_html)
        return

    # Send email in production mode
    try:
        # Create SSL context for secure connections
        context = ssl.create_default_context()

        # Connect to SMTP server and send email
        with smtplib.SMTP(config['SMTP_SERVER'], config['SMTP_PORT'], timeout=30) as server:
            # Enable debug output if in debug mode
            if os.environ.get("SCRIPT_DEBUG"):
                server.set_debuglevel(1)

            # Use STARTTLS if configured
            if config.get('SMTP_USE_TLS'):
                server.starttls(context=context)
                logging.debug("🔒 STARTTLS encryption enabled")

            # Send the message
            server.send_message(msg)
        logging.info("✅ Email sent successfully")
    except smtplib.SMTPException as e:
        logging.error(f"❌ SMTP error occurred: {e}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"❌ Failed to send email: {e}")
        sys.exit(1)
# --- Utility Functions ---
def validate_environment(env: str) -> str:
    """
    Validates and normalizes the environment parameter.

    Args:
        env: Environment string (case-insensitive)

    Returns:
        Normalized environment string (uppercase)

    Raises:
        ValueError: If environment is not valid
    """
    env_upper = env.upper().strip()

    # Handle common variations
    if env_upper == 'PRODUCTION':
        return 'PROD'
    if env_upper not in ('QA', 'PROD'):
        raise ValueError(f"Invalid environment '{env}'. Must be 'QA' or 'PROD'")
    return env_upper


def get_env_or_arg(arg_val: Optional[str], env_var: str, default: str = "") -> str:
    """
    Gets configuration value from CLI argument or environment variable.

    Args:
        arg_val: Command line argument value
        env_var: Environment variable name
        default: Default value if neither is provided

    Returns:
        The configuration value
    """
    if arg_val is not None:
        return arg_val.strip()
    return os.environ.get(env_var, default).strip()
def print_duckdb_table(db_path: str) -> None:
    """
    Prints the contents of the DuckDB vulnerability history table for debugging.

    Args:
        db_path: Path to the DuckDB database file
    """
    if not DUCKDB_AVAILABLE:
        print("❌ DuckDB is not installed. Cannot display table contents.")
        print("   Install with: pip install duckdb")
        return
    try:
        with duckdb.connect(db_path) as conn:
            # Get table schema
            schema_query = f"DESCRIBE {DUCKDB_TABLE}"
            try:
                schema_df = conn.execute(schema_query).df()
                print("📋 Table Schema:")
                print(schema_df.to_string(index=False))
                print()
            except Exception:
                print("⚠️ Table does not exist yet")
                return

            # Get table contents
            content_query = f"""
                SELECT scan_date, COUNT(*) as vulnerability_count,
                       COUNT(CASE WHEN riskLevel = 'Critical' THEN 1 END) as critical,
                       COUNT(CASE WHEN riskLevel = 'High' THEN 1 END) as high,
                       COUNT(CASE WHEN riskLevel = 'Medium' THEN 1 END) as medium,
                       COUNT(CASE WHEN riskLevel = 'Low' THEN 1 END) as low
                FROM {DUCKDB_TABLE}
                GROUP BY scan_date
                ORDER BY scan_date DESC
                LIMIT 20
            """
            summary_df = conn.execute(content_query).df()
            if summary_df.empty:
                print("📊 DuckDB table exists but contains no data.")
            else:
                print("📊 DuckDB Historical Summary (Last 20 scan dates):")
                print("=" * 80)
                print(summary_df.to_string(index=False))

                # Show total records
                total_records = conn.execute(f"SELECT COUNT(*) FROM {DUCKDB_TABLE}").fetchone()[0]
                print(f"\nTotal historical records: {total_records}")

                # Show date range
                date_range = conn.execute(f"SELECT MIN(scan_date), MAX(scan_date) FROM {DUCKDB_TABLE}").fetchone()
                print(f"Date range: {date_range[0]} to {date_range[1]}")
    except Exception as e:
        print(f"❌ Error reading DuckDB table: {e}")
# --- Help and Documentation ---
def print_help() -> None:
    """
    Prints comprehensive help information for the script.
    """
    help_text = """
🛡️ AKS Vulnerability Report Generator
====================================

DESCRIPTION:
    Automated tool for generating and emailing AKS vulnerability reports.
    Queries Azure Resource Graph for vulnerability data, performs historical
    trend analysis using DuckDB, and sends formatted HTML emails with CSV attachments.

USAGE:
    python3 aks_vul_report_duckdb.py --env <ENV> [options]

REQUIRED ARGUMENTS:
    -e, --env           Environment: QA or PROD (case-insensitive)

OPTIONAL ARGUMENTS:
    --sender            Email sender address (overrides AKS_VUL_REPORT_SENDER)
    --recipient         Main recipient email (overrides AKS_VUL_REPORT_RECIPIENT)
    --cc                CC recipients, comma-separated (overrides AKS_VUL_REPORT_CC)
    --bcc               BCC recipients, comma-separated (overrides AKS_VUL_REPORT_BCC)
    --dry-run           Generate report without sending email (testing mode)
    --compare-period    Historical comparison period:
                        • day     - Day-over-day comparison
                        • week    - Week-over-week comparison (default)
                        • month   - Month-over-month comparison
                        • quarter - Quarter-over-quarter comparison
                        • year    - Year-over-year comparison
                        • all     - All periods in one report
    --scan-date         Override scan date for testing (format: YYYY-MM-DD)
    --print-duckdb      Display DuckDB table contents and exit
    -h, --help          Show this help message and exit

ENVIRONMENT VARIABLES:
    AKS_VUL_REPORT_SENDER        📧 Sender email address
    AKS_VUL_REPORT_RECIPIENT     📬 Primary recipient email address
    AKS_VUL_REPORT_CC            📋 CC recipients (comma-separated)
    AKS_VUL_REPORT_BCC           📋 BCC recipients (comma-separated)
    AKS_VUL_REPORT_SMTP          🌐 SMTP server hostname (default: localhost)
    AKS_VUL_REPORT_SMTP_PORT     🔌 SMTP server port (default: 25)
    AKS_VUL_REPORT_SMTP_USE_TLS  🔒 Enable STARTTLS (1/true to enable)
    SCRIPT_DEBUG                 🐛 Enable debug logging (any value)

EXAMPLES:
    # Basic production report with weekly trends
    python3 aks_vul_report_duckdb.py --env PROD

    # QA environment with monthly comparison, dry run mode
    python3 aks_vul_report_duckdb.py --env QA --compare-period month --dry-run

    # Comprehensive report with all trend periods
    python3 aks_vul_report_duckdb.py --env PROD --compare-period all

    # Test with specific historical date
    python3 aks_vul_report_duckdb.py --env QA --scan-date 2024-06-15 --dry-run

    # Override email configuration
    python3 aks_vul_report_duckdb.py --env PROD \\
        --sender [email protected] \\
        --recipient [email protected] \\
        --cc [email protected]

    # Debug mode with verbose logging
    SCRIPT_DEBUG=1 python3 aks_vul_report_duckdb.py --env QA --dry-run

    # Inspect historical vulnerability data
    python3 aks_vul_report_duckdb.py --env QA --print-duckdb

REQUIREMENTS:
    • Azure CLI (az) installed and authenticated
    • Python 3.8+ with required packages:
      - pandas>=1.3.0
      - duckdb>=0.8.0 (optional, for historical analysis)
      - requests>=2.25.0
      - python-dateutil>=2.8.0

FEATURES:
    ✅ Azure Resource Graph integration
    ✅ Historical trend analysis with DuckDB
    ✅ Modern HTML email templates
    ✅ CSV data exports
    ✅ Upstream image reference tracking
    ✅ Comprehensive error handling
    ✅ Dry-run testing mode
    ✅ Flexible scheduling support

NOTES:
    • The 'ageInDays' field represents days since CVE publication, not detection time
    • Critical and High severity vulnerabilities are highlighted in email reports
    • All vulnerabilities (including Medium/Low) are included in CSV attachments
    • DuckDB is optional; the script runs without historical analysis if it is not installed
    • Historical data accumulates over time for better trend analysis

SECURITY:
    • All external API calls use proper timeout handling
    • Email content is properly encoded to prevent injection attacks
    • Sensitive configuration is read from environment variables
    • Debug mode excludes sensitive data from logs

For technical support or feature requests, contact the DevOps team.
"""
    print(help_text)
# --- Main Application Logic ---
def main() -> None:
    """
    Main application entry point that orchestrates the vulnerability report generation.

    This function:
        1. Configures logging and parses command line arguments
        2. Validates configuration and environment settings
        3. Executes Azure Resource Graph queries for vulnerability data
        4. Performs historical trend analysis using DuckDB
        5. Fetches upstream container image references
        6. Generates and sends HTML email reports with CSV attachments
        7. Handles cleanup and error reporting
    """
    # Configure logging based on environment
    log_level = logging.DEBUG if os.environ.get("SCRIPT_DEBUG") else logging.INFO
    log_format = '[%(asctime)s] [%(levelname)s] %(message)s'
    logging.basicConfig(
        level=log_level,
        format=log_format,
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    logging.info("🚀 Starting AKS Vulnerability Report Generator v4.0")

    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description="AKS Vulnerability Report Generator",
        add_help=False
    )
    parser.add_argument('-e', '--env', type=str, required=True,
                        help='Environment: QA or PROD')
    parser.add_argument('--sender', type=str,
                        help='Email sender address')
    parser.add_argument('--recipient', type=str,
                        help='Main recipient email address')
    parser.add_argument('--cc', type=str,
                        help='CC email addresses (comma-separated)')
    parser.add_argument('--bcc', type=str,
                        help='BCC email addresses (comma-separated)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Generate report without sending email')
    parser.add_argument('--compare-period', type=str,
                        choices=['day', 'week', 'month', 'quarter', 'year', 'all'],
                        default='week', help='Historical comparison period')
    parser.add_argument('--scan-date', type=str,
                        help='Override scan date (YYYY-MM-DD)')
    parser.add_argument('--print-duckdb', action='store_true',
                        help='Print DuckDB table contents and exit')
    parser.add_argument('-h', '--help', action='store_true',
                        help='Show detailed help message')
    args = parser.parse_args()

    # Handle help request
    if args.help:
        print_help()
        sys.exit(0)

    # Handle DuckDB table inspection
    if args.print_duckdb:
        print_duckdb_table(DUCKDB_PATH)
        sys.exit(0)

    # Validate and normalize environment
    try:
        env = validate_environment(args.env)
        logging.info(f"🎯 Target environment: {env}")
    except ValueError as e:
        logging.error(f"❌ {e}")
        sys.exit(1)

    # Load and validate email configuration
    config = {
        "env": env,
        "env_display": env,
        "SENDER": get_env_or_arg(args.sender, 'AKS_VUL_REPORT_SENDER'),
        "RECIPIENT": get_env_or_arg(args.recipient, 'AKS_VUL_REPORT_RECIPIENT'),
        "CC": get_env_or_arg(args.cc, 'AKS_VUL_REPORT_CC'),
        "BCC": get_env_or_arg(args.bcc, 'AKS_VUL_REPORT_BCC'),
        "SMTP_SERVER": os.environ.get('AKS_VUL_REPORT_SMTP', 'localhost'),
        "SMTP_PORT": int(os.environ.get('AKS_VUL_REPORT_SMTP_PORT', '25')),
        "SMTP_USE_TLS": os.environ.get('AKS_VUL_REPORT_SMTP_USE_TLS', '0').lower() in ['1', 'true', 'yes']
    }

    # Validate required email configuration
    if not config['SENDER'] or not config['RECIPIENT']:
        logging.error("❌ Missing required email configuration")
        logging.error("   Set AKS_VUL_REPORT_SENDER and AKS_VUL_REPORT_RECIPIENT environment variables")
        logging.error("   Or use --sender and --recipient command line arguments")
        sys.exit(1)

    # Parse and validate scan date
    try:
        if args.scan_date:
            scan_date = datetime.strptime(args.scan_date, '%Y-%m-%d').date()
            logging.info(f"📅 Using custom scan date: {scan_date}")
        else:
            scan_date = datetime.now().date()
            logging.info(f"📅 Using current date: {scan_date}")
    except ValueError:
        logging.error(f"❌ Invalid scan date format: {args.scan_date}")
        logging.error("   Use YYYY-MM-DD format (e.g., 2024-06-15)")
        sys.exit(1)

    # Display configuration summary
    logging.info("📋 Configuration summary:")
    logging.info(f"   Environment: {env}")
    logging.info(f"   Comparison period: {args.compare_period}")
    logging.info(f"   Sender: {config['SENDER']}")
    logging.info(f"   Recipient: {config['RECIPIENT']}")
    logging.info(f"   SMTP: {config['SMTP_SERVER']}:{config['SMTP_PORT']} (TLS: {config['SMTP_USE_TLS']})")
    logging.info(f"   Dry run: {args.dry_run}")
    # Define KQL query for Azure Resource Graph
    # Note: This is a comprehensive query for AKS vulnerability assessment
    kql_query = f"""
    // AKS Container Vulnerability Assessment Query
    // This query retrieves vulnerability data from Azure Security Center assessments
    SecurityResources
    | where type == "microsoft.security/assessments/subassessments"
    | extend assessmentKey = extract(".*assessments/(.+?)/.*", 1, id)
    | extend resourceId = tolower(extract(".*assessments/.+?/(.+)", 1, id))
    | where assessmentKey == "dbd0cb49-b563-45e7-9724-889e799fa648"
    // Container registry images should be scanned for vulnerabilities
    | extend additionalData = parse_json(tostring(properties.additionalData))
    | extend repositoryName = tostring(additionalData.repositoryName)
    | extend imageTags = tostring(additionalData.imageDigest)
    | extend cveId = tostring(properties.id)
    | extend riskLevel = tostring(properties.status.severity)
    | extend publishedDate = todatetime(properties.timeGenerated)
    // datetime_diff(period, d1, d2) returns d1 - d2, so now() comes first for a positive age
    | extend ageInDays = datetime_diff('day', now(), publishedDate)
    | extend subAssessmentDescription = tostring(properties.description)
    | extend clusterName = extract("resourceGroups/(.+?)/providers", 1, resourceId)
    | extend environmentRisk = "{env}"
    | project
        clusterName,
        environmentRisk,
        cveId,
        riskLevel,
        repositoryName,
        imageTags,
        publishedDate,
        ageInDays,
        subAssessmentDescription
    | where isnotempty(cveId)
    | order by riskLevel desc, ageInDays desc
    """
logging.info("π Executing Azure Resource Graph query...") | |
# Execute Azure Resource Graph query | |
json_filename = f"aks_vuln_results_{env.lower()}_{scan_date.strftime('%Y%m%d')}.json" | |
if not run_az_graph_query(kql_query, json_filename): | |
logging.error("β Failed to execute Azure Resource Graph query") | |
logging.error(" Please ensure Azure CLI is installed, authenticated, and has proper permissions") | |
sys.exit(1) | |
# Process vulnerability data | |
df = process_vulnerability_data(json_filename) | |
if df is None or df.empty: | |
logging.warning("β οΈ No vulnerability data found") | |
# Send a "no vulnerabilities" notification if not in dry-run mode | |
if not args.dry_run: | |
no_vuln_subject = f"β AKS Vulnerability Report ({env}) - No Issues Found - {scan_date.strftime('%Y-%m-%d')}" | |
no_vuln_html = f""" | |
<!DOCTYPE html> | |
<html lang="en"> | |
<head> | |
<meta charset="UTF-8"> | |
<style>{get_html_styles()}</style> | |
</head> | |
<body> | |
<div class="container"> | |
<h2>β AKS Vulnerability Report ({env})</h2> | |
<div class="alert alert-info"> | |
<h3>π Good News!</h3> | |
<p>No vulnerabilities were found in the {env} environment during the scan on {scan_date.strftime('%Y-%m-%d')}.</p> | |
<p>This could mean:</p> | |
<ul> | |
<li>All container images are up to date with security patches</li> | |
<li>No new vulnerabilities have been published</li> | |
<li>The scanning system may be experiencing issues (please verify)</li> | |
</ul> | |
</div> | |
<div class="footer"> | |
<p>Report generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}</p> | |
<p>Contact: <a href="mailto:{config['SENDER']}">{config['SENDER']}</a></p> | |
</div> | |
</div> | |
</body> | |
</html> | |
""" | |
# Create minimal DataFrame for the no-vulnerabilities case | |
empty_df = pd.DataFrame(columns=['clusterName', 'environmentRisk', 'cveId', 'riskLevel']) | |
empty_df['riskLevel_clean'] = [] | |
try: | |
create_and_send_email(no_vuln_subject, no_vuln_html, empty_df, config, False) | |
logging.info("β Sent 'no vulnerabilities found' notification") | |
except Exception as e: | |
logging.error(f"β Failed to send no-vulnerabilities notification: {e}") | |
logging.info("β Process completed - no vulnerabilities to report") | |
return | |
logging.info(f"π Processing {len(df)} vulnerability records") | |
# Display vulnerability summary | |
risk_summary = df['riskLevel_clean'].value_counts().to_dict() | |
for risk, count in sorted(risk_summary.items()): | |
logging.info(f" {risk.title()}: {count}") | |
# Perform DuckDB historical analysis | |
duckdb_summary_html = "" | |
if DUCKDB_AVAILABLE: | |
logging.info("π Performing historical trend analysis...") | |
try: | |
with duckdb.connect(DUCKDB_PATH) as conn: | |
# Ensure table exists and store current scan data | |
ensure_duckdb_table_exists(conn) | |
append_scan_to_duckdb(conn, df, scan_date.strftime('%Y-%m-%d')) | |
# Define comparison periods | |
periods_config = { | |
"day": "Day-over-Day", | |
"week": "Week-over-Week", | |
"month": "Month-over-Month", | |
"quarter": "Quarter-over-Quarter", | |
"year": "Year-over-Year" | |
} | |
# Generate period comparison data | |
period_diffs_data = [] | |
if args.compare_period == 'all': | |
# Include all comparison periods | |
for period, label in periods_config.items(): | |
period_diffs_data.append(compute_periodic_diffs(conn, label, period, scan_date)) | |
else: | |
# Single period comparison | |
label = periods_config.get(args.compare_period, "Week-over-Week") | |
period_diffs_data.append(compute_periodic_diffs(conn, label, args.compare_period, scan_date)) | |
# Generate HTML summary | |
duckdb_summary_html = render_multi_period_summary(period_diffs_data) | |
logging.info("β Historical trend analysis completed") | |
except Exception as e: | |
logging.warning(f"β οΈ DuckDB historical analysis failed: {e}") | |
duckdb_summary_html = f""" | |
<div class="alert alert-warning"> | |
β οΈ Historical trend analysis is temporarily unavailable due to a technical issue: {str(e)[:100]} | |
</div> | |
""" | |
else: | |
logging.info("βΉοΈ DuckDB not available - skipping historical analysis") | |
duckdb_summary_html = """ | |
<div class="alert alert-info"> | |
βΉοΈ Historical trend analysis is disabled. Install DuckDB with: <code>pip install duckdb</code> | |
</div> | |
""" | |
    # Fetch upstream container image references
    logging.info("🔄 Fetching upstream container image references...")
    upstream_images = {}
    try:
        # Fetch nginx-ingress tags (UBI builds)
        upstream_images['nginx_ingress'] = get_latest_tags(
            "https://hub.docker.com/v2/repositories/nginx/nginx-ingress/tags?page_size=50&ordering=-last_updated",
            r'.*-ubi\d*$',  # Match UBI-based tags
            {
                'Accept': 'application/json',
                'User-Agent': 'AKS-VulnerabilityReport/4.0'
            }
        )

        # Fetch Azure CLI tags
        upstream_images['azure_cli'] = get_latest_tags(
            "https://mcr.microsoft.com/v2/azure-cli/tags/list",
            r'^\d+\.\d+\.\d+$',  # Match semantic version tags
            {
                'Accept': 'application/json',
                'User-Agent': 'AKS-VulnerabilityReport/4.0'
            }
        )
        logging.info("✅ Successfully fetched upstream image references")
    except Exception as e:
        logging.warning(f"⚠️ Failed to fetch some upstream image references: {e}")
        upstream_images['nginx_ingress'] = ["Error fetching nginx-ingress tags"]
        upstream_images['azure_cli'] = ["Error fetching azure-cli tags"]
    # Generate HTML email content
    logging.info("🎨 Generating HTML email content...")

    # Calculate statistics for email content
    total_vulns = len(df)
    critical_count = int((df['riskLevel_clean'] == 'critical').sum())
    high_count = int((df['riskLevel_clean'] == 'high').sum())
    medium_count = int((df['riskLevel_clean'] == 'medium').sum())
    low_count = int((df['riskLevel_clean'] == 'low').sum())
    critical_high_count = critical_count + high_count

    # Create informational notes section
    notes_section = f"""
    <div class="summary-note">
        <h3 style="margin-top:0; color:#003366;">📋 Report Information</h3>
        <ul style="margin: 8px 0; padding-left: 20px; font-size:14px;">
            <li><strong>Environment:</strong> {config['env_display']} AKS clusters</li>
            <li><strong>Scan Date:</strong> {scan_date.strftime('%B %d, %Y')}</li>
            <li><strong>Scope:</strong> Container image vulnerabilities from Azure Security Center</li>
            <li><strong>Detailed Table:</strong> Shows <strong>Critical and High</strong> severity vulnerabilities only</li>
            <li><strong>Vulnerability Age:</strong> Days since CVE publication date (not detection time)</li>
            <li><strong>Complete Data:</strong> See attached CSV file for all {total_vulns} vulnerabilities</li>
            <li class="legend"><strong>Severity Legend:</strong>
                <span style="background-color:#dc3545; color:white;">Critical</span>
                <span style="background-color:#fd7e14; color:white;">High</span>
                <span style="background-color:#ffc107; color:#000;">Medium</span>
                <span style="background-color:#198754; color:white;">Low</span>
            </li>
        </ul>
    </div>
    """

    # Create upstream image references section
    upstream_section = f"""
    <h3>🔗 Upstream Container Image References</h3>
    <p style="color:#6c757d; font-size:14px; margin-bottom:20px;">
        Latest available versions for commonly used base images (reference for remediation planning):
    </p>
    <div class="flex-container">
        <div class="flex-item">
            <h4>📦 nginx/nginx-ingress (UBI)</h4>
            <p style="margin:0; font-size:12px; color:#6c757d;">
                Source: <a href="https://hub.docker.com/r/nginx/nginx-ingress/tags" target="_blank">Docker Hub</a>
            </p>
            {render_tag_list(upstream_images['nginx_ingress'], "nginx-ingress")}
        </div>
        <div class="flex-item">
            <h4>⚡ microsoft/azure-cli</h4>
            <p style="margin:0; font-size:12px; color:#6c757d;">
                Source: <a href="https://mcr.microsoft.com/en-us/product/azure-cli/tags" target="_blank">Microsoft Container Registry</a>
            </p>
            {render_tag_list(upstream_images['azure_cli'], "azure-cli")}
        </div>
    </div>
    """

    # Create footer section
    footer = f"""
    <div class="footer">
        <p><strong>🛡️ AKS Vulnerability Monitoring System</strong></p>
        <p>Generated on {datetime.now(timezone.utc).strftime('%Y-%m-%d at %H:%M:%S UTC')} | Environment: {config['env_display']}</p>
        <p>
            📧 Questions: <a href="mailto:{config['SENDER']}">{config['SENDER']}</a> |
            🔧 DevOps Team | 🛡️ Security Operations
        </p>
        <p style="font-size:11px; color:#868e96; margin-top:15px;">
            This automated report helps maintain security compliance across AKS environments.<br>
            For urgent security issues, please follow your organization's incident response procedures.<br>
            Report generated by AKS Vulnerability Monitor v4.0
        </p>
    </div>
    """
    # Compose complete HTML email body
    body_html = f"""
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>AKS Vulnerability Report - {config['env_display']} - {scan_date.strftime('%Y-%m-%d')}</title>
        <style>{get_html_styles()}</style>
    </head>
    <body>
        <div class="container">
            <h2>🛡️ AKS Vulnerability Report - {config['env_display']}</h2>
            {notes_section}
            {render_summary_box(config['env_display'], df)}
            {duckdb_summary_html}
            {render_main_table(df)}
            <div class="alert alert-info text-center" style="margin:25px 0;">
                📎 <strong>Complete Vulnerability Data:</strong>
                See attached CSV file for all {total_vulns} vulnerabilities<br>
                <small>Critical: {critical_count} | High: {high_count} | Medium: {medium_count} | Low: {low_count}</small>
            </div>
            {upstream_section}
            {footer}
        </div>
    </body>
    </html>
    """

    # Create email subject with priority indicator
    priority_indicator = "🚨" if critical_count > 0 else "⚠️" if high_count > 0 else "ℹ️"
    email_subject = f"{priority_indicator} AKS Vulnerability Report ({config['env_display']}) - {scan_date.strftime('%Y-%m-%d')}"

    # Send email report
    logging.info("📤 Sending vulnerability report...")
    try:
        create_and_send_email(email_subject, body_html, df, config, args.dry_run)
        if args.dry_run:
            logging.info("🧪 Dry run completed successfully")
        else:
            logging.info("✅ Email report sent successfully")
    except Exception as e:
        logging.error(f"❌ Failed to send email report: {e}")
        sys.exit(1)

    # Cleanup temporary files
    try:
        if os.path.exists(json_filename):
            os.remove(json_filename)
            logging.debug(f"🧹 Cleaned up temporary file: {json_filename}")
    except OSError as e:
        logging.warning(f"⚠️ Could not remove temporary file {json_filename}: {e}")

    # Final summary
    logging.info("🎉 AKS Vulnerability Report generation completed successfully!")
    logging.info(f"   📊 Total vulnerabilities processed: {total_vulns}")
    logging.info(f"   🚨 Critical/High severity issues: {critical_high_count}")
    logging.info(f"   📅 Scan date: {scan_date}")
    logging.info(f"   🌍 Environment: {config['env_display']}")
    if critical_high_count > 0:
        logging.warning(f"⚠️ Action required: {critical_high_count} critical/high severity vulnerabilities found")
    else:
        logging.info("✅ No critical or high severity vulnerabilities found")
if __name__ == "__main__":
    # Script entry point with comprehensive error handling.
    #
    # Handles:
    #   - Keyboard interruption (Ctrl+C)
    #   - Unexpected exceptions with optional debug tracebacks
    #   - Proper exit codes for automation integration
    try:
        main()
    except KeyboardInterrupt:
        logging.info("⚠️ Script interrupted by user (Ctrl+C)")
        print("\n👋 Operation cancelled by user")
        sys.exit(130)  # Standard exit code for SIGINT
    except Exception as e:
        logging.error(f"💥 Unexpected error occurred: {e}")

        # Show full traceback in debug mode
        if os.environ.get("SCRIPT_DEBUG"):
            import traceback
            logging.debug("📋 Full traceback:")
            logging.debug(traceback.format_exc())
        print(f"\n❌ Error: {e}")
        print("💡 Try running with --help for usage information")
        print("🐛 Set SCRIPT_DEBUG=1 for detailed error information")
        sys.exit(1)
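# Scheduling sketch (assumption: the install path and schedule below are
# site-specific placeholders). The help text advertises "flexible scheduling
# support"; one common approach is a daily cron entry such as:
#
#     0 6 * * * cd /opt/aks-vul-report && /usr/bin/python3 aks_vul_report_duckdb.py --env PROD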