Last active
October 8, 2024 17:24
-
-
Save taylorhughes/3968575b40dd97f851f35892931ebf3e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Any, Dict, List, Optional | |
from OpenSSL import crypto | |
import jwt | |
from jwt.utils import base64url_decode | |
import requests | |
import logging | |
ROOT_CER_URL = "https://www.apple.com/certificateauthority/AppleRootCA-G3.cer" | |
G6_CER_URL = "https://www.apple.com/certificateauthority/AppleWWDRCAG6.cer" | |
def get_validated_jwt_content(apple_jwt: str) -> Optional[Dict[str, Any]]: | |
# Fetch the well-known/expected root & intermediate keys from Apple: | |
root_cert_bytes: bytes = requests.get(ROOT_CER_URL).content | |
root_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, root_cert_bytes) | |
g6_cert_bytes: bytes = requests.get(G6_CER_URL).content | |
g6_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, g6_cert_bytes) | |
# Get the signing keys out of the JWT header. The header will look like: | |
# {"alg": "ES256", "x5c": ["...base64 cert...", "...base64 cert..."]} | |
header = jwt.get_unverified_header(apple_jwt) | |
alg = header['alg'] # ES256 | |
provided_certificates: List[crypto.X509] = [] | |
certificate_names: List[Dict[bytes, bytes]] = [] | |
for cert_base64 in header['x5c']: | |
cert_bytes = base64url_decode(cert_base64) | |
another_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_bytes) | |
# To see the certificate chain by name, which corresponds to certs you can fetch: | |
# https://www.apple.com/certificateauthority/ | |
# | |
# Prints <X509Name object '/CN=Apple Root CA - G3/OU=Apple Certification Authority/O=Apple Inc./C=US'>: | |
certificate_names.append(dict(another_cert.get_subject().get_components())) | |
provided_certificates.append(another_cert) | |
# Verify that the root & intermediate keys are what we expect from Apple: | |
assert certificate_names[-1][b'CN'] == b'Apple Root CA - G3', f'Root cert changed: {certificate_names[-1]}' | |
assert certificate_names[-2][b'OU'] == b'G6', f'Intermediate cert changed: {certificate_names[-2]}' | |
assert provided_certificates[-2].digest('sha256') == g6_cert.digest('sha256') | |
assert provided_certificates[-1].digest('sha256') == root_cert.digest('sha256') | |
# Validate that the cert chain is cryptographically legit: | |
store = crypto.X509Store() | |
store.add_cert(root_cert) | |
store.add_cert(g6_cert) | |
for cert in provided_certificates[:-2]: | |
try: | |
crypto.X509StoreContext(store, cert).verify_certificate() | |
except crypto.X509StoreContextError: | |
logging.error("Invalid certificate chain in JWT: %s", apple_jwt) | |
return None | |
store.add_cert(cert) | |
# Now that the cert is validated, we can use it to verify the actual signature | |
# of the JWT. PyJWT does not understand this certificate if we pass it in, so | |
# we have to get the cryptography library's version of the same key: | |
cryptography_version_of_key = provided_certificates[0].get_pubkey().to_cryptography_key() | |
try: | |
return jwt.decode(apple_jwt, cryptography_version_of_key, algorithms=["ES256"]) | |
except Exception: | |
logging.exception("Problem validating Apple JWT") | |
return None |
@ascendant-david done
Thank you very much for making this function. Very helpful!
@ascendant-david glad it helped somebody, it took me multiple days to figure this all out because I could not believe the stunning lack of documentation from Apple...
awesome code! saved me a lot!
Thank you very much. I used this in addition with apple store server library, it wasn't loading local certificates for some reason.
from appstoreserverlibrary.api_client import AppStoreServerAPIClient, APIException
from appstoreserverlibrary.models.Environment import Environment
from appstoreserverlibrary.signed_data_verifier import SignedDataVerifier
import requests
ROOT_CER_URL = "https://www.apple.com/certificateauthority/AppleRootCA-G3.cer"
G6_CER_URL = "https://www.apple.com/certificateauthority/AppleWWDRCAG6.cer"
root_cert_bytes: bytes = requests.get(ROOT_CER_URL).content
g6_cert_bytes: bytes = requests.get(G6_CER_URL).content
client = AppStoreServerAPIClient(private_key_bytes, key_id, issuer_id, bundle_id, environment)
verifier = SignedDataVerifier([root_cert_bytes, g6_cert_bytes], False, environment, bundle_id)
try:
response = client.get_transaction_info(<txn-id>)
decoded_txn = verifier.verify_and_decode_signed_transaction(response.signedTransactionInfo)
print(decoded_txn)
except APIException as e:
print(e)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Argument for the function should be changed from "apple_storekit_2_jwt" to "apple_jwt".