Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save adampielak/f6f03b23c3268c028a4eb86b51a8f044 to your computer and use it in GitHub Desktop.

Select an option

Save adampielak/f6f03b23c3268c028a4eb86b51a8f044 to your computer and use it in GitHub Desktop.
KubeFence policy generator from Kubernetes audit logs
#!/usr/bin/env python3
"""
KubeFence Policy Generator

Analyzes K3s audit logs and generates fine-grained API policies,
one YAML file per observed user.

Usage:
    python3 kubefence-policy-generator.py --audit-log /var/log/kubernetes/audit.log --namespace default --output policies/
"""
import argparse
import json
import yaml
from collections import defaultdict
from pathlib import Path
def parse_audit_logs(audit_file: str, namespace: str = None) -> dict:
    """Parse K3s audit logs and extract API call patterns.

    The file is read as JSON Lines (one audit event per line). Lines that
    are not valid JSON objects are skipped, as are events that carry no
    verb or no ``objectRef.resource`` (e.g. non-resource URLs), because
    they cannot be expressed as a resource rule.

    Args:
        audit_file: Path to the audit log file.
        namespace: When given, only events whose ``objectRef.namespace``
            equals this value are considered.

    Returns:
        A nested mapping ``{username: {verb: {resource_path, ...}}}`` where
        ``resource_path`` is ``resource`` or ``resource/subresource``.

    Raises:
        OSError: If ``audit_file`` cannot be opened.
    """
    api_calls = defaultdict(lambda: defaultdict(set))
    # Undecodable bytes are replaced rather than aborting the whole run; the
    # affected line then either still parses or is skipped as invalid JSON.
    with open(audit_file, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(event, dict):
                continue

            # ``objectRef`` / ``user`` may be absent or explicitly null.
            object_ref = event.get('objectRef') or {}
            user_info = event.get('user') or {}

            # Filter by namespace if specified
            if namespace and object_ref.get('namespace') != namespace:
                continue

            user = user_info.get('username') or 'unknown'
            # Skip system components, but keep service accounts: their
            # usernames also start with "system:" and they are exactly the
            # identities the generated policies are written for.
            if (user.startswith('system:')
                    and not user.startswith('system:serviceaccount:')):
                continue

            verb = event.get('verb')
            resource = object_ref.get('resource')
            if not verb or not resource:
                # Nothing to record; storing None here would later break
                # sorting of the resource set.
                continue

            # Build resource path
            subresource = object_ref.get('subresource')
            resource_path = f"{resource}/{subresource}" if subresource else resource

            # Record API call pattern
            api_calls[user][verb].add(resource_path)
    return api_calls
def _dns_safe_name(user: str) -> str:
    """Reduce a username to characters valid in a DNS-1123 subdomain name."""
    allowed = 'abcdefghijklmnopqrstuvwxyz0123456789-.'
    mapped = ''.join(ch if ch in allowed else '-' for ch in str(user).lower())
    # Every dot-separated label must start and end with an alphanumeric;
    # empty labels (from "..", leading or trailing dots) are dropped.
    labels = (label.strip('-') for label in mapped.split('.'))
    return '.'.join(label for label in labels if label) or 'unknown'


def generate_policies(api_calls: dict, namespace: str = "default") -> list:
    """Generate KubeFence policy documents from API call patterns.

    Args:
        api_calls: Mapping ``{username: {verb: iterable_of_resource_paths}}``.
        namespace: Value written to each policy's ``metadata.namespace``.

    Returns:
        A list with one policy dict per user, in the iteration order of
        ``api_calls``. Each policy has one rule per verb, with the verb's
        resources sorted alphabetically. ``metadata.name`` is derived from
        the username but normalised so it is a valid object name and safe
        to use as a file name; ``spec.serviceAccount`` keeps the username
        unchanged.
    """
    policies = []
    for user, verbs in api_calls.items():
        rules = []
        for verb, resources in verbs.items():
            # Drop empty/None entries: they cannot be sorted alongside
            # strings and carry no meaning in a rule.
            named = sorted(res for res in resources if res)
            if named:
                rules.append({'verbs': [verb], 'resources': named})
        policies.append({
            'apiVersion': 'kubefence.io/v1',
            'kind': 'APIPolicy',
            'metadata': {
                'name': f"policy-{_dns_safe_name(user)}",
                'namespace': namespace,
            },
            'spec': {
                'serviceAccount': user,
                'rules': rules,
            },
        })
    return policies
def main():
    """Command-line entry point.

    Parses ``--audit-log``, ``--namespace`` and ``--output``, extracts API
    call patterns from the audit log, builds one policy per user and writes
    each policy to ``<output>/<policy name>.yaml``. When no namespace is
    given, events are not filtered and policies are placed in ``default``.

    An unreadable audit log is reported through ``parser.error`` (usage
    message, exit status 2) instead of an uncaught traceback.
    """
    parser = argparse.ArgumentParser(description='Generate KubeFence policies from audit logs')
    parser.add_argument('--audit-log', default='/var/log/kubernetes/audit.log',
                        help='Path to K3s audit log file')
    parser.add_argument('--namespace', default=None,
                        help='Filter by namespace')
    parser.add_argument('--output', default='policies/',
                        help='Output directory for policy YAML files')
    args = parser.parse_args()

    # Parse audit logs
    print(f"Parsing audit logs from {args.audit_log}...")
    try:
        api_calls = parse_audit_logs(args.audit_log, args.namespace)
    except OSError as err:
        parser.error(f"cannot read audit log {args.audit_log}: {err}")

    # Generate policies
    print(f"Generating policies for {len(api_calls)} service accounts...")
    policies = generate_policies(api_calls, args.namespace or 'default')

    # Write policies to files
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    for policy in policies:
        filepath = output_dir / f"{policy['metadata']['name']}.yaml"
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(policy, f, default_flow_style=False)
        print(f"Generated policy: {filepath}")
    print(f"\nGenerated {len(policies)} policies in {output_dir}")
# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment