-
-
Save rondomondo/b44660ec9acb0ae342733cada96b1518 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
import os | |
import time | |
import json | |
import base64 | |
import requests | |
import argparse | |
from base64 import urlsafe_b64decode, b64decode | |
from Crypto.Hash import SHA256, SHA512 | |
from Crypto.Signature import PKCS1_v1_5 | |
from Crypto.PublicKey import RSA | |
from Crypto.Util.number import bytes_to_long | |
import jwt | |
u""" | |
Two methods/examples of how to decode and verify the signature of AWS cognito JWT web tokens externally | |
""" | |
def maybe_pad(s): | |
return (s + '=' * (4 - len(s) % 4)) | |
def get_token_segments(token): | |
u""" | |
A valid token will have at least two seqments. Segments are delimited by | |
a period '.' | |
""" | |
header, payload, signature = token.split(".") | |
u""" | |
The JWT spec tells us the header MUST be urlsafe B64 encoded. Decode it | |
but first add any padding (by adding one or more =) that may be needed | |
https://tools.ietf.org/html/rfc7519 | |
Get the header json object that was stringified, it will be returned a | |
string of bytes | |
""" | |
header_json_str = urlsafe_b64decode(maybe_pad(header)) | |
u""" | |
get the payload json object that was stringified, it will be returned a | |
string of bytes | |
""" | |
payload_json_str = urlsafe_b64decode(maybe_pad(payload)) | |
u""" | |
get the signature that was stringified, it will be returned a string of | |
bytes. It is not an object but rather the signature byte string, so full | |
of non printable characters | |
""" | |
signature_bytes = urlsafe_b64decode(maybe_pad(signature)) | |
u""" | |
convert header and payload back into objects. The signature is already | |
a byte string | |
NB: The order of the keys in the dict/object that results from the | |
json.loads call will not be ordered in any way so watch out if you | |
expect the transformations to be reversable | |
object -> json.dumps -> string <==> string -> json.loads -> object | |
This can trip you up if you decode a header and payload, then try to recode | |
it and expect the signature to work out. | |
""" | |
header_json = json.loads(header_json_str, 'utf-8') | |
payload_json = json.loads(payload_json_str, 'utf-8') | |
return header_json, payload_json, signature_bytes | |
def get_EXP(token): | |
payload_json = get_token_segments(token)[1] | |
exp = payload_json['exp'] | |
print "Time now: %s" % (time.strftime('%Y-%m-%d %H:%M:%S', | |
time.localtime(time.time()))) | |
print "Expires: %s" % (time.strftime('%Y-%m-%d %H:%M:%S', | |
time.localtime(exp))) | |
return exp | |
def get_AUD(token): | |
payload_json = get_token_segments(token)[1] | |
aud = payload_json['aud'] | |
return aud | |
def get_ISS(token): | |
payload_json = get_token_segments(token)[1] | |
iss = payload_json['iss'] | |
return iss | |
def get_ALG(token): | |
header_json = get_token_segments(token)[0] | |
alg = header_json['alg'] | |
return alg | |
def get_KID(token): | |
header_json = get_token_segments(token)[0] | |
kid = header_json['kid'] | |
return kid | |
def get_modulus_and_exponant(jwk_sets, kid, algorithm, force_fail=False): | |
print("Looking for kid=%s algo=%s in the jwt key sets" % (kid, algorithm)) | |
for jwks in jwk_sets['keys']: | |
if (force_fail and jwks['kid'] != kid) or (jwks['kid'] == kid and | |
jwks['alg'] == algorithm): | |
e_b64 = jwks['e'] | |
n_b64 = jwks['n'] | |
e_bytes = b64decode(e_b64) | |
n_bytes = b64decode(n_b64) | |
exponant = bytes_to_long(e_bytes) | |
modulus = bytes_to_long(n_bytes) | |
return modulus, exponant | |
def get_jwks_json(token): | |
iss = get_ISS(token) | |
url = "%s%s" % (iss, '/.well-known/jwks_uri') | |
print("jwks_uri url: %s" % url) | |
hfn = SHA256.new(url).hexdigest() | |
if not os.path.exists("/tmp/%s" % (hfn)): | |
r = requests.get(url) | |
if r.status_code == 200: | |
with open("/tmp/%s" % (hfn), "w") as outfile: | |
outfile.write(json.dumps(r.json())) | |
return r.json() | |
else: | |
with open("/tmp/%s" % (hfn), "r") as infile: | |
return json.loads(infile.read()) | |
def construct_RSA_publickey(exponant, modulus): | |
publicKey = RSA.construct((exponant, modulus)) | |
return publicKey.publickey().exportKey(), publicKey.publickey() | |
def main(token): | |
u""" the upstream base64 decode routines expect sr types so convert if needed """ | |
if type (token) = unicode: | |
token = token.encode('utf-8') | |
u""" | |
Extract the KeyID and some other useful information to validate the token. | |
See... | |
http://self-issued.info/docs/draft-jones-json-web-token-01.html#ReservedClaimName | |
Note: For other than this demo case, in real world uses we would obviously not | |
check the validity of AUD, ISS against itself but rather values you expect | |
""" | |
kid = get_KID(token) | |
alg = get_ALG(token) | |
aud = get_AUD(token) | |
get_EXP(token) | |
get_ISS(token) | |
u""" | |
The AWS Cognito JWT is digitally signed by the private key | |
half of the ISSUERS RSA key pair. We can find who the ISSUER | |
was by looking for the 'iss' key in payload. To verify the token | |
signature there are a few basic steps. | |
Step 1: | |
Get the corrosponding Public Key half of the RSA key pair that | |
signed the token. | |
We get it from the URL addressed via: | |
ISS + '/.well-known/jwks_uri' | |
ie: https://cognito-identity.amazonaws.com/.well-known/jwks_uri | |
""" | |
jwk_sets = get_jwks_json(token) | |
u""" | |
The particular key we want is the key set that matches the 'kid' in the | |
token header. It'll look something like this: | |
{ | |
"kty": "RSA", | |
"alg": "RS512", | |
"use": "sig", | |
"kid": "ap-southeast-22", | |
"n": "AJZzNUBnF1H6rFFiqJbiziWW7VVbyo............Ws35b7", | |
"e": "AQAB" | |
} | |
Step 2: | |
Note the key type and hash algorithm. Extract the modulus | |
(the n value) and the exponant (the e value) from the key set | |
""" | |
modulus, exponant = get_modulus_and_exponant(jwk_sets, kid, alg) | |
u""" Using the modulus and exponant construct the Public key and | |
return it in PEM format. It will look something like this: | |
-----BEGIN PUBLIC KEY----- | |
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAi8sT+HiH1d0BXLLQLt+f | |
Vldnca3phPYs+weygJQaA8BUmcsmM9GPd1IjZSaVZotpxKgdh4UAF/GPxhE6cT1+ | |
mIa2jktx3J+5EoRP02/lRpmnSQxJKgXvBeKenTsAJRuf5kTciZBHXqvX9D+PcAPg | |
KY3uBWOTn4RnNUJNC0DMlknz8SAI8UThgDRDZSAW0GNme3hIjxOWOKQGpSY0NUrK | |
OHbIj6bh9A78tk4Roj9oY5Zh6fhGs77/eFNiTvdv6gUI+cinWws1SZ0AfOMiBZgI | |
LaoHAL61FaLvTrl5rYpiP6Q00V69cVgyumHdTWbGoNlLMg68RciVmqWE6g5zk2ZY | |
xwIDAQAB | |
-----END PUBLIC KEY----- | |
""" | |
pem, publicKey = construct_RSA_publickey(modulus, exponant) | |
u""" | |
Step 3a | |
Using the pyjwt module we can now try to decode & verify the token | |
#pip install pyjwt | |
Use the correct AUD, PEM etc., values below as required. In this case they will | |
always be right because we just extrated from the token itself. | |
""" | |
payload_decoded_and_verified = jwt.decode(token, pem, audience=aud, | |
algorithms=[alg], verify=True) | |
u""" | |
possible errors/exceptions from pyjwt | |
jwt.exceptions.ExpiredSignatureError: Signature has expired | |
see the u'exp': 1483323209 value in the payload | |
jwt.exceptions.DecodeError: Signature verification failed | |
""" | |
if payload_decoded_and_verified: | |
print ("verify successful.\npayload:\n%s\n" % | |
(payload_decoded_and_verified)) | |
else: | |
print ("verify failed") | |
u""" | |
Or, alternatively, using the PKCS1_v1_5 module you can also verify it. | |
Step 3b | |
Note: One thing to watch out for here is that the order of the keys in the | |
header payload matters, so if you decode a header from a token to a dict eg | |
dict = json.loads(base64.urlsafe_b64decode(header)) and then encode it back | |
the order of the keys may be different as a python dict is unordered | |
With that in mind, using PKCS1_v1_5 we can try to verify | |
""" | |
header_base64 = token.split(".")[0] | |
payload_base64 = token.split(".")[1] | |
signature_base64 = token.split(".")[-1] | |
signature = base64.urlsafe_b64decode(maybe_pad(signature_base64)) | |
hash_object = SHA512.new(b'' + header_base64 + b'.' + b'' + payload_base64) | |
verifier = PKCS1_v1_5.new(publicKey) | |
u""" | |
Notice here it is the hash object and not the digest that is supplied to verify | |
""" | |
verified = verifier.verify(hash_object, signature) | |
print ("Signature verification result using PKCS1_v1_5: %s" % (verified)) | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument('--token', help='a JWT or JWS token.', required=True) | |
args = parser.parse_args() | |
main(args.token) |
@richmondwang Can you post a gist of doing this with python-jose using a RSA key as used with AWS Cognito?
Sorry for the very late reply. But here is my simple Gist for doing this in Python Jose package.
https://gist.github.com/richmondwang/b34ea009e5d102573cb111910212ea14
You can get your JWK Set by using your token issuer and appending .well-known/jwks.json
, for example https://cognito-idp.ap-northeast-1.amazonaws.com/ap-northeast-1_CXt8HlAVk/.well-known/jwks.json
.
The KeyId you are using in decoding the token should match the kid
in the token headers because that is its public rsa key.
Thanks for the python-jose gist. I went ahead and implemented the rest of the things necessary to perform the validation of AWS tokens (getting the JWK set from AWS, pulling the kid from the token, multiple ways of finding the client_id for the audience, etc..)
Result is here: https://gist.github.com/jdalegonzalez/0ca2272c015266e667924836364349c3
Comments wecome
Great gist @bjinwright, @richmondwang, @jdalegonzalez
@richmondwang How do you perform verification of signature? I do not think it is safe to decode a jwt token that comes from unverified sources
I am getting "incorrect padding" while doing n_bytes=base64decode(n_64), so I tried in 2 ways:
a) added padding using
missing_padding = len(n_b64)%4
if missing_padding:
n_b64 += '=' * (4-missing_padding)
and
b) n_bytes = bytes(n_64, 'utf-8')
but no matter which step I follow, it is failing in jwt.decode() step stating "Invalid signature".
In the screenshot, I didn't pass the "aud", but I also tried that and it got failed.
Note: For the "aud", I am using payload['sub'] because I am not receiving "aud" in my JWT token.
@jagadish432 did you manage to solve? I'm having the same problems: /
@jagadish432 did you manage to solve? I'm having the same problems: /
@jonathanabila yes I used the below settings to make it work, let me know if you need any more info.
self.jwtOptions = {
'verify_signature': True,
'verify_exp': True,
'verify_iat': True,
'verify_aud': False
}
payload_decoded_and_verified = jwt.decode(accessToken, key=self.pem, algorithms=[self.header['alg']],
verify=True, options=self.jwtOptions)
Is there an easy way to get past this error: "Unable to import module 'lambda_function': No module named 'jose'"
Or you can also use python-jose package :)