Skip to content

Instantly share code, notes, and snippets.

@alexlovelltroy
Created September 2, 2024 14:05
Show Gist options
  • Save alexlovelltroy/ee5d5f3a034301e6f6ff225d39991462 to your computer and use it in GitHub Desktop.
Save alexlovelltroy/ee5d5f3a034301e6f6ff225d39991462 to your computer and use it in GitHub Desktop.
OIDC client_assertion with private key jwt for authelia
#!/usr/bin/env python
## Demonstration code for confirming that https://github.com/authelia/authelia/issues/7790 is fixed in authelia
## Tested with the OIDC suite.
## Be sure to add the client public key (~/.config/jwt_client/public.pem) to your configuration.yml once the key pair has been created
## python script.py auth --client-id=oidc-tester-app-2 --auth-url=https://login.example.com:8080 --redirect-uri=https://oidc.example.com:8080/oauth2/callback --insecure
## MIT License. Do what you want.
"""
Copyright (c) 2024 [email protected]
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import json
import time
import uuid
import requests
import jwt
from pathlib import Path
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
import argparse
# Per-user directory that holds this tool's settings and RSA key pair.
# Note the underscore: the directory is ``~/.config/jwt_client``.
CONFIG_DIR = Path.home() / '.config' / 'jwt_client'
# JSON file read by load_config() and written by save_config().
CONFIG_FILE = CONFIG_DIR / 'config.json'
# PEM-encoded private key used to sign the client assertion JWT.
PRIVATE_KEY_FILE = CONFIG_DIR / 'private.key'
# PEM-encoded public key written alongside the private key by generate_keys().
PUBLIC_KEY_FILE = CONFIG_DIR / 'public.pem'
def load_config():
    """Return the settings stored in the config file.

    Returns:
        The parsed JSON content of ``CONFIG_FILE``, or an empty dict when the
        file does not exist.
    """
    # Nothing saved yet: start from an empty configuration.
    if not CONFIG_FILE.exists():
        return {}
    with open(CONFIG_FILE) as handle:
        return json.load(handle)
def save_config(config):
    """Persist ``config`` to the config file as indented JSON.

    The config directory (and any missing parents) is created first, so this
    works on a fresh machine.

    Args:
        config: JSON-serialisable settings to write.
    """
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    with open(CONFIG_FILE, 'w') as handle:
        json.dump(config, handle, indent=4)
def generate_keys():
    """Generate a 2048-bit RSA key pair and write both halves as PEM files.

    The private key is written unencrypted to ``PRIVATE_KEY_FILE`` in
    TraditionalOpenSSL format; the public key is written to
    ``PUBLIC_KEY_FILE`` in SubjectPublicKeyInfo format. Existing files are
    overwritten.

    Raises:
        OSError: If the config directory or either key file cannot be written.
    """
    # Do not rely on save_config() having created the directory beforehand.
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)

    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    private_key_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    # The key has no passphrase, so make the file owner-only *before* any key
    # material is written. touch() covers a new file; chmod() covers a
    # pre-existing file that may have looser permissions.
    PRIVATE_KEY_FILE.touch(mode=0o600, exist_ok=True)
    PRIVATE_KEY_FILE.chmod(0o600)
    PRIVATE_KEY_FILE.write_bytes(private_key_bytes)

    public_key_bytes = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    PUBLIC_KEY_FILE.write_bytes(public_key_bytes)
def get_or_create_keys():
    """Make sure both key files exist, generating a new pair when needed.

    Returns:
        True if a new key pair was generated because at least one of the two
        files was missing, False if both files were already present.
    """
    # Both halves present: nothing to do.
    if PRIVATE_KEY_FILE.exists() and PUBLIC_KEY_FILE.exists():
        return False
    generate_keys()
    return True
def get_openid_configuration(auth_url, verify=True, timeout=10):
    """Fetch the OpenID discovery document from the provider.

    Args:
        auth_url: Base URL of the provider; a trailing slash is tolerated.
        verify: Passed to ``requests`` as ``verify`` (True/False or a path to
            a CA bundle).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely.

    Returns:
        The decoded JSON body of ``<auth_url>/.well-known/openid-configuration``.

    Raises:
        requests.exceptions.HTTPError: On a 4xx/5xx response.
        requests.exceptions.RequestException: On connection, TLS or timeout
            failures.
    """
    # Strip trailing slashes so "https://host/" does not yield "//.well-known".
    discovery_url = f"{auth_url.rstrip('/')}/.well-known/openid-configuration"
    response = requests.get(discovery_url, verify=verify, timeout=timeout)
    response.raise_for_status()
    return response.json()
def generate_client_assertion(client_id, token_endpoint, key_id):
    """Build an RS256-signed JWT client assertion.

    The private key is read from ``PRIVATE_KEY_FILE``. The token carries
    ``client_id`` as issuer and subject, ``token_endpoint`` as audience, a
    random ``jti`` and a lifetime of 60 seconds.

    Args:
        client_id: OAuth client identifier; used for ``iss``, ``sub`` and
            ``client_id``.
        token_endpoint: Value placed in the ``aud`` claim.
        key_id: Value placed in the JWT ``kid`` header.

    Returns:
        The encoded JWT as returned by ``jwt.encode``, or None if the key
        could not be loaded or signing failed (the error is printed).
    """
    try:
        with open(PRIVATE_KEY_FILE, 'rb') as f:
            private_key = serialization.load_pem_private_key(f.read(), password=None)
    except Exception as e:
        print(f"Error loading private key: {e}")
        return None

    # Read the clock once so exp is always exactly iat + 60.
    issued_at = int(time.time())
    payload = {
        "iss": client_id,
        "sub": client_id,
        "aud": token_endpoint,
        "iat": issued_at,
        "exp": issued_at + 60,
        "jti": str(uuid.uuid4()),
        "client_id": client_id,
    }
    headers = {"kid": key_id, "alg": "RS256"}
    try:
        return jwt.encode(payload, private_key, algorithm='RS256', headers=headers)
    except Exception as e:
        print(f"Error generating client assertion JWT: {e}")
        return None
def par_request(par_endpoint, client_id, client_assertion, redirect_uri, verify=True, timeout=10):
    """Send a Pushed Authorization Request authenticated by a JWT assertion.

    Args:
        par_endpoint: URL of the pushed authorization request endpoint.
        client_id: OAuth client identifier.
        client_assertion: Signed JWT sent as ``client_assertion``.
        redirect_uri: Redirect URI for the authorization code flow.
        verify: Passed to ``requests`` as ``verify`` (True/False or a path to
            a CA bundle).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: On a 4xx/5xx response.
        requests.exceptions.RequestException: On connection, TLS or timeout
            failures.
    """
    form = {
        'client_id': client_id,
        'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
        'client_assertion': client_assertion,
        'redirect_uri': redirect_uri,
        'response_type': 'code',
        'scope': 'offline_access offline',
        # Send an explicit string rather than relying on the form encoder to
        # stringify a UUID object.
        'state': str(uuid.uuid4()),
    }
    response = requests.post(par_endpoint, data=form, verify=verify, timeout=timeout)
    response.raise_for_status()
    return response.json()
def auth(args):
    """Run the ``auth`` sub-command: push an authorization request to Authelia.

    Steps: resolve ``client_id``/``auth_url`` from the command line or the
    saved config, fetch the OpenID discovery document, make sure a key pair
    exists, sign a client assertion and send the Pushed Authorization Request.
    Progress and errors are printed; nothing is returned.

    Args:
        args: Parsed argparse namespace providing ``client_id``, ``auth_url``,
            ``redirect_uri``, ``cacert`` and ``insecure``.
    """
    config = load_config()
    client_id = args.client_id or config.get('client_id')
    auth_url = args.auth_url or config.get('auth_url')

    # Validate all inputs up front, before any network I/O or key generation.
    if not client_id or not auth_url:
        print("Error: client-id and auth-url must be provided.")
        return
    if not args.redirect_uri:
        print("Error: redirect_uri must be provided.")
        return

    # Remember first-seen values; values already in the config are never
    # overwritten. A single write covers both keys.
    first_seen = {
        key: value
        for key, value in (('client_id', client_id), ('auth_url', auth_url))
        if key not in config
    }
    if first_seen:
        config.update(first_seen)
        save_config(config)

    # A CA bundle path wins; otherwise verify unless --insecure was given.
    verify = args.cacert or not args.insecure

    try:
        openid_config = get_openid_configuration(auth_url, verify=verify)
    except (requests.exceptions.RequestException, ValueError) as e:
        # RequestException covers HTTP errors as well as connection, TLS and
        # timeout failures; ValueError covers a non-JSON response body.
        print(f"Error retrieving OpenID configuration: {e}")
        return

    token_endpoint = openid_config.get('token_endpoint')
    par_endpoint = openid_config.get('pushed_authorization_request_endpoint')
    if not token_endpoint or not par_endpoint:
        print("Error: OpenID configuration is missing token_endpoint or pushed_authorization_request_endpoint.")
        return

    if get_or_create_keys():
        print("New keys generated. Update your configuration accordingly.")

    key_id = f"{client_id}-key"  # Generate key ID based on client ID
    client_assertion = generate_client_assertion(client_id, token_endpoint, key_id)
    if not client_assertion:
        print("Failed to generate a valid client assertion.")
        return

    try:
        par_response = par_request(par_endpoint, client_id, client_assertion, args.redirect_uri, verify=verify)
        print("PAR Response:", par_response)
    except Exception as e:
        print(f"Error in Pushed Authorization Request: {e}")
def main(argv=None):
    """Parse the command line and dispatch to the selected sub-command.

    Args:
        argv: Optional list of argument strings (without the program name).
            When None, argparse falls back to ``sys.argv[1:]``, so calling
            ``main()`` with no arguments behaves as before.

    With no sub-command the help text is printed.
    """
    parser = argparse.ArgumentParser(description="OpenChami Authentication Client")
    parser.add_argument('--cacert', type=str, help='Path to a CA certificate bundle file for HTTPS connections')

    subparsers = parser.add_subparsers(dest='command')
    auth_parser = subparsers.add_parser('auth', help='Authenticate with Authelia')
    auth_parser.add_argument('--client-id', type=str, help='Client ID for authentication')
    auth_parser.add_argument('--auth-url', type=str, help='Authentication URL')
    auth_parser.add_argument('--redirect-uri', type=str, help='Redirect URI for the authorization code flow')
    auth_parser.add_argument('--insecure', action='store_true', help='Accept self-signed certificates')

    args = parser.parse_args(argv)
    if args.command == 'auth':
        auth(args)
    else:
        parser.print_help()
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment