Skip to content

Instantly share code, notes, and snippets.

@amertkara
Last active March 13, 2024 11:10
Show Gist options
  • Save amertkara/e294562759ff2755486e to your computer and use it in GitHub Desktop.
Save amertkara/e294562759ff2755486e to your computer and use it in GitHub Desktop.
Amazon SNS Notification Verification with Python, M2Crypto. When the SNS pushes a notification, a receiver should verify the origin/integrity of the push notification (AWS) using the signature and certificate provided in the notification data. The function `verify_sns_notification` below takes the request object and verifies the origin/integrity…
# -*- coding: utf-8 -*-
import json
import re
import urllib2
from base64 import b64decode
from contextlib import closing

from M2Crypto import X509
from M2Crypto.Err import M2CryptoError
# Message types this module recognizes; they are compared against the
# HTTP_X_AMZ_SNS_MESSAGE_TYPE entry of the incoming request's META mapping.
SNS_MESSAGE_TYPE_SUB_NOTIFICATION = "SubscriptionConfirmation"
SNS_MESSAGE_TYPE_NOTIFICATION = "Notification"
SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION = "UnsubscribeConfirmation"
def canonical_message_builder(content, format):
    """Build the canonical message whose signature is to be verified.

    The fields are emitted in sorted order, each as ``name\\nvalue\\n``.
    Fields listed in ``format`` but absent from ``content`` are skipped, so
    the message is built from whatever is available.

    Args:
        content (dict): Parsed body of the notification.
        format (list): Names of the fields that go into the message.

    Returns (str):
        The canonical message. On Python 2 a unicode result is encoded as
        UTF-8 so that non-ASCII field values do not raise
        UnicodeEncodeError.
    """
    parts = []
    for field in sorted(format):
        try:
            value = content[field]
        except KeyError:
            # Build with what you have: absent fields are left out.
            continue
        parts.append(field + "\n" + value + "\n")
    message = "".join(parts)
    if isinstance(message, str):
        return message
    # Python 2 only: json.loads yields unicode, and str() on non-ASCII
    # unicode raises; encode explicitly to get a byte string instead.
    return message.encode("utf-8")
# Only HTTPS URLs on an Amazon SNS endpoint host are accepted as certificate
# locations. The host part is anchored on both sides ("https://" before, "/"
# after) so user-info or look-alike suffixes cannot smuggle in another host.
_SNS_SIGNING_CERT_URL_RE = re.compile(
    r"^https://sns\.[a-zA-Z0-9-]{3,}\.amazonaws\.com(\.cn)?/"
)


def _is_trusted_signing_cert_url(url):
    """Return True if ``url`` is an HTTPS URL served from an SNS endpoint host."""
    try:
        return _SNS_SIGNING_CERT_URL_RE.match(url) is not None
    except TypeError:
        # The JSON body is untrusted; the value may not even be a string.
        return False


def verify_sns_notification(request):
    """Verify the origin of a notification request from Amazon SNS.

    The signature carried in the body is checked against the canonical
    message using the public key of the certificate at ``SigningCertURL``.
    The certificate is only downloaded when that URL is an HTTPS URL on an
    Amazon SNS host; otherwise the sender could supply a certificate of
    their own and the check would prove nothing.

    Kudos to Artur Rodrigues for suggesting M2Crypto: http://goo.gl/KAgPPc

    Args:
        request (HTTPRequest): The request object that is passed to the view
            function; its ``body`` and ``META`` attributes are read.

    Returns (bool):
        True if the message passes the verification, False otherwise
        (including when ``SigningCertURL`` is not a trusted SNS location).

    Raises:
        ValueError: If the body couldn't be parsed or the message type is
            not recognized
        KeyError: If the body lacks "Signature" or "SigningCertURL"
        M2CryptoError: If an error raises during the verification process
        URLError: If the SigningCertURL couldn't be opened
    """
    content = json.loads(request.body)
    decoded_signature = b64decode(content["Signature"])

    # Depending on the message type, canonical message format varies: http://goo.gl/oSrJl8
    message_type = request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None)
    if message_type in (SNS_MESSAGE_TYPE_SUB_NOTIFICATION, SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION):
        canonical_format = ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    elif message_type == SNS_MESSAGE_TYPE_NOTIFICATION:
        canonical_format = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]
    else:
        raise ValueError("Message Type (%s) is not recognized" % message_type)
    canonical_message = canonical_message_builder(content, canonical_format)

    # Never fetch a certificate from a location the sender fully controls.
    signing_cert_url = content["SigningCertURL"]
    if not _is_trusted_signing_cert_url(signing_cert_url):
        return False

    # Load the certificate and extract the public key; closing() releases the
    # connection because urllib2 responses are not context managers.
    with closing(urllib2.urlopen(signing_cert_url)) as response:
        cert = X509.load_cert_string(str(response.read()))
    pubkey = cert.get_pubkey()

    # SignatureVersion "2" messages are signed with SHA256; everything else
    # keeps the original SHA1 behaviour.
    digest = 'sha256' if content.get("SignatureVersion") == "2" else 'sha1'
    pubkey.reset_context(md=digest)
    pubkey.verify_init()
    # Feed the canonical message that the signature is checked against
    pubkey.verify_update(canonical_message)

    # M2Crypto uses EVP_VerifyFinal() from openssl as the underlying verification function.
    # http://goo.gl/Bk2G36: "EVP_VerifyFinal() returns 1 for a correct signature, 0 for failure and -1
    # if some other error occurred."
    verification_result = pubkey.verify_final(decoded_signature)
    if verification_result == 1:
        return True
    if verification_result == 0:
        return False
    raise M2CryptoError("Some error occured while verifying the signature.")
@samuelcolvin
Copy link

samuelcolvin commented Mar 12, 2019

pretty ugly python; with python3.7, aiohttp and cryptography, that whole mess can be rewritten as

async def verify_sns(request, data):
    """Verify the signature of an SNS message, rejecting it on any failure.

    Builds the canonical message from ``data``, loads the signing
    certificate (cached in ``request.app['redis']`` for 86400 seconds, else
    fetched with ``request.app['http_client']``) and checks the signature.

    Args:
        request: incoming request; its headers and ``app`` mapping are used.
        data: parsed message body (mapping of SNS field names to values).

    Raises:
        JsonErrors.HTTPForbidden: if a required field is missing or
            malformed, the signing URL is not an HTTPS ``.amazonaws.com``
            URL, or the signature does not match.
    """
    msg_type = request.headers.get('X-Amz-Sns-Message-Type')
    if msg_type == 'Notification':
        fields = 'Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type'
    else:
        fields = 'Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type'

    try:
        # "Subject" is optional: when the publisher set none it is absent from
        # the body and must be left out of the canonical message, not treated
        # as an invalid request.
        canonical_msg = ''.join(
            f'{f}\n{data[f]}\n' for f in fields if f != 'Subject' or f in data
        ).encode()
        sign_url = data['SigningCertURL']
        decoded_signature = base64.b64decode(data['Signature'])
    except (ValueError, KeyError) as e:
        raise JsonErrors.HTTPForbidden(f'invalid request, error: {e}')

    # SignatureVersion "2" is signed with SHA256; anything else keeps SHA1.
    digest = hashes.SHA256() if data.get('SignatureVersion') == '2' else hashes.SHA1()

    # md5 only derives a compact cache key here; it is not a security measure.
    cache_key = 'sns-signing-url:' + hashlib.md5(sign_url.encode()).hexdigest()
    sign_url = URL(sign_url)
    # host may be None (e.g. a relative URL), so normalise before endswith().
    host = sign_url.host or ''
    if sign_url.scheme != 'https' or not host.endswith('.amazonaws.com'):
        logger.warning('invalid signing url "%s"', sign_url)
        raise JsonErrors.HTTPForbidden('invalid signing cert url')

    pem_data = await request.app['redis'].get(cache_key, encoding=None)
    if not pem_data:
        async with request.app['http_client'].get(sign_url, raise_for_status=True) as r:
            pem_data = await r.read()
        await request.app['redis'].setex(cache_key, 86400, pem_data)

    cert = x509.load_pem_x509_certificate(pem_data, default_backend())
    pubkey: rsa.RSAPublicKey = cert.public_key()
    try:
        pubkey.verify(decoded_signature, canonical_msg, padding.PKCS1v15(), digest)
    except InvalidSignature:
        raise JsonErrors.HTTPForbidden('invalid signature')

I've also added caching of the signing certificate and a check on the signing URL.

@aleGpereira
Copy link

I don't think it is necessary to say "pretty ugly code" @samuelcolvin. Why the negativity? @amertkara is just sharing their code.

@youngfeldt
Copy link

I prefer amertkara's code. It's more readable and maintainable IMHO.
Taste is subjective, of course, but I prefer code that is simple to follow and maintain over code that is compact.

@iftahio
Copy link

iftahio commented Mar 13, 2024

2024 version

def authenticate_sns_request(request_data: Dict):
    """
    Verifying the signatures of Amazon SNS messages

    Returns None when the signature is valid.

    Raises:
        ValueError: if SigningCertURL is not an HTTPS ``.amazonaws.com`` URL
            or SignatureVersion is neither '1' nor '2'.
        KeyError: if a required field is missing from ``request_data``.
        Any exception raised by ``public_key.verify`` when the signature
        does not match, and by ``requests`` when the download fails.
    """
    # Get the X509 certificate that Amazon SNS used to sign the message.
    sign_url = request_data['SigningCertURL']
    parsed_url = urlparse(sign_url)
    # hostname is None for URLs without a network location.
    hostname = parsed_url.hostname or ''
    if parsed_url.scheme != 'https' or not hostname.endswith('.amazonaws.com'):
        raise ValueError(f'Invalid SigningCertURL: {sign_url}')

    # Reject unknown signature versions up front: silently falling through
    # would report an unverified message as authenticated.
    signature_version = request_data['SignatureVersion']
    if signature_version not in ('1', '2'):
        raise ValueError(f'Unsupported SignatureVersion: {signature_version}')

    response = requests.get(sign_url, timeout=10)
    response.raise_for_status()
    certificate = response.content
    # Extract the public key from the certificate.
    public_key = x509.load_pem_x509_certificate(certificate, default_backend()).public_key()
    # Create the string to sign.
    if request_data['Type'] == 'Notification':
        fields = 'Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type'
    else:
        fields = 'Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type'
    # "Subject" is optional and is left out of the string to sign when absent.
    canonical_msg = ''.join(
        f'{f}\n{request_data[f]}\n' for f in fields if f != 'Subject' or f in request_data
    ).encode()
    # Decode the Signature value from Base64 format.
    decoded_signature = base64.b64decode(request_data['Signature'])
    # Verify the signature with the public key and the canonical message.
    if signature_version == '1':
        public_key.verify(decoded_signature, canonical_msg, PKCS1v15(), hashes.SHA1())
    else:
        public_key.verify(decoded_signature, canonical_msg, PKCS1v15(), hashes.SHA256())

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment