Last active
January 1, 2025 14:44
-
-
Save jay0lee/fd274e683f9a16c9494ea2483621f050 to your computer and use it in GitHub Desktop.
A simple OIDC IDP AppEngine app. Tested to work with Google Cloud Identity and Workforce Identity Federation. It is NOT fully compliant with OIDC standards. It IS NOT SECURE (you only need the magic word to log in as any user and make any claim) and may not properly guard token values. This is intended for testing/debugging purposes only.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
runtime: python312 | |
handlers: | |
- url: /.* | |
secure: always | |
script: auto |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' A simple OIDC IDP AppEngine app. Tested to work with Google Cloud Identity and | |
Workforce Identity Federation. It is NOT fully compliant with OIDC standards. | |
It IS NOT SECURE (you only need the magic word to log in as any user and make any claim) | |
and may not properly guard token values. This is intended for testing/debugging | |
purposes only. A few more points: | |
- Designed to be "stateless" the server doesn't need to remember anything. This saves | |
any need to setup a database and track client sessions. Values are passed to/from clients | |
in tokens and codes. | |
- Enter an email to login with a standard ID token and claims or design your own claims | |
based on the Custom ID Token setting. This is great for testing how your OIDC service | |
provider handles these custom claim details. Values like {now} are replaced with (wait for | |
it) the current timestamp. | |
- client_ids / secrets which our IDP knows about can be edited below (see "CLIENT_IDS ="). | |
You decide which strings you want to use there and then configure the same ID/secret in | |
your service provider config. | |
- The script does validate PKCE to ensure it's talking to the same client the whole time. | |
- A GCP service account attached to AppEngine is used to perform cryptographic operations | |
via GCP's SignJWT method and the public JWK page for the service account. This completely | |
offloads all the crypto operations to Google at no cost! | |
''' | |
__author__ = "Jay Lee" | |
__email__ = "[email protected]" | |
__copyright__ = "Apache 2.0 License" | |
import base64 | |
import calendar | |
import datetime | |
import json | |
from urllib.parse import unquote | |
from flask import Flask, Response, redirect, request | |
import google.auth | |
import google.auth.transport.requests | |
import google.oauth2.id_token | |
from googleapiclient.discovery import build | |
from oauthlib.oauth2.rfc6749.grant_types.authorization_code import code_challenge_method_s256, \ | |
code_challenge_method_plain | |
import requests | |
app = Flask(__name__)

# EDIT THESE #
# Shared "password" accepted by the login form for every user.
MAGIC_WORD = 'shazaam'  # use your own magic word

# Clients known to this IDP, keyed by client_id. Template for a new entry:
#   'client_id': {
#       'name': 'A Client',
#       'client_secret': 'secret',
#       'url': 'https://login.serviceprovider.com',
#   },
CLIENT_IDS = {
    'google-workspace-client': {
        'name': 'Google Workspace',
        'client_secret': 'something_random',
        'url': 'https://mail.google.com/a/example.com',
    },
    'gcp-wforce-identity-federation': {
        'name': 'GCP Workforce Identity Federation',
        'client_secret': 'gibberish',
        'url': (
            'https://auth.cloud.google/signin/locations/global'
            '/workforcePools/MYPOOL/providers/MYPROVIDER'
            '?continueUrl=https://console.cloud.google/'
        ),
    },
}
# STOP EDITING HERE #

# Metadata server path that reports the default service account's email.
SA_EMAIL_URL = (
    'http://metadata.google.internal/computeMetadata/v1'
    '/instance/service-accounts/default/email'
)
def get_sa_email():
    ''' Looks up the email address of the attached service account.

    Sends a GET to SA_EMAIL_URL with the "Metadata-Flavor: Google" header
    (20 second timeout) and returns the response body as text. The HTTP
    status is not inspected.
    '''
    metadata_resp = requests.get(
        SA_EMAIL_URL,
        headers={'Metadata-Flavor': 'Google'},
        timeout=20,
    )
    return metadata_resp.text
def client_id_html():
    ''' Builds an HTML snippet with one link per configured client.

    Each entry of CLIENT_IDS becomes an anchor pointing at its "url" and
    labelled with its "name", followed by a <br> and a newline.
    '''
    links = [
        f'<a href="{client.get("url")}">{client.get("name")}</a><br>\n'
        for client in CLIENT_IDS.values()
    ]
    return ''.join(links)
def sign_jwt(ujwt):
    ''' Signs a JWT using the attached service account.

    The claims are serialized to JSON and sent to the IAM Credentials
    signJwt method for the service account returned by get_sa_email().

    Input:
      ujwt: a dict object that will form the JWT body
    Output:
      string: the "signedJwt" field of the API response (None if absent)
    '''
    credentials, _project = google.auth.default()
    iam_service = build('iamcredentials', 'v1', credentials=credentials)
    resource_name = f'projects/-/serviceAccounts/{get_sa_email()}'
    sign_request = iam_service.projects().serviceAccounts().signJwt(  # pylint: disable=no-member
        name=resource_name,
        body={'payload': json.dumps(ujwt)},
    )
    return sign_request.execute().get('signedJwt')
def datetime_to_secs(value):
    ''' Returns the Unix epoch seconds for a Python datetime.

    The value is first expressed as a UTC time tuple, so naive datetimes are
    treated as already being in UTC.
    '''
    utc_fields = value.utctimetuple()
    return calendar.timegm(utc_fields)
class JWTValidateException(Exception):
    ''' Raised by validate_jwt when a JWT cannot be accepted. '''
def validate_jwt(jwt):
    '''Validate a JWT (but not its audience).

    The token is checked with google.oauth2.id_token.verify_token against
    the x509 certificates published for our service account, and its "iss"
    claim must equal this server's base URL.

    Args:
      jwt: The JWT to validate.
    Returns:
      decoded_jwt: the claims of the verified token.
    Raises:
      JWTValidateException: when verification fails or the issuer differs.
    '''
    expected_issuer = get_base_url()
    certs_url = ('https://www.googleapis.com/service_accounts/v1/metadata/x509/'
                 f'{get_sa_email()}')
    try:
        claims = google.oauth2.id_token.verify_token(
            jwt,
            google.auth.transport.requests.Request(),
            certs_url=certs_url,
        )
    except google.auth.exceptions.InvalidValue as err:
        raise JWTValidateException(f'JWT validation error: {err}') from err
    except google.auth.exceptions.MalformedError as err:
        raise JWTValidateException(f'Invalid JWT Error: {err}') from err
    actual_issuer = claims.get('iss')
    if actual_issuer != expected_issuer:
        raise JWTValidateException(
            f'JWT Issuer invalid. Expected {expected_issuer} got {actual_issuer}')
    return claims
class CodeChallengeVerifyException(Exception):
    ''' Error type reserved for PKCE code challenge verification failures. '''
def verify_code_challenge(code_challenge,
                          code_challenge_method,
                          code_verifier):
    ''' Checks a PKCE code verifier against the stored code challenge.

    Arguments:
      code_challenge: string, the PKCE code challenge.
      code_challenge_method: string, plain or S256 (case-insensitive).
      code_verifier: string, the PKCE code verifier.
    Returns:
      Tuple: (
        True/False: if the code challenge was verified.
        String: reason for verification failure if any
      )
    '''
    # Lower-cased method name -> oauthlib comparison function.
    checkers = {
        'plain': code_challenge_method_plain,
        's256': code_challenge_method_s256,
    }
    checker = checkers.get(code_challenge_method.lower())
    if checker is None:
        return False, f'server does not support code_challenge_method {code_challenge_method}'
    if checker(code_verifier, code_challenge):
        return True, ''
    return False, (f'failed to verify {code_verifier} with challenge '
                   f'{code_challenge} using {code_challenge_method}')
@app.route('/oauth/jwks')
def jwks():
    ''' /oauth/jwks URL returns current public keys used in signing.

    Proxies the public JWK set that Google publishes for the attached
    service account. The upstream body is passed through as bytes and the
    upstream HTTP status is propagated, so a failed fetch is not presented
    to relying parties as a successful (200) key set.
    '''
    sa_email = get_sa_email()
    goog_jwk_url = f'https://www.googleapis.com/service_accounts/v1/jwk/{sa_email}'
    jwk_resp = requests.get(goog_jwk_url, timeout=20)
    # Hand Flask the raw bytes rather than the requests.Response object itself.
    return Response(jwk_resp.content,
                    status=jwk_resp.status_code,
                    mimetype='application/json')
def get_base_url():
    ''' Returns the https:// base URL for the host of the current request,
    like https://somehost.com '''
    return f'https://{request.host}'
@app.route('/.well-known/openid-configuration')
def oidc_config():
    ''' /.well-known/openid-configuration URL returns info about our IDP.

    Loads the static template from openid-configuration.json and fills in
    the endpoint URLs using the host of the current request.
    '''
    with open('openid-configuration.json',
              'r',
              encoding='utf-8') as fpointer:
        oconfig = json.load(fpointer)
    base_url = get_base_url()
    oconfig.update({
        'issuer': base_url,
        'authorization_endpoint': f'{base_url}/authorize',
        'token_endpoint': f'{base_url}/token',
        'userinfo_endpoint': f'{base_url}/userinfo',
        'jwks_uri': f'{base_url}/oauth/jwks',
        # The discovery metadata name (and the key present in the template)
        # is "registration_endpoint"; writing "registration_url" left the
        # template's value empty and added a key clients do not read.
        'registration_endpoint': f'{base_url}/registration',
    })
    resp = json.dumps(oconfig,
                      indent=2,
                      sort_keys=True)
    return Response(resp,
                    mimetype='application/json')
@app.route('/authorize', methods=['GET'])
def authorize():
    ''' /authorize URL displays login page.

    Reads the OIDC authorization request from the query string and renders
    a form that POSTs to /login, carrying the request values along as
    hidden fields (the server keeps no state of its own).

    Every request-supplied value is HTML-escaped before being placed in the
    page, and missing parameters render as empty strings rather than the
    literal text "None".

    Returns:
      The login page HTML; an HTML hint page when client_id is unknown; or
      an error message with HTTP 400 for an unusable PKCE request.
    '''
    from html import escape  # pylint: disable=import-outside-toplevel

    client_id = request.args.get('client_id')
    if client_id not in CLIENT_IDS:
        if not client_id:
            reason = 'No valid client ID set'
        else:
            reason = f'{escape(client_id)} is not a known Client ID'
        return f'''<HTML><BODY>
{reason}. Try visiting one of these URLs:<br>
{client_id_html()}
</BODY></HTML>
'''
    # Escape everything that originates from the request before it is
    # interpolated into markup; quotes are escaped too, so the values are
    # safe inside double-quoted attributes.
    redirect_uri = escape(request.args.get('redirect_uri', ''))
    state = escape(request.args.get('state', ''))
    nonce = escape(request.args.get('nonce', ''))
    login_hint = escape(request.args.get('login_hint', ''))
    safe_client_id = escape(client_id)
    iss = escape(get_base_url())  # derived from the request's Host header
    other_inputs = ''
    if code_challenge_method := request.args.get('code_challenge_method'):
        safe_method = escape(code_challenge_method)
        if code_challenge_method.lower() not in ['plain', 's256']:
            return ('ERROR: I don\'t know how to handle code_challenge_method '
                    f'"{safe_method}"'), 400
        code_challenge = request.args.get('code_challenge')
        if not code_challenge:
            # A method without a challenge cannot be verified later on.
            return 'ERROR: code_challenge_method was sent without a code_challenge', 400
        safe_challenge = escape(code_challenge)
        other_inputs = f'''<input type="hidden"
 id="code_challenge_method"
 name="code_challenge_method"
 value="{safe_method}">
<input type="hidden"
 id="code_challenge"
 name="code_challenge"
 value="{safe_challenge}">'''
    # Doubled braces ({{now}}, {{hourfromnow}}) come out as literal {now} /
    # {hourfromnow} placeholders, which /login substitutes with timestamps.
    response = f'''<HTML>
<HEAD></HEAD>
<BODY>
<form method="POST" action="/login">
<input type="hidden" id="state" name="state" value="{state}">
<input type="hidden" id="redirect_uri" name="redirect_uri" value="{redirect_uri}">
<input type="hidden" id="client_id" name="client_id" value="{safe_client_id}">
<input type="hidden" id="nonce" name="nonce" value="{nonce}">{other_inputs}
Email: <input type="email" id="_email" name="_email" value="{login_hint}"><br>
Magic Word: <input type="password" id="magicword" name="magicword"><br>
<button type="submit">LOGIN</button><br><br>
<input type="checkbox" id="use_custom" name="use_custom"> Use a custom ID Token<br>
Custom ID Token value:<br>
<textarea id="custom_id_token" name="custom_id_token" rows="30" cols="80">
{{
  "iat": {{now}},
  "exp": {{hourfromnow}},
  "sub": "[email protected]",
  "email": "[email protected]",
  "iss": "{iss}",
  "aud": "{safe_client_id}",
  "nonce": "{nonce}",
  "profile_photo": "https://dummyimage.com/400x400/ff004d/fff.png&text=WIF",
  "groups": ["[email protected]", "[email protected]"],
  "display_name": "OIDC User",
  "posix_username": "oidc_user"
}}
</textarea>
</form><br>
{client_id_html()}
</BODY>
</HTML>'''
    return response
@app.route('/login', methods=['POST'])
def login():
    ''' /login URL where login info is POSTed.

    Checks the client ID and magic word, packs the login details into a
    short-lived (300 second) signed JWT that acts as the authorization
    code, and redirects the browser back to the client's redirect_uri with
    "state" and "code" query parameters.

    Returns:
      A redirect to the client on success; an HTML hint page for an unknown
      client; a plain message for a wrong magic word; or HTTP 400 when
      redirect_uri is missing or not an absolute URL.
    '''
    from urllib.parse import urlencode, urlsplit  # pylint: disable=import-outside-toplevel

    redirect_uri = request.form.get('redirect_uri')
    state = request.form.get('state')
    client_id = request.form.get('client_id')
    if client_id not in CLIENT_IDS:
        return f'''<HTML><BODY>
No valid client ID set. Try visiting one of these URLs:<br>
{client_id_html()}
</BODY></HTML>
'''
    nonce = request.form.get('nonce')
    magicword = request.form.get('magicword')
    if magicword != MAGIC_WORD:
        return 'THAT ISN\'T THE MAGIC WORD!'
    # Without an absolute redirect_uri there is nowhere to send the code;
    # fail before spending a signing call.
    # NOTE(review): redirect_uri is not compared to anything registered for
    # the client (CLIENT_IDS holds no redirect URIs), so any absolute URL is
    # accepted here.
    if not redirect_uri or not urlsplit(redirect_uri).scheme:
        return 'ERROR: missing or invalid redirect_uri', 400
    email = request.form.get('_email')
    # Timezone-aware "now"; datetime_to_secs converts via the UTC time tuple.
    now = datetime.datetime.now(datetime.timezone.utc)
    expiry = now + datetime.timedelta(seconds=300)
    hourfromnow = now + datetime.timedelta(seconds=3600)
    ucode = {
        'iat': datetime_to_secs(now),
        'exp': datetime_to_secs(expiry),
        'email': email,
        'client_id': client_id,
        'nonce': nonce,
        'iss': get_base_url(),
        'aud': client_id,
    }
    if request.form.get('use_custom') == 'on':
        # Default to '' so a missing textarea value cannot crash .replace().
        custom_idt = request.form.get('custom_id_token', '')
        ucode['custom_idt'] = (
            custom_idt
            .replace('{now}', str(datetime_to_secs(now)))
            .replace('{hourfromnow}', str(datetime_to_secs(hourfromnow)))
        )
    if code_challenge_method := request.form.get('code_challenge_method'):
        ucode['code_challenge_method'] = code_challenge_method
        ucode['code_challenge'] = request.form.get('code_challenge')
    code = sign_jwt(ucode)
    # URL-encode the values and respect a query string already present on
    # the client's redirect_uri.
    params = {}
    if state is not None:
        params['state'] = state
    params['code'] = code
    separator = '&' if '?' in redirect_uri else '?'
    return redirect(f'{redirect_uri}{separator}{urlencode(params)}')
@app.route('/token', methods=['POST'])
def token():  # pylint: disable=too-many-locals,too-many-return-statements,too-many-branches
    ''' /token URL where SP exchanges auth code for ID/access tokens.

    Client credentials are taken from an HTTP Basic Authorization header
    when present, otherwise from the client_id / client_secret form fields.
    The authorization code (a JWT signed by /login) is validated, must have
    been issued to the authenticating client, and must pass PKCE
    verification when it carries a code challenge.

    Returns:
      JSON with id_token, access_token, expires_in and token_type on
      success, or an "ERROR: ..." message with HTTP 400.
    '''
    # Start unset so the form fallback below also works when no
    # Authorization header is sent at all.
    client_id = None
    client_secret = None
    if auth_header := request.headers.get('Authorization'):
        scheme, _, value = auth_header.partition(' ')
        if scheme.lower() == 'basic':
            try:
                decoded_value = base64.b64decode(value.strip()).decode('utf-8')
            except ValueError:  # also covers binascii.Error / UnicodeDecodeError
                err = 'ERROR: malformed Authorization header'
                print(err)
                return err, 400
            # Split on the first ':' only; a secret may itself contain ':'.
            client_id, _, client_secret = decoded_value.partition(':')
            client_id = unquote(client_id)
            client_secret = unquote(client_secret)
    if not client_id:
        client_id = request.form.get('client_id')
        client_secret = request.form.get('client_secret')
    if client_id not in CLIENT_IDS:
        err = f'ERROR: client_id {client_id} is invalid'
        print(err)
        return err, 400
    if client_secret != CLIENT_IDS[client_id].get('client_secret'):
        err = f'ERROR: client_secret {client_secret} is invalid'
        print(err)
        return err, 400
    auth_code = request.form.get('code')
    if not auth_code:
        return 'ERROR: missing code', 400
    try:
        decoded_auth_code = validate_jwt(auth_code)
    except JWTValidateException as err:
        resp = f'ERROR: {err}'
        print(resp)
        return resp, 400
    # /login stores the client the code was minted for; refuse to redeem it
    # for a different client.
    if decoded_auth_code.get('client_id') != client_id:
        resp = 'ERROR: code was not issued to this client'
        print(resp)
        return resp, 400
    if code_challenge := decoded_auth_code.get('code_challenge'):
        code_verifier = request.form.get('code_verifier')
        if not code_verifier:
            return 'ERROR: missing code_verifier', 400
        # Fall back to "plain" should the code lack a method.
        code_challenge_method = decoded_auth_code.get('code_challenge_method') or 'plain'
        verified, err = verify_code_challenge(code_challenge,
                                              code_challenge_method,
                                              code_verifier)
        if not verified:
            return f'ERROR: {err}', 400
    if custom_idt := decoded_auth_code.get('custom_idt'):
        try:
            payload = json.loads(custom_idt)
        except json.JSONDecodeError as err:
            resp = f'ERROR: custom ID token is not valid JSON: {err}'
            print(resp)
            return resp, 400
    else:
        email = decoded_auth_code.get('email')
        # Timezone-aware "now"; datetime_to_secs converts via the UTC tuple.
        now = datetime.datetime.now(datetime.timezone.utc)
        expiry = now + datetime.timedelta(seconds=3600)
        payload = {
            'iat': datetime_to_secs(now),
            'exp': datetime_to_secs(expiry),
            'sub': email,
            'email': email,
            'iss': get_base_url(),
            'aud': decoded_auth_code.get('aud'),
            'nonce': decoded_auth_code.get('nonce'),
            'profile_photo': 'https://dummyimage.com/400x400/ff004d/fff.png&text=WIF',
            'groups': ['users', 'persons', 'humans', 'sapiens', 'employees', 'people'],
            'display_name': 'wForce User',
            'posix_username': 'user',
        }
    id_token = sign_jwt(payload)
    return_data = {
        'id_token': id_token,
        'access_token': id_token,  # HACKY - we're passing id_token as access token
        'expires_in': 3599,
        'token_type': 'Bearer',
    }
    return Response(json.dumps(return_data,
                               indent=2,
                               sort_keys=True),
                    mimetype='application/json')
@app.route('/userinfo')
def userinfo():
    ''' /userinfo URL where SP retrieves info about the user.

    Expects "Authorization: <scheme> <token>", where the token is a JWT
    issued by this server (the ID token doubles as the access token). The
    scheme itself is not checked.

    Returns:
      The token's decoded claims as JSON; HTTP 401 when the header is
      missing or carries no token; HTTP 400 when the token fails validation.
    '''
    auth_header = request.headers.get('Authorization', '')
    # partition() never raises, unlike unpacking the result of split(' ').
    _scheme, _, id_token = auth_header.partition(' ')
    id_token = id_token.strip()
    if not id_token:
        return 'ERROR: missing or malformed Authorization header', 401
    try:
        decoded = validate_jwt(id_token)
    except JWTValidateException as err:
        resp = f'ERROR: {err}'
        return resp, 400
    resp = json.dumps(decoded,
                      indent=2,
                      sort_keys=True)
    return Response(resp,
                    mimetype='application/json')
@app.route('/')
def rootpage():
    ''' / URL redirects to the /authorize login page. '''
    login_page = '/authorize'
    return redirect(login_page)
# Start Flask's built-in server only when this file is executed directly.
if __name__ == '__main__':
    app.run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"issuer": "", | |
"authorization_endpoint": "", | |
"token_endpoint": "", | |
"userinfo_endpoint": "", | |
"jwks_uri": "", | |
"registration_endpoint": "", | |
"scopes_supported":[ | |
"openid", | |
"profile", | |
"email" | |
], | |
"response_types_supported":[ | |
"code" | |
], | |
"grant_types_supported":[ | |
"authorization_code" | |
], | |
"subject_types_supported":[ | |
"public" | |
], | |
"id_token_signing_alg_values_supported":[ | |
"RS256" | |
], | |
"token_endpoint_auth_methods_supported":[ | |
"client_secret_post", | |
"client_secret_basic", | |
"client_secret_jwt", | |
"private_key_jwt" | |
], | |
"token_endpoint_auth_signing_alg_values_supported":[ | |
"RS256" | |
], | |
"claims_parameter_supported": false, | |
"request_parameter_supported": false, | |
"request_uri_parameter_supported": false | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Flask | |
requests | |
google-auth | |
google-api-python-client | |
oauthlib | |
pyjwt |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment