Skip to content

Instantly share code, notes, and snippets.

@jay0lee
Last active January 1, 2025 14:44
Show Gist options
  • Save jay0lee/fd274e683f9a16c9494ea2483621f050 to your computer and use it in GitHub Desktop.
Save jay0lee/fd274e683f9a16c9494ea2483621f050 to your computer and use it in GitHub Desktop.
A simple OIDC IDP AppEngine app. Tested to work with Google Cloud Identity and Workforce Identity Federation. It is NOT fully compliant with OIDC standards. It IS NOT SECURE (you only need the magic word to log in as any user and make any claim) and may not properly guard token values. This is intended for testing/debugging purposes only.
runtime: python312
handlers:
- url: /.*
secure: always
script: auto
#!/usr/bin/env python
''' A simple OIDC IDP AppEngine app. Tested to work with Google Cloud Identity and
Workforce Identity Federation. It is NOT fully compliant with OIDC standards.
It IS NOT SECURE (you only need the magic word to log in as any user and make any claim)
and may not properly guard token values. This is intended for testing/debugging
purposes only. A few more points:
- Designed to be "stateless" the server doesn't need to remember anything. This saves
any need to setup a database and track client sessions. Values are passed to/from clients
in tokens and codes.
- Enter an email to login with a standard ID token and claims or design your own claims
based on the Custom ID Token setting. This is great for testing how your OIDC service
provider handles these custom claim details. Values like {now} are replaced with (wait for
it) the current timestamp.
- client_ids / secrets which our IDP knows about can be edited below (see "CLIENT_IDS =").
You decide which strings you want to use there and then configure the same ID/secret in
your service provider config.
- The script does validate PKCE to ensure it's talking to the same client the whole time.
- A GCP service account attached to AppEngine is used to perform cryptographic operations
via GCP's SignJWT method and the public JWK page for the service account. This completely
offloads all the crypto operations to Google at no cost!
'''
__author__ = "Jay Lee"
__email__ = "[email protected]"
__copyright__ = "Apache 2.0 License"
import base64
import calendar
import datetime
import json
from urllib.parse import unquote
from flask import Flask, Response, redirect, request
import google.auth
import google.auth.transport.requests
import google.oauth2.id_token
from googleapiclient.discovery import build
from oauthlib.oauth2.rfc6749.grant_types.authorization_code import code_challenge_method_s256, \
code_challenge_method_plain
import requests
app = Flask(__name__)
# EDIT THESE #
MAGIC_WORD = 'shazaam' # use your own magic word
CLIENT_IDS = {
# 'client_id': {
# 'name': 'A Client',
# 'client_secret': 'secret',
# 'url': 'https://login.serviceprovider.com',
# },
'google-workspace-client': {
'name': 'Google Workspace',
'client_secret': 'something_random',
'url': 'https://mail.google.com/a/example.com',
},
'gcp-wforce-identity-federation': {
'name': 'GCP Workforce Identity Federation',
'client_secret': 'gibberish',
'url': ('https://auth.cloud.google/signin/locations/global/workforcePools/MYPOOL/'
'providers/MYPROVIDER?continueUrl=https://console.cloud.google/'),
},
}
# STOP EDITING HERE #
SA_EMAIL_URL = ('http://metadata.google.internal/computeMetadata/v1/instance'
'/service-accounts/default/email')
def get_sa_email():
''' Returns the email address of the attached service account. '''
headers = {
'Metadata-Flavor': 'Google',
}
resp = requests.get(
SA_EMAIL_URL,
headers=headers,
timeout=20,
)
return resp.text
def client_id_html():
''' Outputs HTML listing our Client IDs '''
html = ''
for v in CLIENT_IDS.values():
html += f'<a href="{v.get("url")}">{v.get("name")}</a><br>\n'
return html
def sign_jwt(ujwt):
''' Signs a JWT with service account key
Input:
ujwt: a dict object that will form the JWT body
Output:
string: the signed JWT
'''
creds, _ = google.auth.default()
iamc = build('iamcredentials',
'v1',
credentials=creds)
sa_email = get_sa_email()
name = f'projects/-/serviceAccounts/{sa_email}'
body = {
'payload': json.dumps(ujwt),
}
resp = iamc.projects().serviceAccounts().signJwt( # pylint: disable=no-member
name=name,
body=body,
).execute()
return resp.get('signedJwt')
def datetime_to_secs(value):
''' convert Python datetime to Unix epoch seconds '''
return calendar.timegm(value.utctimetuple())
class JWTValidateException(Exception):
''' custom error for our JWT validation '''
def validate_jwt(jwt):
'''Validate a JWT (but not it's audience).
Args:
jwt: The JWT to validate.
Returns:
decoded_jwt
'''
expect_iss = get_base_url()
sa_email = get_sa_email()
certs_url = f'https://www.googleapis.com/service_accounts/v1/metadata/x509/{sa_email}'
try:
decoded_jwt = google.oauth2.id_token.verify_token(jwt,
google.auth.transport.requests.Request(),
certs_url=certs_url)
if (got_iss := decoded_jwt.get('iss')) != expect_iss:
raise JWTValidateException(f'JWT Issuer invalid. Expected {expect_iss} got {got_iss}')
return decoded_jwt
except google.auth.exceptions.InvalidValue as err:
raise JWTValidateException(f'JWT validation error: {err}') from err
except google.auth.exceptions.MalformedError as err:
raise JWTValidateException(f'Invalid JWT Error: {err}') from err
class CodeChallengeVerifyException(Exception):
''' custom error for our Code Challenge verification'''
def verify_code_challenge(code_challenge,
code_challenge_method,
code_verifier):
''' Verify a code challenge matches verifier
Arguments:
code_challenge: string, the PKCE code challenge.
code_challenge_method: string, plain or S256.
code_verifier: string, the PKCE code verifier.
Returns:
Tuple: (
True/False: if the code challenge was verified.
String: reason for verification failure if any
)
'''
match code_challenge_method.lower():
case 'plain':
challenge_passed = code_challenge_method_plain(code_verifier, code_challenge)
case 's256':
challenge_passed = code_challenge_method_s256(code_verifier, code_challenge)
case _:
return False, f'server does not support code_challenge_method {code_challenge_method}'
if not challenge_passed:
return False, (f'failed to verify {code_verifier} with challenge '
f'{code_challenge} using {code_challenge_method}')
return True, ''
@app.route('/oauth/jwks')
def jwks():
''' /oauth/jwks URL returns current public keys used in signing '''
sa_email = get_sa_email()
goog_jwk_url = f'https://www.googleapis.com/service_accounts/v1/jwk/{sa_email}'
jwk_resp = requests.get(goog_jwk_url, timeout=20)
return Response(jwk_resp,
mimetype='application/json')
def get_base_url():
''' returns the base URL like https://somehost.com '''
host = request.host
base_url = f'https://{host}'
return base_url
@app.route('/.well-known/openid-configuration')
def oidc_config():
''' /.well-knwon/openid-configuration URL returns info about our IDP '''
with open('openid-configuration.json',
'r',
encoding='utf-8') as fpointer:
oconfig = json.load(fpointer)
base_url = get_base_url()
oconfig['issuer'] = base_url
oconfig['authorization_endpoint'] = f'{base_url}/authorize'
oconfig['token_endpoint'] = f'{base_url}/token'
oconfig['userinfo_endpoint'] = f'{base_url}/userinfo'
oconfig['jwks_uri'] = f'{base_url}/oauth/jwks'
oconfig['registration_url'] = f'{base_url}/registration'
resp = json.dumps(oconfig,
indent=2,
sort_keys=True)
return Response(resp,
mimetype='application/json')
@app.route('/authorize', methods=['GET'])
def authorize():
''' /authorize URL displays login page '''
redirect_uri = request.args.get('redirect_uri')
state = request.args.get('state')
client_id = request.args.get('client_id')
if client_id not in CLIENT_IDS:
if not client_id:
reason = 'No valid client ID set'
else:
reason = f'{client_id} is not a known Client ID'
return f'''<HTML><BODY>
{reason}. Try visiting one of these URLs:<br>
{client_id_html()}
</BODY></HTML>
'''
nonce = request.args.get('nonce')
login_hint = request.args.get('login_hint', '')
iss = get_base_url()
other_inputs = ''
if code_challenge_method := request.args.get('code_challenge_method'):
if code_challenge_method.lower() not in ['plain', 's256']:
return ('ERROR: I don\'t know how to handle code_challenge_method '
f'"{code_challenge_method}"'), 400
code_challenge = request.args.get('code_challenge')
other_inputs = f'''<input type="hidden"
id="code_challenge_method"
name="code_challenge_method"
value="{code_challenge_method}">
<input type="hidden"
id="code_challenge"
name="code_challenge"
value="{code_challenge}">'''
response = f'''<HTML>
<HEAD></HEAD>
<BODY>
<form method="POST" action="/login">
<input type="hidden" id="state" name="state" value="{state}">
<input type="hidden" id="redirect_uri" name="redirect_uri" value="{redirect_uri}">
<input type="hidden" id="client_id" name="client_id" value="{client_id}">
<input type="hidden" id="nonce" name="nonce" value="{nonce}">{other_inputs}
Email: <input type="email" id="_email" name="_email" value="{login_hint}"><br>
Magic Word: <input type="password" id="magicword" name="magicword"><br>
<button type="submit">LOGIN</button><br><br>
<input type="checkbox" id="use_custom" name="use_custom"> Use a custom ID Token<br>
Custom ID Token value:<br>
<textarea id="custom_id_token" name="custom_id_token" rows="30" cols="80">
{{
"iat": {{now}},
"exp": {{hourfromnow}},
"sub": "[email protected]",
"email": "[email protected]",
"iss": "{iss}",
"aud": "{client_id}",
"nonce": "{nonce}",
"profile_photo": "https://dummyimage.com/400x400/ff004d/fff.png&text=WIF",
"groups": ["[email protected]", "[email protected]"],
"display_name": "OIDC User",
"posix_username": "oidc_user"
}}
</textarea>
</form><br>
{client_id_html()}
</BODY>
</HTML>'''
return response
@app.route('/login', methods=['POST'])
def login():
''' /login URL where login info is POSTed '''
redirect_uri = request.form.get('redirect_uri')
state = request.form.get('state')
client_id = request.form.get('client_id')
if client_id not in CLIENT_IDS:
return f'''<HTML><BODY>
No valid client ID set. Try visiting one of these URLs:<br>
{client_id_html()}
</BODY></HTML>
'''
nonce = request.form.get('nonce')
magicword = request.form.get('magicword')
if magicword != MAGIC_WORD:
return 'THAT ISN\'T THE MAGIC WORD!'
email = request.form.get('_email')
now = datetime.datetime.utcnow()
lifetime = datetime.timedelta(seconds=300)
expiry = now + lifetime
hourfromnow = now + datetime.timedelta(seconds=3600)
ucode = {
'iat': datetime_to_secs(now),
'exp': datetime_to_secs(expiry),
'email': email,
'client_id': client_id,
'nonce': nonce,
'iss': get_base_url(),
'aud': client_id,
}
use_custom = request.form.get('use_custom')
if use_custom == 'on':
ucode['custom_idt'] = request.form.get('custom_id_token')
ucode['custom_idt'] = ucode['custom_idt'].replace('{now}', str(datetime_to_secs(now))) \
.replace('{hourfromnow}', str(datetime_to_secs(hourfromnow)))
if code_challenge_method := request.form.get('code_challenge_method'):
ucode['code_challenge_method'] = code_challenge_method
ucode['code_challenge'] = request.form.get('code_challenge')
code = sign_jwt(ucode)
redirect_uri = f'{redirect_uri}?state={state}&code={code}'
return redirect(redirect_uri)
@app.route('/token', methods=['POST'])
def token(): # pylint: disable=too-many-locals
''' /token URL where SP exchnages auth code for ID/access tokens '''
if auth_header := request.headers.get('Authorization'):
_, value = auth_header.split(' ')
decoded_value = base64.b64decode(value).decode('utf-8')
client_id, client_secret = decoded_value.split(':')
client_id = unquote(client_id)
client_secret = unquote(client_secret)
if not client_id:
client_id = request.form.get('client_id')
client_secret = request.form.get('client_secret')
if client_id not in CLIENT_IDS:
err = f'ERROR: client_id {client_id} is invalid'
print(err)
return err, 400
if client_secret != CLIENT_IDS[client_id].get('client_secret'):
err = f'ERROR: client_secret {client_secret} is invalid'
print(err)
return err, 400
auth_code = request.form.get('code')
try:
decoded_auth_code = validate_jwt(auth_code)
except JWTValidateException as err:
resp = f'ERROR: {err}'
print(resp)
return resp, 400
if code_challenge := decoded_auth_code.get('code_challenge'):
if code_verifier := request.form.get('code_verifier'):
code_challenge_method = decoded_auth_code.get('code_challenge_method')
verified, err = verify_code_challenge(code_challenge,
code_challenge_method,
code_verifier)
if not verified:
return 'ERROR: {err}', 400
else:
return 'ERROR: missing code_verifier', 400
if payload := decoded_auth_code.get('custom_idt'):
payload = json.loads(payload)
else:
email = decoded_auth_code.get('email')
aud = decoded_auth_code.get('aud')
nonce = decoded_auth_code.get('nonce')
now = datetime.datetime.utcnow()
lifetime = datetime.timedelta(seconds=3600)
expiry = now + lifetime
payload = {
'iat': datetime_to_secs(now),
'exp': datetime_to_secs(expiry),
'sub': email,
'email': email,
'iss': get_base_url(),
'aud': aud,
'nonce': nonce,
'profile_photo': 'https://dummyimage.com/400x400/ff004d/fff.png&text=WIF',
'groups': ['users', 'persons', 'humans', 'sapiens', 'employees', 'people'],
'display_name': 'wForce User',
'posix_username': 'user',
}
id_token = sign_jwt(payload)
return_data = {
'id_token': id_token,
'access_token': id_token, # HACKY - we're passing id_token as access token
'expires_in': 3599,
'token_type': 'Bearer',
}
return Response(json.dumps(return_data,
indent=2,
sort_keys=True),
mimetype='application/json')
@app.route('/userinfo')
def userinfo():
''' /userinfo URL where SP retrieves info about the user. '''
_, id_token = request.headers.get('Authorization').split(' ')
try:
decoded = validate_jwt(id_token)
except JWTValidateException as err:
resp = f'ERROR: {err}'
return resp, 400
resp = json.dumps(decoded,
indent=2,
sort_keys=True)
return Response(resp,
mimetype='application/json')
@app.route('/')
def rootpage():
''' / URL redircts to /authorize login page. '''
return redirect('/authorize')
if __name__ == '__main__':
app.run()
{
"issuer": "",
"authorization_endpoint": "",
"token_endpoint": "",
"userinfo_endpoint": "",
"jwks_uri": "",
"registration_endpoint": "",
"scopes_supported":[
"openid",
"profile",
"email"
],
"response_types_supported":[
"code"
],
"grant_types_supported":[
"authorization_code"
],
"subject_types_supported":[
"public"
],
"id_token_signing_alg_values_supported":[
"RS256"
],
"token_endpoint_auth_methods_supported":[
"client_secret_post",
"client_secret_basic",
"client_secret_jwt",
"private_key_jwt"
],
"token_endpoint_auth_signing_alg_values_supported":[
"RS256"
],
"claims_parameter_supported": false,
"request_parameter_supported": false,
"request_uri_parameter_supported": false
}
Flask
requests
google-auth
google-api-python-client
oauthlib
pyjwt
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment