Skip to content

Instantly share code, notes, and snippets.

@davidlu1001
Last active September 8, 2025 22:32
Show Gist options
  • Select an option

  • Save davidlu1001/432c6708c21a6e46a8fba9437429ac60 to your computer and use it in GitHub Desktop.

Select an option

Save davidlu1001/432c6708c21a6e46a8fba9437429ac60 to your computer and use it in GitHub Desktop.
k8s_yaml_analyzer.py
import sys
import yaml
import os
import argparse
import json
from typing import Any, Dict, List, Optional, Union
from collections import defaultdict
def find_value_by_path(data: Union[Dict, List], path_parts: List[str]) -> Optional[Any]:
"""
Recursively finds a value in a nested dictionary or list given a path.
Args:
data: The dictionary or list to search within.
path_parts: A list of strings representing the path to the desired value.
Returns:
The value found at the specified path, or None if the path does not exist.
If the path points to a list, it returns a list of all matching values.
"""
current_data = data
for part in path_parts:
if isinstance(current_data, dict):
if part in current_data:
current_data = current_data[part]
else:
return None
elif isinstance(current_data, list):
results = []
for item in current_data:
result = find_value_by_path(item, [part])
if result is not None:
if isinstance(result, list):
results.extend(result)
else:
results.append(result)
return results if results else None
else:
return None
return current_data
def count_containers(doc: Dict[str, Any]) -> Dict[str, int]:
"""
Count containers and initContainers for a given document.
Args:
doc: The Kubernetes resource document to analyze
Returns:
Dict with 'containers' and 'initContainers' counts
"""
kind = doc.get('kind')
containers_count = 0
init_containers_count = 0
# Define paths for different resource types
container_paths = {
'Deployment': 'spec.template.spec.containers',
'StatefulSet': 'spec.template.spec.containers',
'DaemonSet': 'spec.template.spec.containers',
'Job': 'spec.template.spec.containers',
'CronJob': 'spec.jobTemplate.spec.template.spec.containers',
'Pod': 'spec.containers'
}
init_container_paths = {
'Deployment': 'spec.template.spec.initContainers',
'StatefulSet': 'spec.template.spec.initContainers',
'DaemonSet': 'spec.template.spec.initContainers',
'Job': 'spec.template.spec.initContainers',
'CronJob': 'spec.jobTemplate.spec.template.spec.initContainers',
'Pod': 'spec.initContainers'
}
if kind in container_paths:
containers = find_value_by_path(doc, container_paths[kind].split('.'))
if isinstance(containers, list):
containers_count = len(containers)
if kind in init_container_paths:
init_containers = find_value_by_path(doc, init_container_paths[kind].split('.'))
if isinstance(init_containers, list):
init_containers_count = len(init_containers)
return {
'containers': containers_count,
'initContainers': init_containers_count
}
def check_security_context(doc: Dict[str, Any]) -> Dict[str, Any]:
"""
Check existing securityContext configurations.
Args:
doc: The Kubernetes resource document to analyze
Returns:
Dict with security context analysis results
"""
kind = doc.get('kind')
security_info = {
'has_pod_security_context': False,
'has_seccomp_profile': False,
'seccomp_type': None,
'containers_with_security_context': 0,
'containers_with_readonly_fs': 0,
'init_containers_with_security_context': 0,
'init_containers_with_readonly_fs': 0,
'needs_pod_seccomp_patch': False,
'containers_needing_readonly_fs': [],
'init_containers_needing_readonly_fs': []
}
# Define paths for pod-level security context
pod_security_paths = {
'Deployment': 'spec.template.spec.securityContext',
'StatefulSet': 'spec.template.spec.securityContext',
'DaemonSet': 'spec.template.spec.securityContext',
'Job': 'spec.template.spec.securityContext',
'CronJob': 'spec.jobTemplate.spec.template.spec.securityContext',
'Pod': 'spec.securityContext'
}
if kind in pod_security_paths:
pod_sc = find_value_by_path(doc, pod_security_paths[kind].split('.'))
if pod_sc and isinstance(pod_sc, dict):
security_info['has_pod_security_context'] = True
seccomp_profile = pod_sc.get('seccompProfile', {})
if seccomp_profile and isinstance(seccomp_profile, dict):
security_info['has_seccomp_profile'] = True
security_info['seccomp_type'] = seccomp_profile.get('type')
# Check if needs seccomp patch
if not security_info['has_seccomp_profile'] or security_info['seccomp_type'] != 'RuntimeDefault':
security_info['needs_pod_seccomp_patch'] = True
# Define base paths for containers
container_base_paths = {
'Deployment': 'spec.template.spec.containers',
'StatefulSet': 'spec.template.spec.containers',
'DaemonSet': 'spec.template.spec.containers',
'Job': 'spec.template.spec.containers',
'CronJob': 'spec.jobTemplate.spec.template.spec.containers',
'Pod': 'spec.containers'
}
init_container_base_paths = {
'Deployment': 'spec.template.spec.initContainers',
'StatefulSet': 'spec.template.spec.initContainers',
'DaemonSet': 'spec.template.spec.initContainers',
'Job': 'spec.template.spec.initContainers',
'CronJob': 'spec.jobTemplate.spec.template.spec.initContainers',
'Pod': 'spec.initContainers'
}
# Check main containers
if kind in container_base_paths:
containers = find_value_by_path(doc, container_base_paths[kind].split('.'))
if isinstance(containers, list):
for i, container in enumerate(containers):
if container.get('securityContext') and isinstance(container['securityContext'], dict):
security_info['containers_with_security_context'] += 1
if container['securityContext'].get('readOnlyRootFilesystem') is True:
security_info['containers_with_readonly_fs'] += 1
else:
security_info['containers_needing_readonly_fs'].append(i)
else:
security_info['containers_needing_readonly_fs'].append(i)
# Check init containers
if kind in init_container_base_paths:
init_containers = find_value_by_path(doc, init_container_base_paths[kind].split('.'))
if isinstance(init_containers, list):
for i, init_container in enumerate(init_containers):
if init_container.get('securityContext') and isinstance(init_container['securityContext'], dict):
security_info['init_containers_with_security_context'] += 1
if init_container['securityContext'].get('readOnlyRootFilesystem') is True:
security_info['init_containers_with_readonly_fs'] += 1
else:
security_info['init_containers_needing_readonly_fs'].append(i)
else:
security_info['init_containers_needing_readonly_fs'].append(i)
return security_info
def get_container_names(doc: Dict[str, Any]) -> Dict[str, List[str]]:
"""
Get container and initContainer names for a given document.
Args:
doc: The Kubernetes resource document to analyze
Returns:
Dict with 'containers' and 'initContainers' name lists
"""
kind = doc.get('kind')
container_names = []
init_container_names = []
# Define paths for different resource types
container_paths = {
'Deployment': 'spec.template.spec.containers',
'StatefulSet': 'spec.template.spec.containers',
'DaemonSet': 'spec.template.spec.containers',
'Job': 'spec.template.spec.containers',
'CronJob': 'spec.jobTemplate.spec.template.spec.containers',
'Pod': 'spec.containers'
}
init_container_paths = {
'Deployment': 'spec.template.spec.initContainers',
'StatefulSet': 'spec.template.spec.initContainers',
'DaemonSet': 'spec.template.spec.initContainers',
'Job': 'spec.template.spec.initContainers',
'CronJob': 'spec.jobTemplate.spec.template.spec.initContainers',
'Pod': 'spec.initContainers'
}
if kind in container_paths:
containers = find_value_by_path(doc, container_paths[kind].split('.'))
if isinstance(containers, list):
container_names = [c.get('name', f'unnamed-{i}') for i, c in enumerate(containers)]
if kind in init_container_paths:
init_containers = find_value_by_path(doc, init_container_paths[kind].split('.'))
if isinstance(init_containers, list):
init_container_names = [c.get('name', f'unnamed-init-{i}') for i, c in enumerate(init_containers)]
return {
'containers': container_names,
'initContainers': init_container_names
}
def get_default_search_rules() -> Dict[str, List[str]]:
"""Returns a dictionary of built-in search rules for common K8s resources."""
return {
'Deployment': [
'metadata.name',
'metadata.namespace',
'spec.replicas',
'spec.template.spec.containers.name',
'spec.template.spec.initContainers.name'
],
'StatefulSet': [
'metadata.name',
'metadata.namespace',
'spec.replicas',
'spec.template.spec.containers.name',
'spec.template.spec.initContainers.name'
],
'CronJob': [
'metadata.name',
'metadata.namespace',
'spec.schedule',
'spec.jobTemplate.spec.template.spec.containers.name',
'spec.jobTemplate.spec.template.spec.initContainers.name'
],
'Service': [
'metadata.name',
'metadata.namespace',
'spec.type',
'spec.ports.port'
],
'Pod': [
'metadata.name',
'metadata.namespace',
'spec.containers.name',
'spec.initContainers.name'
],
'Job': [
'metadata.name',
'metadata.namespace',
'spec.template.spec.containers.name',
'spec.template.spec.initContainers.name'
],
'DaemonSet': [
'metadata.name',
'metadata.namespace',
'spec.template.spec.containers.name',
'spec.template.spec.initContainers.name'
],
'ConfigMap': [
'metadata.name',
'metadata.namespace',
'data'
],
'Secret': [
'metadata.name',
'metadata.namespace',
'type'
]
}
def analyze_yaml(file_path: str, kind_filters: Optional[List[str]], config_file: Optional[str]) -> List[Dict[str, Any]]:
"""
Analyzes a multi-document YAML file to provide insights for kustomization creation.
Args:
file_path: The path to the multi-document YAML file
kind_filters: A list of K8s kinds to filter the analysis by
config_file: The path to a custom YAML configuration file
Returns:
A list of dictionaries containing analysis results for each document
Raises:
FileNotFoundError: If the specified file_path or config_file does not exist
ValueError: If the YAML file or config file is malformed
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"The file '{file_path}' was not found.")
search_rules = get_default_search_rules()
if config_file:
if not os.path.exists(config_file):
raise FileNotFoundError(f"The configuration file '{config_file}' was not found.")
try:
with open(config_file, 'r', encoding='utf-8') as f:
custom_rules = yaml.safe_load(f)
if isinstance(custom_rules, dict):
search_rules.update(custom_rules)
else:
raise ValueError("Configuration file must be a dictionary.")
except yaml.YAMLError as e:
raise ValueError(f"Error parsing configuration file: {e}")
try:
with open(file_path, 'r', encoding='utf-8') as file:
parsed_docs = yaml.load_all(file, Loader=yaml.FullLoader)
extracted_data = []
for i, doc in enumerate(parsed_docs):
if not doc or not isinstance(doc, dict):
continue
kind = doc.get('kind')
if kind_filters and kind not in kind_filters:
continue
name = doc.get('metadata', {}).get('name')
namespace = doc.get('metadata', {}).get('namespace')
document_data: Dict[str, Any] = {
'document_index': i,
'kind': kind,
'name': name,
'namespace': namespace
}
# Add container analysis
container_counts = count_containers(doc)
container_names = get_container_names(doc)
document_data['container_analysis'] = {
'counts': container_counts,
'names': container_names
}
# Add security context analysis
security_analysis = check_security_context(doc)
document_data['security_analysis'] = security_analysis
# Original path-based analysis
if kind in search_rules:
for path in search_rules[kind]:
if path not in ['metadata.name', 'metadata.namespace']: # Skip duplicates
path_parts = path.split('.')
value = find_value_by_path(doc, path_parts)
if value is not None:
document_data[path] = value
extracted_data.append(document_data)
return extracted_data
except yaml.YAMLError as e:
raise ValueError(f"Error parsing YAML file: {e}")
def generate_kustomization_guidance(data: List[Dict[str, Any]]) -> str:
"""
Generate guidance for creating kustomization.yaml based on analysis results.
Args:
data: List of analyzed documents
Returns:
String containing formatted guidance for kustomization creation
"""
guidance = []
# Summary statistics
kind_stats = defaultdict(int)
total_containers = 0
total_init_containers = 0
resources_needing_patches = []
for doc in data:
kind = doc.get('kind', 'Unknown')
kind_stats[kind] += 1
container_counts = doc.get('container_analysis', {}).get('counts', {})
total_containers += container_counts.get('containers', 0)
total_init_containers += container_counts.get('initContainers', 0)
security = doc.get('security_analysis', {})
if (security.get('needs_pod_seccomp_patch') or
security.get('containers_needing_readonly_fs') or
security.get('init_containers_needing_readonly_fs')):
resources_needing_patches.append({
'kind': kind,
'name': doc.get('name'),
'containers': container_counts.get('containers', 0),
'initContainers': container_counts.get('initContainers', 0),
'containers_needing_readonly': len(security.get('containers_needing_readonly_fs', [])),
'init_containers_needing_readonly': len(security.get('init_containers_needing_readonly_fs', []))
})
guidance.append("=== KUSTOMIZATION.YAML CREATION GUIDANCE ===\n")
# Resource summary
guidance.append("📊 Resource Summary:")
for kind, count in sorted(kind_stats.items()):
guidance.append(f" - {kind}: {count} resource(s)")
guidance.append(f" - Total Containers: {total_containers}")
guidance.append(f" - Total Init Containers: {total_init_containers}")
guidance.append("")
# Universal patches needed
guidance.append("🔧 Universal Patches Needed:")
workload_kinds = set(kind_stats.keys()) & {'Deployment', 'StatefulSet', 'Job', 'CronJob', 'DaemonSet', 'Pod'}
if workload_kinds:
guidance.append(" Pod-level seccompProfile patches apply to:")
for kind in sorted(workload_kinds):
guidance.append(f" - {kind} ({kind_stats[kind]} resource(s))")
else:
guidance.append(" ✅ No workload resources found requiring universal patches")
guidance.append("")
# Specific patches needed
guidance.append("🎯 Specific Patches Needed:")
if resources_needing_patches:
guidance.append(" The following resources need container-level readOnlyRootFilesystem patches:")
for resource in resources_needing_patches:
guidance.append(f" - {resource['kind']}/{resource['name']}:")
if resource['containers_needing_readonly'] > 0:
guidance.append(f" * Needs {resource['containers_needing_readonly']} container patch(es)")
if resource['init_containers_needing_readonly'] > 0:
guidance.append(f" * Needs {resource['init_containers_needing_readonly']} initContainer patch(es)")
else:
guidance.append(" ✅ All resources have proper readOnlyRootFilesystem configuration")
guidance.append("")
# Patch strategy recommendation
guidance.append("💡 Recommended Patch Strategy:")
if len(resources_needing_patches) <= 5:
guidance.append(" - Few resources detected, recommend using JSON Patch for precise control")
else:
guidance.append(" - Many resources detected, recommend combining Strategic Merge and JSON Patch")
if total_init_containers > 0:
guidance.append(" - InitContainers detected: create separate patches for each initContainer")
guidance.append(" - Organize universal and specific patches separately for maintainability")
guidance.append(" - Use target selectors (kind + name) for specific resource patches")
guidance.append("")
# Example patch structure
guidance.append("📝 Patch Structure Example:")
guidance.append(" 1. Universal patches (strategic merge) for pod-level securityContext")
guidance.append(" 2. Specific patches (JSON patch) for container-level readOnlyRootFilesystem")
guidance.append(" 3. Separate patches for each initContainer that needs modification")
guidance.append("")
return "\n".join(guidance)
def print_detailed_analysis(data: List[Dict[str, Any]]):
"""Print detailed analysis for each resource."""
print("=== DETAILED RESOURCE ANALYSIS ===\n")
for doc in data:
print(f"📄 Document {doc['document_index']}: {doc.get('kind', 'Unknown')}/{doc.get('name', 'unnamed')}")
if doc.get('namespace'):
print(f" Namespace: {doc['namespace']}")
# Container information
container_analysis = doc.get('container_analysis', {})
counts = container_analysis.get('counts', {})
names = container_analysis.get('names', {})
print(f" Containers: {counts.get('containers', 0)}")
if names.get('containers'):
print(f" Names: {', '.join(names['containers'])}")
print(f" Init Containers: {counts.get('initContainers', 0)}")
if names.get('initContainers'):
print(f" Names: {', '.join(names['initContainers'])}")
# Security analysis
security = doc.get('security_analysis', {})
print(f" Security Configuration:")
print(f" Pod seccompProfile: {'✅ Configured' if not security.get('needs_pod_seccomp_patch') else '❌ Needs patch'}")
containers_readonly = counts.get('containers', 0) - len(security.get('containers_needing_readonly_fs', []))
total_containers = counts.get('containers', 0)
print(f" Container readOnlyRootFilesystem: {containers_readonly}/{total_containers} ✅")
init_containers_readonly = counts.get('initContainers', 0) - len(security.get('init_containers_needing_readonly_fs', []))
total_init_containers = counts.get('initContainers', 0)
print(f" InitContainer readOnlyRootFilesystem: {init_containers_readonly}/{total_init_containers} ✅")
# Show what patches are needed
patches_needed = []
if security.get('needs_pod_seccomp_patch'):
patches_needed.append("Pod seccompProfile")
if security.get('containers_needing_readonly_fs'):
patches_needed.append(f"Container readOnlyRootFilesystem (indices: {security['containers_needing_readonly_fs']})")
if security.get('init_containers_needing_readonly_fs'):
patches_needed.append(f"InitContainer readOnlyRootFilesystem (indices: {security['init_containers_needing_readonly_fs']})")
if patches_needed:
print(f" 🔧 Patches Needed: {', '.join(patches_needed)}")
else:
print(f" ✅ No additional patches required")
print("-" * 80)
def print_summary_analysis(data: List[Dict[str, Any]]):
"""Print a concise summary of the analysis."""
if not data:
print("No resources found to analyze.")
return
kind_stats = defaultdict(int)
total_containers = 0
total_init_containers = 0
resources_needing_pod_patches = 0
resources_needing_container_patches = 0
for doc in data:
kind = doc.get('kind', 'Unknown')
kind_stats[kind] += 1
container_counts = doc.get('container_analysis', {}).get('counts', {})
total_containers += container_counts.get('containers', 0)
total_init_containers += container_counts.get('initContainers', 0)
security = doc.get('security_analysis', {})
if security.get('needs_pod_seccomp_patch'):
resources_needing_pod_patches += 1
if (security.get('containers_needing_readonly_fs') or
security.get('init_containers_needing_readonly_fs')):
resources_needing_container_patches += 1
print("=== ANALYSIS SUMMARY ===")
print(f"Total Resources: {len(data)}")
print(f"Resource Types: {', '.join(sorted(kind_stats.keys()))}")
print(f"Total Containers: {total_containers}")
print(f"Total Init Containers: {total_init_containers}")
print(f"Resources needing pod-level patches: {resources_needing_pod_patches}")
print(f"Resources needing container-level patches: {resources_needing_container_patches}")
print()
def main():
"""Main function to parse arguments and run the analysis."""
parser = argparse.ArgumentParser(
description="Analyze K8s multi-YAML files to guide kustomization.yaml creation",
formatter_class=argparse.RawTextHelpFormatter,
epilog="""
Usage Examples:
# Analyze all resources and get creation guidance
python k8s_analyzer.py manifests.yaml
# Analyze specific resource types
python k8s_analyzer.py manifests.yaml --kind Deployment CronJob
# Get detailed analysis of each resource
python k8s_analyzer.py manifests.yaml --detailed
# Get guidance for creating kustomization.yaml
python k8s_analyzer.py manifests.yaml --guidance
# Output JSON format for further processing
python k8s_analyzer.py manifests.yaml --json
# Use custom search rules
python k8s_analyzer.py manifests.yaml --config custom_rules.yaml
"""
)
parser.add_argument(
'file_path',
metavar='FILE',
help="Path to the multi-YAML file"
)
parser.add_argument(
'--json',
action='store_true',
help="Output results in JSON format"
)
parser.add_argument(
'--kind',
nargs='+',
metavar='KIND',
help="Filter analysis to specific resource kinds (e.g., 'Deployment', 'CronJob')"
)
parser.add_argument(
'--config',
metavar='CONFIG_FILE',
help="Path to custom YAML configuration file with search rules"
)
parser.add_argument(
'--detailed',
action='store_true',
help="Show detailed analysis for each resource"
)
parser.add_argument(
'--guidance',
action='store_true',
help="Show guidance for creating kustomization.yaml"
)
parser.add_argument(
'--summary',
action='store_true',
help="Show only a concise summary of the analysis"
)
args = parser.parse_args()
try:
results = analyze_yaml(args.file_path, args.kind, args.config)
if args.json:
print(json.dumps(results, indent=2, ensure_ascii=False))
else:
if args.summary:
print_summary_analysis(results)
elif args.guidance:
guidance = generate_kustomization_guidance(results)
print(guidance)
elif args.detailed:
print_detailed_analysis(results)
else:
# Default behavior: show guidance
guidance = generate_kustomization_guidance(results)
print(guidance)
print("Use --detailed for detailed resource analysis")
print("Use --summary for a concise summary")
print("Use --guidance to see only creation guidance")
except (FileNotFoundError, ValueError) as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

Kubernetes YAML Analyzer

A powerful Python script to analyze multi-document Kubernetes YAML files and provide guidance for creating kustomization.yaml files with security context patches.

Features

  • Container Analysis: Accurately counts containers and initContainers for all Kubernetes workload types
  • Security Context Checking: Analyzes existing securityContext configurations
  • Kustomization Guidance: Provides detailed recommendations for creating kustomization.yaml patches
  • Flexible Search: Supports custom search rules via configuration files
  • Multiple Output Formats: Human-readable, JSON, summary, and detailed analysis modes

Supported Kubernetes Resources

  • Deployment
  • StatefulSet
  • DaemonSet
  • Job
  • CronJob
  • Pod
  • Service
  • ConfigMap
  • Secret
  • Custom resources (via configuration)

Installation

Requires Python 3.6+ and PyYAML:

pip install PyYAML

Usage

Basic Analysis

python k8s_analyzer.py manifests.yaml

Filter by Resource Type

python k8s_analyzer.py manifests.yaml --kind Deployment StatefulSet

Detailed Resource Analysis

python k8s_analyzer.py manifests.yaml --detailed

Summary Only

python k8s_analyzer.py manifests.yaml --summary

JSON Output

python k8s_analyzer.py manifests.yaml --json

Custom Search Rules

python k8s_analyzer.py manifests.yaml --config custom_rules.yaml

Output Examples

Default Guidance Output

=== KUSTOMIZATION.YAML CREATION GUIDANCE ===

📊 Resource Summary:
  - Deployment: 2 resource(s)
  - Job: 1 resource(s)
  - Total Containers: 3
  - Total Init Containers: 7

🔧 Universal Patches Needed:
  Pod-level seccompProfile patches apply to:
    - Deployment (2 resource(s))
    - Job (1 resource(s))

🎯 Specific Patches Needed:
  The following resources need container-level readOnlyRootFilesystem patches:
    - Deployment/temporal-proxy:
      * Needs 1 container patch(es)
      * Needs 3 initContainer patch(es)

💡 Recommended Patch Strategy:
  - InitContainers detected: create separate patches for each initContainer
  - Organize universal and specific patches separately for maintainability

Detailed Analysis Output

📄 Document 0: Deployment/temporal-proxy
   Namespace: temporal
   Containers: 1
     Names: temporal-proxy
   Init Containers: 3
     Names: setup, migrate, wait
   Security Configuration:
     Pod seccompProfile: ❌ Needs patch
     Container readOnlyRootFilesystem: 0/1 ✅
     InitContainer readOnlyRootFilesystem: 0/3 ✅
   🔧 Patches Needed: Pod seccompProfile, Container readOnlyRootFilesystem (indices: [0]), InitContainer readOnlyRootFilesystem (indices: [0, 1, 2])

Security Context Analysis

The script analyzes two levels of security context:

  1. Pod-level: Checks for seccompProfile.type: RuntimeDefault
  2. Container-level: Checks for readOnlyRootFilesystem: true on all containers and initContainers

Custom Configuration

Create a YAML file with custom search rules:

# custom_rules.yaml
Ingress:
  - metadata.name
  - metadata.namespace
  - spec.rules.host

PersistentVolumeClaim:
  - metadata.name
  - spec.accessModes
  - spec.resources.requests.storage

Integration with Kustomization

The analysis output helps you create targeted patches in your kustomization.yaml:

  1. Universal patches for pod-level securityContext (strategic merge)
  2. Specific patches for container-level readOnlyRootFilesystem (JSON patch)
  3. Individual patches for each initContainer requiring modification

Command Line Options

Option Description
--kind KIND [KIND ...] Filter analysis to specific resource kinds
--detailed Show detailed analysis for each resource
--summary Show only a concise summary
--guidance Show only kustomization creation guidance
--json Output results in JSON format
--config FILE Use custom search rules from YAML file

Error Handling

The script provides clear error messages for:

  • Missing files
  • Invalid YAML syntax
  • Malformed configuration files
  • Unsupported resource types

Example Workflow

  1. Analyze your existing manifests:

    python k8s_analyzer.py all.yaml --detailed
  2. Review the security analysis and patch recommendations

  3. Create your kustomization.yaml based on the guidance:

    • Add universal patches for pod-level securityContext
    • Add specific patches for containers needing readOnlyRootFilesystem
    • Create individual patches for each initContainer
  4. Test your kustomization:

    kubectl kustomize . | kubectl apply --dry-run=client

JSON Output Structure

When using --json, the output includes:

[
  {
    "document_index": 0,
    "kind": "Deployment",
    "name": "temporal-proxy",
    "namespace": "temporal",
    "container_analysis": {
      "counts": {
        "containers": 1,
        "initContainers": 3
      },
      "names": {
        "containers": ["temporal-proxy"],
        "initContainers": ["setup", "migrate", "wait"]
      }
    },
    "security_analysis": {
      "has_pod_security_context": false,
      "has_seccomp_profile": false,
      "needs_pod_seccomp_patch": true,
      "containers_needing_readonly_fs": [0],
      "init_containers_needing_readonly_fs": [0, 1, 2]
    }
  }
]

Common Use Cases

Security Hardening

Identify resources missing security contexts and generate patches to add:

  • seccompProfile.type: RuntimeDefault at pod level
  • readOnlyRootFilesystem: true at container level

Container Inventory

Get a complete inventory of all containers and initContainers across your manifests:

python k8s_analyzer.py manifests.yaml --json | jq '.[] | {kind, name, containers: .container_analysis.names}'

Compliance Checking

Check which resources need security patches for compliance:

python k8s_analyzer.py manifests.yaml --summary

Patch Planning

Generate a systematic plan for applying security patches using kustomize:

python k8s_analyzer.py manifests.yaml --guidance > patch-plan.txt

Limitations

  • Only analyzes static YAML files (not live cluster resources)
  • Focuses on securityContext patches (other patch types not covered)
  • Requires well-formed YAML input
  • Does not validate Kubernetes API versions or resource schemas

Contributing

To extend the analyzer:

  1. Add new resource types to get_default_search_rules()
  2. Update path mappings in count_containers() and check_security_context()
  3. Add custom analysis functions for specific resource types
  4. Update the guidance generation logic as needed

License

This script is provided as-is for analyzing Kubernetes manifests and generating kustomization guidance.

Version History

  • v1.0: Initial release with core analysis functionality
  • v1.1: Added security context checking and kustomization guidance
  • v1.2: Enhanced error handling and added multiple output formats
  • v1.3: Added custom configuration support and improved container analysis
apiVersion: apps/v1
kind: Deployment
metadata:
name: temporal-proxy
namespace: temporal
spec:
replicas: 1
selector:
matchLabels:
app: temporal-proxy
template:
metadata:
labels:
app: temporal-proxy
spec:
initContainers:
- name: setup
image: busybox
- name: migrate
image: temporal/migrate
- name: wait
image: alpine
containers:
- name: temporal-proxy
image: temporal/proxy
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: temporal-history
namespace: temporal
spec:
replicas: 3
selector:
matchLabels:
app: temporal-history
template:
metadata:
labels:
app: temporal-history
spec:
securityContext:
seccompProfile:
type: RuntimeDefault
initContainers:
- name: setup
image: busybox
securityContext:
readOnlyRootFilesystem: true
containers:
- name: temporal-history
image: temporal/history
securityContext:
readOnlyRootFilesystem: true
---
apiVersion: batch/v1
kind: Job
metadata:
name: database-migrate
namespace: temporal
spec:
template:
spec:
initContainers:
- name: wait-db
image: postgres
- name: setup-schema
image: migrate/migrate
- name: check-health
image: curl
- name: backup
image: postgres
containers:
- name: migrate
image: migrate/migrate
restartPolicy: Never
---
apiVersion: v1
kind: Pod
metadata:
name: debug-pod
namespace: temporal
spec:
containers:
- name: debug
image: busybox
securityContext:
readOnlyRootFilesystem: true
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: cleanup-job
namespace: temporal
spec:
schedule: "0 2 * * *"
jobTemplate:
spec:
template:
spec:
securityContext:
seccompProfile:
type: RuntimeDefault
containers:
- name: cleanup
image: busybox
restartPolicy: OnFailure
---
apiVersion: v1
kind: Service
metadata:
name: temporal-frontend
namespace: temporal
spec:
type: ClusterIP
ports:
- port: 7233
targetPort: 7233
selector:
app: temporal-frontend
---
apiVersion: v1
kind: ConfigMap
metadata:
name: temporal-config
namespace: temporal
data:
config.yaml: |
log:
level: info
persistence:
defaultStore: postgres
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: log-collector
namespace: kube-system
spec:
selector:
matchLabels:
name: log-collector
template:
metadata:
labels:
name: log-collector
spec:
containers:
- name: fluentd
image: fluentd:v1.14
securityContext:
readOnlyRootFilesystem: false
initContainers:
- name: init-config
image: busybox
# Custom search rules for additional Kubernetes resources
# This file demonstrates how to extend the analyzer with custom search patterns
# Custom Resource Definitions
CustomResourceDefinition:
- metadata.name
- spec.group
- spec.versions.name
# Ingress resources
Ingress:
- metadata.name
- metadata.namespace
- spec.rules.host
- spec.tls.secretName
# PersistentVolumeClaim resources
PersistentVolumeClaim:
- metadata.name
- metadata.namespace
- spec.accessModes
- spec.resources.requests.storage
# NetworkPolicy resources
NetworkPolicy:
- metadata.name
- metadata.namespace
- spec.podSelector
- spec.policyTypes
# ServiceAccount resources
ServiceAccount:
- metadata.name
- metadata.namespace
- secrets.name
- imagePullSecrets.name
# ClusterRole and ClusterRoleBinding
ClusterRole:
- metadata.name
- rules.apiGroups
- rules.resources
- rules.verbs
ClusterRoleBinding:
- metadata.name
- roleRef.name
- subjects.name
# HorizontalPodAutoscaler
HorizontalPodAutoscaler:
- metadata.name
- metadata.namespace
- spec.scaleTargetRef.name
- spec.minReplicas
- spec.maxReplicas
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment