Created
September 20, 2025 12:04
-
-
Save hiyosi/861c70c014ebd58f63675c726797a93d to your computer and use it in GitHub Desktop.
Pod Identity Extension Extractor and Decoder
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Pod Identity Extension Extractor and Decoder | |
Extracts and decodes the Pod Identity extension from a certificate | |
""" | |
import subprocess | |
import binascii | |
import sys | |
import re | |
import json | |
from typing import Optional, Dict, Any | |
def get_certificate_from_pod(namespace: str = "default", pod_name: str = "pod-cert-test") -> str:
    """Return the PEM credential bundle read from inside a pod.

    Runs ``kubectl exec`` to ``cat /etc/certs/credential-bundle.pem`` in the
    given pod and returns whatever the command wrote to stdout.

    Args:
        namespace: Namespace passed to ``kubectl -n``.
        pod_name: Name of the pod to exec into.

    Returns:
        The captured standard output of the ``kubectl`` command.

    Raises:
        SystemExit: With status 1 when ``kubectl`` cannot be started or exits
            with a non-zero status; the reason is printed to stderr first.
    """
    cmd = [
        "kubectl", "exec", pod_name, "-n", namespace, "--",
        "cat", "/etc/certs/credential-bundle.pem",
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as e:
        # kubectl missing or not executable: report it the same way as a
        # failed command instead of surfacing a raw traceback.
        print(f"Error getting certificate from pod: {e}", file=sys.stderr)
        sys.exit(1)
    if result.returncode != 0:
        print(f"Error getting certificate from pod: {result.stderr}", file=sys.stderr)
        sys.exit(1)
    return result.stdout
def _find_extension_hex(asn1_text: str, oid: str) -> Optional[str]:
    """Return the HEX DUMP of the OCTET STRING that follows *oid* in asn1parse output."""
    # Match the OID as a whole token: a plain substring test would also accept
    # longer OIDs sharing the prefix (e.g. "...57683.10" or "...57683.1.2").
    oid_pattern = re.compile(r'(?<![0-9.])' + re.escape(oid) + r'(?![0-9.])')
    hex_pattern = re.compile(r'\[HEX DUMP\]:([0-9A-Fa-f]+)')
    lines = asn1_text.split('\n')
    for i, line in enumerate(lines):
        if not oid_pattern.search(line):
            continue
        # Look for the OCTET STRING with HEX DUMP in the next few lines
        for candidate in lines[i + 1:i + 5]:
            if 'OCTET STRING' not in candidate:
                continue
            hex_match = hex_pattern.search(candidate)
            if hex_match:
                return hex_match.group(1)
    return None


def extract_pod_identity_hex(cert_pem: str) -> Optional[str]:
    """Extract Pod Identity extension HEX data from a certificate.

    The PEM text is converted to DER with ``openssl x509``, dumped with
    ``openssl asn1parse``, and the dump is searched for the Pod Identity OID
    (1.3.6.1.4.1.57683.1) followed by an OCTET STRING hex dump.

    Args:
        cert_pem: PEM text fed to ``openssl x509`` on stdin.

    Returns:
        The hex string from the ``[HEX DUMP]:`` field, or ``None`` when
        ``openssl`` cannot be run, either command fails, or the extension is
        not present. A message is printed to stderr in every ``None`` case.
    """
    # Convert PEM to DER and parse with openssl asn1parse
    try:
        der_result = subprocess.run(["openssl", "x509", "-outform", "DER"],
                                    input=cert_pem.encode(),
                                    capture_output=True)
    except OSError as e:
        # openssl missing or not executable: fail like the other error paths
        print(f"Error converting to DER: {e}", file=sys.stderr)
        return None
    if der_result.returncode != 0:
        print(f"Error converting to DER: {der_result.stderr.decode(errors='replace')}",
              file=sys.stderr)
        return None

    # Parse ASN.1 structure
    try:
        asn1_result = subprocess.run(["openssl", "asn1parse", "-inform", "DER"],
                                     input=der_result.stdout,
                                     capture_output=True)
    except OSError as e:
        print(f"Error parsing ASN.1: {e}", file=sys.stderr)
        return None
    if asn1_result.returncode != 0:
        print(f"Error parsing ASN.1: {asn1_result.stderr.decode(errors='replace')}",
              file=sys.stderr)
        return None

    # errors='replace': the dump may echo certificate strings that are not
    # valid UTF-8; the OID and hex fields we search for are plain ASCII.
    asn1_text = asn1_result.stdout.decode(errors='replace')
    hex_data = _find_extension_hex(asn1_text, '1.3.6.1.4.1.57683.1')
    if hex_data is None:
        print("Pod Identity extension not found in certificate", file=sys.stderr)
    return hex_data
def _read_der_length(data: bytes, pos: int) -> Optional[tuple]:
    """Decode the DER length at data[pos]; return (length, next_pos) or None if invalid/truncated."""
    if pos >= len(data):
        return None
    first = data[pos]
    pos += 1
    if first < 0x80:
        # Short form: the byte itself is the length
        return first, pos
    # Long form: low 7 bits give the number of length octets that follow
    num_octets = first & 0x7F
    if num_octets == 0 or pos + num_octets > len(data):
        # 0x80 (indefinite length) is not valid DER; otherwise truncated
        return None
    return int.from_bytes(data[pos:pos + num_octets], "big"), pos + num_octets


def decode_pod_identity(hex_data: str) -> Dict[str, str]:
    """Decode Pod Identity ASN.1 structure from HEX data.

    The data is expected to be a SEQUENCE (tag 0x30) whose elements carry the
    context tags 0x80..0x87 in order: Namespace, NamespaceUID,
    ServiceAccountName, ServiceAccountUID, PodName, PodUID, NodeName, NodeUID.
    Both short- and long-form DER lengths are accepted, so values of 128
    bytes or more decode correctly.

    Args:
        hex_data: Hexadecimal text of the extension value.

    Returns:
        A dict mapping field name to its UTF-8 decoded value (invalid bytes
        are replaced). Decoding stops at the first unexpected tag or truncated
        field and returns what was read so far. Returns ``{}`` when the hex is
        invalid, empty, or does not start with a well-formed SEQUENCE header.
        Problems are reported on stderr.
    """
    # Convert hex to bytes
    try:
        data = binascii.unhexlify(hex_data)
    except (ValueError, TypeError) as e:  # binascii.Error is a ValueError
        print(f"Error decoding hex: {e}", file=sys.stderr)
        return {}

    if not data:
        print("Pod Identity extension data is empty", file=sys.stderr)
        return {}
    if data[0] != 0x30:
        print(f"Expected SEQUENCE (0x30), got {hex(data[0])}", file=sys.stderr)
        return {}

    header = _read_der_length(data, 1)
    if header is None:
        print("Truncated or invalid SEQUENCE length", file=sys.stderr)
        return {}
    seq_length, pos = header
    # Ignore anything past the declared end of the SEQUENCE; if the declared
    # length overruns the buffer, parse what is actually there.
    data = data[:min(pos + seq_length, len(data))]

    # Field definitions
    fields = (
        ("Namespace", 0x80),
        ("NamespaceUID", 0x81),
        ("ServiceAccountName", 0x82),
        ("ServiceAccountUID", 0x83),
        ("PodName", 0x84),
        ("PodUID", 0x85),
        ("NodeName", 0x86),
        ("NodeUID", 0x87),
    )

    result = {}
    for field_name, expected_tag in fields:
        if pos >= len(data):
            break

        # Check tag
        tag = data[pos]
        if tag != expected_tag:
            print(f"Unexpected tag {hex(tag)} for field {field_name}, expected {hex(expected_tag)}", file=sys.stderr)
            break

        # Get length (short or long form)
        parsed = _read_der_length(data, pos + 1)
        if parsed is None:
            print(f"Insufficient data for field {field_name}", file=sys.stderr)
            break
        length, pos = parsed

        # Extract value
        if pos + length > len(data):
            print(f"Insufficient data for field {field_name}", file=sys.stderr)
            break
        result[field_name] = data[pos:pos + length].decode('utf-8', errors='replace')
        pos += length

    return result
def main():
    """Command-line entry point.

    Usage:
        [namespace] [pod-name]    read the certificate from a pod via kubectl
                                  (defaults: "default" / "pod-cert-test")
        --from-file cert.pem      read the certificate from a local file
        -h, --help                print usage and exit with status 0

    The Pod Identity extension is extracted, printed as hex, decoded, and
    shown as a table, as JSON, and as a short structure summary.

    Raises:
        SystemExit: Status 0 after printing help; status 1 when the file
            argument is missing or unreadable, or when extraction or decoding
            fails.
    """
    args = sys.argv[1:]

    if args and args[0] in ('-h', '--help'):
        print("Usage: python3 extract-and-decode-pod-identity.py [namespace] [pod-name]")
        print(" python3 extract-and-decode-pod-identity.py --from-file cert.pem")
        print("\nExtracts and decodes Pod Identity extension from certificate")
        sys.exit(0)

    if args and args[0] == '--from-file':
        # Read certificate from file
        if len(args) < 2:
            print("Error: filename required after --from-file", file=sys.stderr)
            sys.exit(1)
        try:
            with open(args[1], 'r') as f:
                cert_pem = f.read()
        except (OSError, UnicodeDecodeError) as e:
            # Missing/unreadable file: report it like every other failure in
            # this script rather than dumping a traceback.
            print(f"Error reading certificate file {args[1]}: {e}", file=sys.stderr)
            sys.exit(1)
    else:
        # Get from Pod, falling back to pod-cert-test in the default namespace
        namespace = args[0] if args else "default"
        pod_name = args[1] if len(args) > 1 else "pod-cert-test"
        print(f"Getting certificate from pod {pod_name} in namespace {namespace}...")
        cert_pem = get_certificate_from_pod(namespace, pod_name)

    # Extract HEX data
    print("\n1. Extracting Pod Identity extension HEX data...")
    hex_data = extract_pod_identity_hex(cert_pem)
    if not hex_data:
        print("Failed to extract Pod Identity extension", file=sys.stderr)
        sys.exit(1)
    print(f" Found HEX data ({len(hex_data)} characters, {len(hex_data)//2} bytes)")

    # Display HEX data in formatted way (32 hex characters = 16 bytes per row)
    print("\n2. HEX Data (formatted):")
    for i in range(0, len(hex_data), 32):
        print(f" {hex_data[i:i+32]}")

    # Decode Pod Identity
    print("\n3. Decoding Pod Identity fields...")
    pod_identity = decode_pod_identity(hex_data)
    if not pod_identity:
        print("Failed to decode Pod Identity", file=sys.stderr)
        sys.exit(1)

    # Display results
    print("\n4. Pod Identity Extension Contents:")
    print(" " + "="*60)
    for field, value in pod_identity.items():
        print(f" {field:20s}: {value}")
    print(" " + "="*60)

    # Optional: Output as JSON
    print("\n5. JSON representation:")
    print(json.dumps(pod_identity, indent=2))

    # Show ASN.1 structure summary
    print("\n6. ASN.1 Structure Summary:")
    print(f" SEQUENCE ({len(hex_data)//2} bytes total)")
    for i, (field, value) in enumerate(pod_identity.items()):
        print(f' ├─ [{i}] {field}: "{value}" ({len(value)} chars)')


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment