Skip to content

Instantly share code, notes, and snippets.

@hiyosi
Created September 20, 2025 12:04
Show Gist options
  • Save hiyosi/861c70c014ebd58f63675c726797a93d to your computer and use it in GitHub Desktop.
Save hiyosi/861c70c014ebd58f63675c726797a93d to your computer and use it in GitHub Desktop.
Pod Identity Extension Extractor and Decoder
#!/usr/bin/env python3
"""
Pod Identity Extension Extractor and Decoder
Extracts and decodes the Pod Identity extension from a certificate
"""
import subprocess
import binascii
import sys
import re
import json
from typing import Optional, Dict, Any
def get_certificate_from_pod(namespace: str = "default", pod_name: str = "pod-cert-test") -> str:
    """Read the PEM credential bundle out of a pod via ``kubectl exec``.

    Runs ``kubectl exec <pod_name> -n <namespace> -- cat
    /etc/certs/credential-bundle.pem`` and returns whatever the command
    wrote to stdout.

    Args:
        namespace: Namespace passed to ``kubectl -n``.
        pod_name: Name of the pod to exec into.

    Returns:
        The captured stdout of the ``cat`` command as text.

    Raises:
        SystemExit: With code 1 when ``kubectl`` cannot be started or exits
            with a non-zero status; the reason is printed to stderr first.
    """
    cmd = [
        "kubectl", "exec", pod_name, "-n", namespace, "--",
        "cat", "/etc/certs/credential-bundle.pem",
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # kubectl is not installed / not executable: report it the same way
        # as a failed command instead of leaking a traceback.
        print(f"Error getting certificate from pod: {exc}", file=sys.stderr)
        sys.exit(1)

    if result.returncode != 0:
        print(f"Error getting certificate from pod: {result.stderr}", file=sys.stderr)
        sys.exit(1)
    return result.stdout
def _run_openssl(cmd, stdin_data: bytes, error_prefix: str) -> Optional[bytes]:
    """Run one openssl command, feeding *stdin_data*; return stdout or None on failure."""
    try:
        proc = subprocess.run(cmd, input=stdin_data, capture_output=True)
    except OSError as exc:
        # openssl binary missing or not executable.
        print(f"{error_prefix}: {exc}", file=sys.stderr)
        return None
    if proc.returncode != 0:
        # errors="replace" so odd bytes in openssl's diagnostics cannot raise.
        print(f"{error_prefix}: {proc.stderr.decode(errors='replace')}", file=sys.stderr)
        return None
    return proc.stdout


def extract_pod_identity_hex(cert_pem: str) -> Optional[str]:
    """Extract Pod Identity extension HEX data from certificate.

    The PEM text is converted to DER with ``openssl x509`` and dumped with
    ``openssl asn1parse``. The dump is scanned for the Pod Identity OID
    (1.3.6.1.4.1.57683.1); the hex payload of the first ``OCTET STRING``
    found in the four lines after it is returned.

    Args:
        cert_pem: PEM text containing the certificate.

    Returns:
        The hex string following ``[HEX DUMP]:`` for the extension value, or
        ``None`` when openssl fails, cannot be run, or the extension is not
        present. A message is printed to stderr in every ``None`` case.
    """
    pod_identity_oid = "1.3.6.1.4.1.57683.1"

    # Convert PEM to DER and parse with openssl asn1parse
    der = _run_openssl(
        ["openssl", "x509", "-outform", "DER"],
        cert_pem.encode(),
        "Error converting to DER",
    )
    if der is None:
        return None

    # Parse ASN.1 structure
    asn1_dump = _run_openssl(
        ["openssl", "asn1parse", "-inform", "DER"],
        der,
        "Error parsing ASN.1",
    )
    if asn1_dump is None:
        return None

    # Anchor the OID at the end of the line so that e.g. "...57683.10" or
    # "...57683.1.2" is not mistaken for the Pod Identity extension.
    oid_at_eol = re.compile(r":" + re.escape(pod_identity_oid) + r"\s*$")
    hex_dump = re.compile(r"\[HEX DUMP\]:([0-9A-F]+)")

    lines = asn1_dump.decode(errors="replace").splitlines()
    for i, line in enumerate(lines):
        if not oid_at_eol.search(line):
            continue
        # Look for the OCTET STRING with HEX DUMP in next few lines
        for candidate in lines[i + 1:i + 5]:
            if "OCTET STRING" not in candidate:
                continue
            hex_match = hex_dump.search(candidate)
            if hex_match:
                return hex_match.group(1)

    print("Pod Identity extension not found in certificate", file=sys.stderr)
    return None
def _read_der_length(data: bytes, pos: int) -> Optional[tuple]:
    """Read a DER length starting at data[pos].

    Returns ``(length, next_pos)`` where *next_pos* is the index of the first
    content byte, or ``None`` if the length octets are truncated or use the
    indefinite form (0x80), which DER does not allow.
    """
    if pos >= len(data):
        return None
    first = data[pos]
    pos += 1
    if first < 0x80:
        # Short form: the byte itself is the length.
        return first, pos
    octets = first & 0x7F  # long form: low 7 bits = number of length octets
    if octets == 0 or pos + octets > len(data):
        return None
    return int.from_bytes(data[pos:pos + octets], "big"), pos + octets


def decode_pod_identity(hex_data: str) -> Dict[str, str]:
    """Decode Pod Identity ASN.1 structure from HEX data.

    The payload is expected to be a SEQUENCE (tag 0x30) whose members carry
    the tags 0x80..0x87 in this order: Namespace, NamespaceUID,
    ServiceAccountName, ServiceAccountUID, PodName, PodUID, NodeName, NodeUID.
    Each member's content is decoded as UTF-8 (undecodable bytes are
    replaced).

    Args:
        hex_data: Hex string of the extension value.

    Returns:
        Mapping of field name to decoded value, in field order. Decoding
        stops at the first missing, unexpected or truncated field, so the
        result may hold only the leading fields. An empty dict is returned
        when the hex is invalid or the SEQUENCE header is absent/truncated.
        Problems are reported on stderr; no exception is raised.
    """
    # Convert hex to bytes
    try:
        data = binascii.unhexlify(hex_data)
    except (ValueError, TypeError) as e:  # binascii.Error is a ValueError
        print(f"Error decoding hex: {e}", file=sys.stderr)
        return {}

    if not data:
        print("Pod Identity data is empty", file=sys.stderr)
        return {}
    if data[0] != 0x30:
        print(f"Expected SEQUENCE (0x30), got {hex(data[0])}", file=sys.stderr)
        return {}

    # Skip the SEQUENCE header; only the position after the length matters,
    # field bounds are checked against the actual buffer below.
    header = _read_der_length(data, 1)
    if header is None:
        print("Truncated SEQUENCE length", file=sys.stderr)
        return {}
    _, pos = header

    # Field definitions
    fields = [
        ("Namespace", 0x80),
        ("NamespaceUID", 0x81),
        ("ServiceAccountName", 0x82),
        ("ServiceAccountUID", 0x83),
        ("PodName", 0x84),
        ("PodUID", 0x85),
        ("NodeName", 0x86),
        ("NodeUID", 0x87),
    ]

    result = {}
    for field_name, expected_tag in fields:
        if pos >= len(data):
            break

        tag = data[pos]
        if tag != expected_tag:
            print(f"Unexpected tag {hex(tag)} for field {field_name}, expected {hex(expected_tag)}", file=sys.stderr)
            break

        # Values of 128 bytes or more use the long length form, so the
        # length cannot be read as a single byte.
        parsed = _read_der_length(data, pos + 1)
        if parsed is None:
            print(f"Insufficient data for field {field_name}", file=sys.stderr)
            break
        length, value_start = parsed

        value_end = value_start + length
        if value_end > len(data):
            print(f"Insufficient data for field {field_name}", file=sys.stderr)
            break

        result[field_name] = data[value_start:value_end].decode('utf-8', errors='replace')
        pos = value_end

    return result
def main() -> None:
    """Command-line entry point.

    Usage:
        script [namespace] [pod-name]     read the certificate from a pod
        script --from-file cert.pem       read the certificate from a file
        script -h | --help                show usage and exit

    With no arguments the certificate is fetched from pod "pod-cert-test" in
    namespace "default". The extension hex, the decoded fields, a JSON
    rendering and a short structure summary are printed to stdout.

    Raises:
        SystemExit: 0 after printing help; 1 when the certificate file cannot
            be read, the extension cannot be extracted, or nothing could be
            decoded.
    """
    args = sys.argv[1:]

    if args and args[0] in ('-h', '--help'):
        print("Usage: python3 extract-and-decode-pod-identity.py [namespace] [pod-name]")
        print(" python3 extract-and-decode-pod-identity.py --from-file cert.pem")
        print("\nExtracts and decodes Pod Identity extension from certificate")
        sys.exit(0)

    if args and args[0] == '--from-file':
        # Read certificate from file
        if len(args) < 2:
            print("Error: filename required after --from-file", file=sys.stderr)
            sys.exit(1)
        try:
            with open(args[1], 'r') as f:
                cert_pem = f.read()
        except (OSError, UnicodeError) as exc:
            # Missing/unreadable file: fail with a message, not a traceback.
            print(f"Error reading certificate file {args[1]}: {exc}", file=sys.stderr)
            sys.exit(1)
    else:
        # Get from Pod; missing positionals fall back to the defaults.
        namespace = args[0] if args else "default"
        pod_name = args[1] if len(args) > 1 else "pod-cert-test"
        print(f"Getting certificate from pod {pod_name} in namespace {namespace}...")
        cert_pem = get_certificate_from_pod(namespace, pod_name)

    # Extract HEX data
    print("\n1. Extracting Pod Identity extension HEX data...")
    hex_data = extract_pod_identity_hex(cert_pem)
    if not hex_data:
        print("Failed to extract Pod Identity extension", file=sys.stderr)
        sys.exit(1)
    print(f" Found HEX data ({len(hex_data)} characters, {len(hex_data)//2} bytes)")

    # Display HEX data in formatted way (32 hex characters = 16 bytes per row)
    print("\n2. HEX Data (formatted):")
    for i in range(0, len(hex_data), 32):
        print(f" {hex_data[i:i+32]}")

    # Decode Pod Identity
    print("\n3. Decoding Pod Identity fields...")
    pod_identity = decode_pod_identity(hex_data)
    if not pod_identity:
        print("Failed to decode Pod Identity", file=sys.stderr)
        sys.exit(1)

    # Display results
    print("\n4. Pod Identity Extension Contents:")
    print(" " + "="*60)
    for field, value in pod_identity.items():
        print(f" {field:20s}: {value}")
    print(" " + "="*60)

    # Optional: Output as JSON
    print("\n5. JSON representation:")
    print(json.dumps(pod_identity, indent=2))

    # Show ASN.1 structure summary
    print("\n6. ASN.1 Structure Summary:")
    print(f" SEQUENCE ({len(hex_data)//2} bytes total)")
    for i, (field, value) in enumerate(pod_identity.items()):
        print(f" ├─ [{i}] {field}: \"{value}\" ({len(value)} chars)")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment