Last active
October 11, 2024 16:44
-
-
Save bgdnlp/ba7564742a06a65b6c7d583aba44d356 to your computer and use it in GitHub Desktop.
Sign up and log in to Cognito, check tokens, then call an API. Details: https://www.neant.ro/aws/working-with-cognito-and-api-gateway-in-python.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Demonstrates the use of Python to work with Cognito. | |
# Create a new user, log in, check tokens and call an API. | |
# The purpose was to learn about Cognito. Security has been | |
# circumvented in the interest of keeping it simple. | |
# Notably, the authentication procedure uses the most insecure | |
# method. This code is not intended for use in production. | |
# | |
# https://www.neant.ro/aws/working-with-cognito-and-api-gateway-in-python.html | |
import argparse | |
import boto3 | |
import requests | |
from botocore.exceptions import ClientError | |
from pprint import pprint | |
from jose import jwt, jwk | |
from jose.utils import base64url_decode | |
def sign_up():
    """Register the demo user in the Cognito user pool and confirm it.

    All inputs (app client id, user pool id, username, password, email) are
    read from the module-level ``deadpool`` dict. The responses of both
    Cognito calls are pretty-printed. A ``ClientError`` raised by either call
    is printed instead of being propagated.
    """
    client = boto3.client('cognito-idp')
    try:
        # Step 1: add the user to the pool.
        registration = {
            'ClientId': deadpool['app_client_id'],
            'Username': deadpool['username'],
            'Password': deadpool['password'],
            'UserAttributes': [
                {'Name': 'email', 'Value': deadpool['email']},
            ],
        }
        pprint(client.sign_up(**registration))

        print(" Confirming user...")
        # Step 2: confirm the account with admin rights. In a regular flow
        # the user would confirm with a code or link received by email.
        confirmation = client.admin_confirm_sign_up(
            UserPoolId=deadpool['user_pool_id'],
            Username=deadpool['username'],
        )
        pprint(confirmation)
    except ClientError as error:
        # Most likely cause: the user already exists.
        print(error)
def init_auth():
    """Log the demo user in and stash the returned tokens in ``deadpool``.

    Uses the ``USER_PASSWORD_AUTH`` flow, which is less secure but simpler.
    On success the access, ID and refresh tokens from the response's
    ``AuthenticationResult`` are stored under ``jwt_access_token``,
    ``jwt_id_token`` and ``jwt_refresh_token``.

    AWS official docs on using tokens with user pools:
    https://amzn.to/2HbmJG6
    """
    client = boto3.client('cognito-idp')
    credentials = {
        'USERNAME': deadpool['username'],
        'PASSWORD': deadpool['password'],
    }
    auth_response = client.initiate_auth(
        AuthFlow='USER_PASSWORD_AUTH',
        AuthParameters=credentials,
        ClientId=deadpool['app_client_id'],
    )

    print("----- Log in response -----")
    pprint(auth_response)
    print("---------------------------")

    # Copy the three tokens into the shared dict, in the same order as the
    # response fields are listed below.
    token_fields = (
        ('jwt_access_token', 'AccessToken'),
        ('jwt_id_token', 'IdToken'),
        ('jwt_refresh_token', 'RefreshToken'),
    )
    for store_key, response_key in token_fields:
        deadpool[store_key] = auth_response['AuthenticationResult'][response_key]
def check_token(token):
    """Print a token's header and claims, then verify its signature by hand.

    The JSON Web Key Set is downloaded from ``deadpool['jwks_url']``, the
    public key whose ``kid`` matches the token header is selected, and the
    token signature is verified against it. The outcome is printed, not
    returned.

    AWS docs on verifying tokens: https://amzn.to/2vUwFx7
    Original (and better) code in this gist: https://bit.ly/2E3fAFP

    Args:
        token: Encoded JWT string (``header.payload.signature``).

    Raises:
        RuntimeError: If the JWKS endpoint does not answer with HTTP 200.
    """
    # Decode (without verifying) the token header.
    token_header = jwt.get_unverified_header(token)
    print('Token header:')
    pprint(token_header)

    # Decode (without verifying) the token payload.
    token_claims = jwt.get_unverified_claims(token)
    print('Token claims:')
    pprint(token_claims)

    # Verify signature, step by step.
    print('Checking key manually')

    # Fetch the JSON Web Key Set holding the pool's public keys.
    r = requests.get(deadpool['jwks_url'])
    if r.status_code != 200:
        # Raising a plain string is a TypeError in Python 3; raise a real
        # exception so the message actually reaches the caller.
        raise RuntimeError(
            'Did not retrieve JWKS, got {}'.format(r.status_code))
    jwks = r.json()

    # The 'kid' (Key ID) in the token header tells us which public key of
    # the set must be used to verify the signature.
    kid = token_header['kid']
    matching_key = next(
        (key for key in jwks['keys'] if key['kid'] == kid), None)
    if matching_key is None:
        print('Public key not found, can not verify token')
        return

    # Convert the JWK entry into a key object.
    public_key = jwk.construct(matching_key)

    # The signed message is everything before the last dot; the signature
    # is the base64url-encoded part after it.
    message, encoded_signature = token.rsplit('.', 1)
    decoded_signature = base64url_decode(encoded_signature.encode('utf-8'))

    if public_key.verify(message.encode("utf8"), decoded_signature):
        print('Signature successfully verified')
    else:
        print('Signature verification failed')
def decode_token(token):
    """Decode and validate ``token`` with python-jose and print the result.

    ``jwt.decode()`` raises an error if checking the signature or one of
    the claims fails; per the python-jose documentation it otherwise
    returns the token's claims, which are pretty-printed here.
    See https://python-jose.readthedocs.io/en/latest/jwt/api.html

    Args:
        token: Encoded JWT string to decode.
    """
    # The key set is fetched from the URL stored in the shared settings.
    key_set = requests.get(deadpool['jwks_url']).json()
    decoded = jwt.decode(token, key_set)
    pprint(decoded)
def call_api(token):
    """POST to ``deadpool['api_url']`` and print the status code and body.

    Args:
        token: Value sent verbatim in the ``Authorization`` request header.
    """
    auth_headers = {'Authorization': token}
    response = requests.post(deadpool['api_url'], headers=auth_headers)
    for part in (response.status_code, response.text):
        print(part)
if __name__ == '__main__':
    # Entry point: parse the optional AWS profile, fill in the demo
    # settings, then run sign-up, log-in, token checks and the API call.
    optparser = argparse.ArgumentParser(description='Cognito demo')
    optparser.add_argument('-p',
                           '--profile',
                           help='aws credentials profile')
    args = optparser.parse_args()
    if args.profile:
        boto3.setup_default_session(profile_name=args.profile)

    # Module-level dict shared by all functions above (settings plus the
    # tokens obtained at log-in). Assigning it here already makes it a
    # module global, so no `global` statement is needed at this level.
    # Yes, global. This isn't production code.
    deadpool = {}

    # Put in your own values. These are fake.
    deadpool['user_pool_id'] = 'eu-central-1_a5NXAWJDK'
    deadpool['region'] = 'eu-central-1'
    deadpool['jwks_url'] = 'https://cognito-idp.{}.amazonaws.com/{}/' \
        '.well-known/jwks.json'.format(
            deadpool['region'],
            deadpool['user_pool_id'])
    deadpool['app_client_id'] = '3rb9mhrfqme2lbjepb353jrlml'
    # Placeholder pointing at how to calculate a secret hash if the app
    # client has a secret; none of the functions above read this value.
    deadpool['app_client_secret'] = \
        'https://stackoverflow.com/questions/1306550/' + \
        'calculating-a-sha-hash-with-a-string-secret-key-in-python'
    deadpool['username'] = 'cognito-py-demo'
    deadpool['password'] = 'D0lphins!'
    # Must be a syntactically valid address: it is sent as the user's
    # 'email' attribute at sign-up. example.com is a reserved domain.
    deadpool['email'] = 'cognito-py-demo@example.com'
    deadpool['api_url'] = 'https://' \
        '4a48x6598i.execute-api.eu-central-1.amazonaws.com/prod/insert-login'

    print('Signing up...')
    sign_up()
    print('Authenticating...')
    init_auth()
    # NOTE(review): the message says "access token" but the ID token is
    # what gets checked here -- confirm which one was intended.
    print('"Manually" check access token...')
    check_token(deadpool['jwt_id_token'])
    print('Decode token, also runs checks...')
    decode_token(deadpool['jwt_access_token'])
    print('POST to API...')
    call_api(deadpool['jwt_id_token'])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Calculate secret hash if needed:
Add it here: