Last active
December 9, 2024 16:29
-
-
Save qrkourier/b9cacf765b2d62817672bc7e6be6bdc3 to your computer and use it in GitHub Desktop.
parse or verify a Ziti enrollment token as JWT
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# This script parses and attempts to verify the signature of a Ziti JWT token and prints the header, payload, and | |
# analysis of the token. | |
import argparse | |
import json | |
import logging | |
import os | |
import ssl | |
import socket | |
from datetime import datetime | |
from urllib.parse import urlparse | |
import chardet | |
import humanize | |
import jwt | |
from OpenSSL import crypto | |
def detect_encoding(file_path): | |
with open(file_path, 'rb') as file: | |
data = file.read() | |
return chardet.detect(data)['encoding'] | |
def get_file_content_or_string(s): | |
if os.path.isfile(s): | |
with open(s, 'rb') as file: | |
return file.read().decode(detect_encoding(s), 'strict').strip() | |
else: | |
return s | |
def parse_verify_jwt(token, key: str = None): | |
claimset = jwt.decode( | |
jwt=token, | |
key=key, | |
algorithms=["ES256", "RS256"], | |
options={ | |
"verify_signature": True if key else False, | |
"verify_aud": False, | |
"verify_exp": False, | |
} | |
) | |
return claimset | |
def fetch_issuer_pubkey(iss: str): | |
url = urlparse(iss) | |
# Get the server's certificate | |
cert = ssl.get_server_certificate((url.hostname, url.port or 443), timeout=3) | |
# Get the public key from the certificate | |
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert) | |
pubkey = crypto.dump_publickey(crypto.FILETYPE_PEM, x509.get_pubkey()) | |
return pubkey | |
# Create the parser | |
parser = argparse.ArgumentParser(description="Process some integers.") | |
# Add the positional arguments | |
parser.add_argument('token', type=str, help='The token to verify (required) as a string or file path') | |
parser.add_argument('pubkey', type=str, nargs='?', default=None, help='The client API\'s pubkey (optional) as a string or file path') | |
# Add the --verbose option | |
parser.add_argument('-v', '--verbose', action='store_true', help='Set log level to DEBUG') | |
# Parse the arguments | |
args = parser.parse_args() | |
# Create a custom logger | |
logger = logging.getLogger(__name__) | |
# Create handlers | |
c_handler = logging.StreamHandler() | |
c_handler.setLevel(logging.INFO) | |
# Create formatters and add it to handlers | |
c_format = logging.Formatter('%(name)s - %(levelname)s - %(message)s') | |
c_handler.setFormatter(c_format) | |
# Add handlers to the logger | |
logger.addHandler(c_handler) | |
# Set up logging | |
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) | |
token = get_file_content_or_string(args.token) | |
if args.pubkey: | |
pubkey = get_file_content_or_string(args.pubkey) | |
else: | |
claimset = parse_verify_jwt(token) | |
try: | |
pubkey = fetch_issuer_pubkey(claimset["iss"]) | |
except (TimeoutError, socket.gaierror) as e: | |
logger.debug(f"failed to fetch issuer pubkey: {e}") | |
pubkey = None | |
if pubkey: | |
logger.debug(f"verifying '{token}' with pubkey '{pubkey}'") | |
else: | |
logger.debug(f"parsing '{token}' without a pubkey for signature verification") | |
header = jwt.get_unverified_header(token) | |
try: | |
claimset = parse_verify_jwt(token, pubkey) | |
if pubkey: | |
signature_valid = True | |
else: | |
signature_valid = False | |
except jwt.exceptions.InvalidSignatureError: | |
signature_valid = False | |
if 'em' in claimset: | |
if claimset['em'] == 'ott': | |
enrollment_method = 'one-time token for an identity from the built-in edge signer CA' | |
elif claimset['em'] == 'ottca': | |
enrollment_method = 'one-time token for a an identity from a trusted external CA' | |
elif claimset['em'] == 'ca': | |
enrollment_method = 'reusable token for auto-creating an identity from a trusted external CA' | |
elif claimset['em'] == 'erott': | |
enrollment_method = 'one-time token for a router' | |
else: | |
enrollment_method = 'unknown' | |
analysis = { | |
"signature_valid": signature_valid, | |
"enrollment_method": enrollment_method | |
} | |
if 'exp' in claimset: | |
expiration_time = datetime.fromtimestamp(claimset['exp']) | |
expires_in = expiration_time - datetime.now() | |
if expires_in.total_seconds() > 0: | |
analysis['expiration'] = ( | |
f"valid until {expiration_time.isoformat()} " | |
f"({humanize.naturaldelta(expires_in)})" | |
) | |
else: | |
expires_ago = -expires_in | |
analysis['expiration'] = ( | |
f"expired since {expiration_time.isoformat()} " | |
f"({humanize.naturaldelta(expires_ago)})" | |
) | |
print( | |
json.dumps({ | |
"header": header, | |
"payload": claimset, | |
"analysis": analysis | |
}, indent=4) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment