Forked from williamzujkowski/kubefence-policy-generator.py
Created
December 3, 2025 11:15
-
-
Save adampielak/f6f03b23c3268c028a4eb86b51a8f044 to your computer and use it in GitHub Desktop.
KubeFence policy generator from Kubernetes audit logs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| KubeFence Policy Generator | |
| Analyzes K3s audit logs and generates fine-grained API policies | |
| Usage: | |
| python3 kubefence-policy-generator.py --namespace default --output policies/ | |
| """ | |
| import argparse | |
| import json | |
| import yaml | |
| from collections import defaultdict | |
| from pathlib import Path | |
def parse_audit_logs(audit_file: str, namespace: str = None) -> dict:
    """Parse a K3s/Kubernetes audit log and extract API call patterns.

    The log is read as JSON Lines: one audit event object per line. Lines
    that are blank, malformed, or not a JSON object are ignored.

    Args:
        audit_file: Path to the audit log file (read as UTF-8).
        namespace: When set, only events whose ``objectRef.namespace``
            equals this value are kept. ``None`` (or empty) keeps all.

    Returns:
        A nested mapping ``{username: {verb: {resource_path, ...}}}`` where
        ``resource_path`` is ``"resource"`` or ``"resource/subresource"``.

    Raises:
        OSError: If the audit file cannot be opened.
    """
    api_calls = defaultdict(lambda: defaultdict(set))

    with open(audit_file, 'r', encoding='utf-8') as f:
        for line in f:
            # Keep the try body minimal: only the JSON decoding can
            # legitimately fail on a corrupt or truncated log line.
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(event, dict):
                # Valid JSON but not an event object (e.g. a bare list).
                continue

            # objectRef is absent for non-resource requests; treat an
            # explicit null the same as a missing key.
            object_ref = event.get('objectRef') or {}

            # Filter by namespace if specified
            if namespace and object_ref.get('namespace') != namespace:
                continue

            user = (event.get('user') or {}).get('username') or 'unknown'

            # Skip system components, but keep service accounts: their
            # usernames are "system:serviceaccount:<namespace>:<name>" and
            # they are exactly the identities policies are generated for.
            if (user.startswith('system:')
                    and not user.startswith('system:serviceaccount:')):
                continue

            verb = event.get('verb')
            resource = object_ref.get('resource')
            if not verb or not resource:
                # Nothing to build a rule from (e.g. /healthz, /version).
                continue

            # Build resource path
            subresource = object_ref.get('subresource')
            resource_path = f"{resource}/{subresource}" if subresource else resource

            # Record API call pattern
            api_calls[user][verb].add(resource_path)

    return api_calls
def _policy_name(user: str) -> str:
    """Derive a Kubernetes-safe object name (also used as a file stem) from a username."""
    allowed = "abcdefghijklmnopqrstuvwxyz0123456789-."
    # Lowercase, then map every disallowed character (':', '@', '/', ...) to '-'.
    cleaned = "".join(ch if ch in allowed else "-" for ch in str(user).lower())
    # Each dot-separated label must start and end with an alphanumeric
    # character; empty labels (from "..", leading or trailing dots) are dropped.
    labels = (label.strip("-") for label in cleaned.split("."))
    slug = ".".join(label for label in labels if label) or "unknown"
    # DNS subdomain names are limited to 253 characters.
    return f"policy-{slug}"[:253].rstrip("-.")


def generate_policies(api_calls: dict, namespace: str = "default") -> list:
    """Generate KubeFence policy documents from API call patterns.

    Args:
        api_calls: Mapping ``{username: {verb: iterable_of_resource_paths}}``.
        namespace: Value written to ``metadata.namespace`` of every policy.

    Returns:
        A list of ``APIPolicy`` dicts, one per user, in the iteration order
        of ``api_calls``. Each policy has one rule per verb with the
        resources sorted. ``None`` resources are dropped, and a verb left
        with no resources produces no rule.
    """
    policies = []

    for user, verbs in api_calls.items():
        rules = []
        for verb, resources in verbs.items():
            # None entries cannot be sorted alongside strings and carry no
            # meaning as a resource name, so they are filtered out.
            named = sorted(res for res in resources if res is not None)
            if named:
                rules.append({'verbs': [verb], 'resources': named})

        policies.append({
            'apiVersion': 'kubefence.io/v1',
            'kind': 'APIPolicy',
            'metadata': {
                # Sanitized so the name is a valid object name and cannot
                # contain path separators when used as a file name.
                'name': _policy_name(user),
                'namespace': namespace,
            },
            'spec': {
                # The original, unsanitized identity the policy applies to.
                'serviceAccount': user,
                'rules': rules,
            },
        })

    return policies
def main(argv=None):
    """Command-line entry point: parse an audit log and write policy files.

    Parses the audit log with ``parse_audit_logs``, builds policies with
    ``generate_policies`` and writes one ``<policy name>.yaml`` file per
    policy into the output directory (created if missing).

    Args:
        argv: Optional list of command-line arguments. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``.

    Raises:
        SystemExit: With an error message if the audit log cannot be read,
            or from argparse on invalid arguments.
    """
    parser = argparse.ArgumentParser(description='Generate KubeFence policies from audit logs')
    parser.add_argument('--audit-log', default='/var/log/kubernetes/audit.log',
                        help='Path to K3s audit log file')
    parser.add_argument('--namespace', default=None,
                        help='Filter by namespace')
    parser.add_argument('--output', default='policies/',
                        help='Output directory for policy YAML files')
    args = parser.parse_args(argv)

    # Parse audit logs
    print(f"Parsing audit logs from {args.audit_log}...")
    try:
        api_calls = parse_audit_logs(args.audit_log, args.namespace)
    except OSError as exc:
        # A missing or unreadable log is a usage problem, not a crash:
        # report it on stderr and exit non-zero without a traceback.
        raise SystemExit(f"Error: cannot read audit log {args.audit_log}: {exc}")

    # Generate policies
    print(f"Generating policies for {len(api_calls)} service accounts...")
    policies = generate_policies(api_calls, args.namespace or 'default')

    # Write policies to files
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    for policy in policies:
        filename = f"{policy['metadata']['name']}.yaml"
        filepath = output_dir / filename
        # Explicit encoding keeps the output independent of the locale.
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(policy, f, default_flow_style=False)
        print(f"Generated policy: {filepath}")

    print(f"\nGenerated {len(policies)} policies in {output_dir}")


# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment