Skip to content

Instantly share code, notes, and snippets.

@dhrp
Last active November 5, 2024 12:18
Show Gist options
  • Save dhrp/10e3564d90eb36035ed07d1b67b98f3c to your computer and use it in GitHub Desktop.
Save dhrp/10e3564d90eb36035ed07d1b67b98f3c to your computer and use it in GitHub Desktop.
this is a minimal demonstration of how to authenticate a python CLI client to an oauth2 IDP
IDP_BASE_URL=https://<your subdomain>.zitadel.cloud/oauth/v2/
CLIENT_ID=<the client id>
#!/usr/bin/env python3
## note:
# This code is adapted from: https://github.com/gateley-auth0/CLI-PKCE
#
# Documentation on how to configure with Zitadel Here:
# https://zitadel.com/blog/secure-logins-with-zitadel-part-2#233-exchange-the-authorization-code-for-the-access-token
import base64
import hashlib
import json
import os
import pathlib
import requests
import secrets
import threading
import urllib
import webbrowser
from time import sleep
from werkzeug.serving import make_server
import dotenv
from flask import Flask, request
app = Flask(__name__)
####
# Configure the Flask server
####
@app.route("/callback")
def callback():
"""
The callback is invoked after a completed login attempt (succesful or otherwise).
It sets global variables with the auth code or error messages, then sets the
polling flag received_callback.
:return:
"""
global received_callback, code, error_message, received_state
error_message = None
code = None
if 'error' in request.args:
error_message = request.args['error'] + ': ' + request.args['error_description']
else:
code = request.args['code']
received_state = request.args['state']
received_callback = True
return "Please return to your application now."
class ServerThread(threading.Thread):
"""
The Flask server is done this way to allow shutting down after a single request has been received.
"""
def __init__(self, app):
threading.Thread.__init__(self)
self.srv = make_server('127.0.0.1', 5000, app)
self.ctx = app.app_context()
self.ctx.push()
def run(self):
print('starting server')
self.srv.serve_forever()
def shutdown(self):
self.srv.shutdown()
def auth0_url_encode(byte_data):
"""
Safe encoding handles + and /, and also replace = with nothing
:param byte_data:
:return:
"""
return base64.urlsafe_b64encode(byte_data).decode('utf-8').replace('=', '')
def generate_challenge(a_verifier):
return auth0_url_encode(hashlib.sha256(a_verifier.encode()).digest())
####
# Take the actions to log in
####
env_path = pathlib.Path('.') / '.env'
dotenv.load_dotenv(dotenv_path=env_path)
verifier = auth0_url_encode(secrets.token_bytes(32))
challenge = generate_challenge(verifier)
state = auth0_url_encode(secrets.token_bytes(32))
client_id = os.getenv('CLIENT_ID')
redirect_uri = 'http://127.0.0.1:5000/callback'
# We generate a nonce (state) that is used to protect against attackers invoking the callback
base_url = os.getenv('IDP_BASE_URL') + '/authorize?'
url_parameters = {
'client_id': client_id,
'redirect_uri': redirect_uri,
'scope': 'profile openid email',
'response_type': 'code',
'response_mode': 'query',
'code_challenge_method': 'S256',
'code_challenge': challenge.replace('=', ''),
'state': state,
}
url = base_url + urllib.parse.urlencode(url_parameters)
# Open the browser window to the login url
# Start the server
# Poll til the callback has been invoked
received_callback = False
webbrowser.open_new(url)
server = ServerThread(app)
server.start()
while not received_callback:
sleep(1)
server.shutdown()
if state != received_state:
print("Error: session replay or similar attack in progress. Please log out of all connections.")
exit(-1)
if error_message:
print("An error occurred:")
print(error_message)
exit(-1)
# Exchange the code for a token
url = os.getenv('IDP_BASE_URL') + 'token'
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
body = {'grant_type': 'authorization_code',
'code': code,
'redirect_uri': redirect_uri,
'client_id': client_id,
'scope': 'profile openid email',
'code_verifier': verifier,
}
r = requests.post(url, headers=headers, data=body)
if r.status_code != 200:
print("Error: failed to exchange code for token")
print(r.text)
exit(-1)
data = r.json()
print("Access token: " + data['access_token'])
####
# Demonstrate that the token is valid by retrieving information about the user.
####
url = 'https://whiffle-development-5ngrhx.zitadel.cloud/oidc/v1/userinfo'
headers = {'Authorization': 'Bearer %s' % data['access_token']}
r = requests.get(url, headers=headers)
data = r.json()
print("Success! Your name is: " + data['name'])
flask
python-dotenv
requests
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment