Created
September 2, 2024 14:05
-
-
Save alexlovelltroy/ee5d5f3a034301e6f6ff225d39991462 to your computer and use it in GitHub Desktop.
OIDC client_assertion with private key jwt for authelia
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
## Demonstration code for confirming that https://github.com/authelia/authelia/issues/7790 is fixed in authelia
## Tested with the OIDC suite.
## Be sure to add the client public key (~/.config/jwt_client/public.pem) to your configuration.yml once both have been created
## python script.py auth --client-id=oidc-tester-app-2 --auth-url=https://login.example.com:8080 --redirect-uri=https://oidc.example.com:8080/oauth2/callback --insecure
## MIT License. Do what you want.
"""
Copyright (c) 2024 [email protected]

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import json | |
import time | |
import uuid | |
import requests | |
import jwt | |
from pathlib import Path | |
from cryptography.hazmat.primitives.asymmetric import rsa | |
from cryptography.hazmat.primitives import serialization | |
import argparse | |
# Per-user directory holding everything this client persists between runs.
CONFIG_DIR = Path.home().joinpath('.config', 'jwt_client')
# JSON file in which the client_id and auth_url are remembered.
CONFIG_FILE = CONFIG_DIR.joinpath('config.json')
# PEM files for the RSA key pair used to sign client assertions.
PRIVATE_KEY_FILE = CONFIG_DIR.joinpath('private.key')
PUBLIC_KEY_FILE = CONFIG_DIR.joinpath('public.pem')
def load_config():
    """Return the parsed JSON stored in CONFIG_FILE.

    An empty dict is returned when the file does not exist.
    """
    if not CONFIG_FILE.exists():
        return {}
    with CONFIG_FILE.open() as handle:
        return json.load(handle)
def save_config(config):
    """Write *config* to CONFIG_FILE as JSON indented by four spaces.

    CONFIG_DIR, including any missing parent directories, is created first.
    """
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    with CONFIG_FILE.open('w') as handle:
        json.dump(config, handle, indent=4)
def generate_keys():
    """Generate a 2048-bit RSA key pair and write both halves as PEM files.

    The private key is written unencrypted to PRIVATE_KEY_FILE in the
    traditional OpenSSL format; the public key is written to PUBLIC_KEY_FILE
    in SubjectPublicKeyInfo format. Existing files are overwritten.
    """
    # The directory may not exist yet if no configuration has been saved.
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)

    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    private_key_bytes = private_key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption(),
    )
    # The key is stored without a passphrase, so keep it owner-only: create
    # the file with mode 0o600 before any key material is written, and tighten
    # the mode of a pre-existing file as well.
    PRIVATE_KEY_FILE.touch(mode=0o600, exist_ok=True)
    PRIVATE_KEY_FILE.chmod(0o600)
    PRIVATE_KEY_FILE.write_bytes(private_key_bytes)

    public_key_bytes = private_key.public_key().public_bytes(
        serialization.Encoding.PEM,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    PUBLIC_KEY_FILE.write_bytes(public_key_bytes)
def get_or_create_keys():
    """Make sure both key files are present, generating a new pair if not.

    Returns True when a new key pair was generated, False when both
    PRIVATE_KEY_FILE and PUBLIC_KEY_FILE already existed.
    """
    if PRIVATE_KEY_FILE.exists() and PUBLIC_KEY_FILE.exists():
        return False
    generate_keys()
    return True
def get_openid_configuration(auth_url, verify=True, timeout=30):
    """Fetch the provider's OpenID discovery document.

    Args:
        auth_url: Base URL of the provider; a trailing slash is ignored.
        verify: Passed through to requests as ``verify``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the discovery response.

    Raises:
        requests.exceptions.HTTPError: If the response status is an error.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    # Drop any trailing slash so the request path never becomes
    # "//.well-known/openid-configuration".
    discovery_url = f"{auth_url.rstrip('/')}/.well-known/openid-configuration"
    # Without a timeout an unresponsive server would block the call forever.
    response = requests.get(discovery_url, verify=verify, timeout=timeout)
    response.raise_for_status()
    return response.json()
def generate_client_assertion(client_id, token_endpoint, key_id):
    """Build an RS256-signed JWT client assertion.

    The token names *client_id* as issuer and subject and *token_endpoint* as
    audience, carries a random ``jti``, and expires about 60 seconds after it
    is issued. *key_id* is placed in the ``kid`` header.

    Returns the encoded token, or None (after printing the error) when the
    private key cannot be loaded or the token cannot be encoded.
    """
    try:
        with open(PRIVATE_KEY_FILE, 'rb') as key_file:
            pem_data = key_file.read()
            signing_key = serialization.load_pem_private_key(pem_data, password=None)
    except Exception as e:
        print(f"Error loading private key: {e}")
        return None

    claims = dict(
        iss=client_id,
        sub=client_id,
        aud=token_endpoint,
        iat=int(time.time()),
        exp=int(time.time()) + 60,
        jti=str(uuid.uuid4()),
        client_id=client_id,
    )
    jose_headers = dict(kid=key_id, alg="RS256")

    try:
        token = jwt.encode(claims, signing_key, algorithm='RS256', headers=jose_headers)
    except Exception as e:
        print(f"Error generating client assertion JWT: {e}")
        return None
    return token
def par_request(par_endpoint, client_id, client_assertion, redirect_uri, verify=True, timeout=30):
    """Send a Pushed Authorization Request and return the decoded response.

    Args:
        par_endpoint: URL of the pushed authorization request endpoint.
        client_id: OAuth client identifier.
        client_assertion: Signed JWT authenticating the client.
        redirect_uri: Redirect URI for the authorization code flow.
        verify: Passed through to requests as ``verify``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the response status is an error.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    form = {
        'client_id': client_id,
        'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
        'client_assertion': client_assertion,
        'redirect_uri': redirect_uri,
        'response_type': 'code',
        'scope': 'offline_access offline',
        # Send the state as an explicit string instead of relying on the
        # form encoder to stringify a UUID object.
        'state': str(uuid.uuid4()),
    }
    # Without a timeout an unresponsive server would block the call forever.
    response = requests.post(par_endpoint, data=form, verify=verify, timeout=timeout)
    response.raise_for_status()
    return response.json()
def auth(args):
    """Perform authentication using the PAR flow with Authelia.

    Reads ``client_id``, ``auth_url``, ``redirect_uri``, ``insecure`` and
    ``cacert`` from *args*. ``client_id`` and ``auth_url`` fall back to the
    saved configuration when not given, and are stored there the first time
    they are seen. Every failure is reported with a printed message and the
    function returns without raising; on success the PAR response is printed.
    """
    config = load_config()
    client_id = args.client_id or config.get('client_id')
    auth_url = args.auth_url or config.get('auth_url')
    if not client_id or not auth_url:
        print("Error: client-id and auth-url must be provided.")
        return

    # Remember values that are not stored yet; stored values are never
    # overwritten. Write the file once, and only if something was added.
    config_changed = False
    if 'client_id' not in config:
        config['client_id'] = client_id
        config_changed = True
    if 'auth_url' not in config:
        config['auth_url'] = auth_url
        config_changed = True
    if config_changed:
        save_config(config)

    # A CA bundle path takes precedence; otherwise certificate verification
    # is on unless --insecure was given.
    verify = args.cacert or not args.insecure

    try:
        openid_config = get_openid_configuration(auth_url, verify=verify)
    except (requests.exceptions.RequestException, ValueError) as e:
        # RequestException also covers connection, TLS and timeout failures,
        # not just HTTP error statuses; ValueError covers an invalid JSON body.
        print(f"Error retrieving OpenID configuration: {e}")
        return

    if get_or_create_keys():
        print("New keys generated. Update your configuration accordingly.")

    token_endpoint = openid_config.get('token_endpoint')
    par_endpoint = openid_config.get('pushed_authorization_request_endpoint')
    if not token_endpoint or not par_endpoint:
        print("Error: OpenID configuration is missing the token or pushed authorization request endpoint.")
        return

    key_id = f"{client_id}-key"  # Generate key ID based on client ID
    client_assertion = generate_client_assertion(client_id, token_endpoint, key_id)
    if not client_assertion:
        print("Failed to generate a valid client assertion.")
        return

    if not args.redirect_uri:
        print("Error: redirect_uri must be provided.")
        return

    try:
        par_response = par_request(par_endpoint, client_id, client_assertion, args.redirect_uri, verify=verify)
        print("PAR Response:", par_response)
    except Exception as e:
        # Top-level boundary of the CLI: report the failure without a traceback.
        print(f"Error in Pushed Authorization Request: {e}")
def main():
    """Parse the command line and run the selected sub-command.

    ``auth`` is the only sub-command; any other invocation prints the usage
    help.
    """
    cli = argparse.ArgumentParser(description="OpenChami Authentication Client")
    cli.add_argument(
        '--cacert',
        type=str,
        help='Path to a CA certificate bundle file for HTTPS connections',
    )

    commands = cli.add_subparsers(dest='command')
    auth_cmd = commands.add_parser('auth', help='Authenticate with Authelia')
    for flag, help_text in (
        ('--client-id', 'Client ID for authentication'),
        ('--auth-url', 'Authentication URL'),
        ('--redirect-uri', 'Redirect URI for the authorization code flow'),
    ):
        auth_cmd.add_argument(flag, type=str, help=help_text)
    auth_cmd.add_argument('--insecure', action='store_true', help='Accept self-signed certificates')

    args = cli.parse_args()
    if args.command != 'auth':
        cli.print_help()
        return
    auth(args)
# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment