@amertkara
Last active March 13, 2024 11:10
Amazon SNS Notification Verification with Python, M2Crypto. When the SNS pushes a notification, a receiver should verify the origin/integrity of the push notification (AWS) using the signature and certificate provided in the notification data. The function `verify_sns_notification` below takes the request object and verifies the origin/integrity…
# -*- coding: utf-8 -*-
import json
import urllib2
from M2Crypto import X509
from base64 import b64decode
from M2Crypto.Err import M2CryptoError
SNS_MESSAGE_TYPE_SUB_NOTIFICATION = "SubscriptionConfirmation"
SNS_MESSAGE_TYPE_NOTIFICATION = "Notification"
SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION = "UnsubscribeConfirmation"


def canonical_message_builder(content, format):
    """ Builds the canonical message to be verified.

    Sorts the fields as a requirement from AWS

    Args:
        content (dict): Parsed body of the request
        format (list): List of the fields that need to go into the message
    Returns (str):
        canonical message
    """
    m = ""

    for field in sorted(format):
        try:
            m += field + "\n" + content[field] + "\n"
        except KeyError:
            # Build with what you have
            pass

    return str(m)


def verify_sns_notification(request):
    """ Takes a notification request from Amazon push service SNS and verifies the origin of the notification.

    Kudos to Artur Rodrigues for suggesting M2Crypto: http://goo.gl/KAgPPc

    Args:
        request (HTTPRequest): The request object that is passed to the view function
    Returns (bool):
        True if the message passes the verification, False otherwise
    Raises:
        ValueError: If the body of the request couldn't be parsed or the message type is not recognized
        M2CryptoError: If an error is raised during the verification process
        URLError: If the SigningCertURL couldn't be opened
    """
    cert = None
    pubkey = None
    canonical_message = None
    canonical_sub_unsub_format = ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    canonical_notification_format = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]

    content = json.loads(request.body)
    decoded_signature = b64decode(content["Signature"])

    # Depending on the message type, canonical message format varies: http://goo.gl/oSrJl8
    if request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None) == SNS_MESSAGE_TYPE_SUB_NOTIFICATION or \
            request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None) == SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION:
        canonical_message = canonical_message_builder(content, canonical_sub_unsub_format)
    elif request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None) == SNS_MESSAGE_TYPE_NOTIFICATION:
        canonical_message = canonical_message_builder(content, canonical_notification_format)
    else:
        raise ValueError("Message Type (%s) is not recognized" % request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None))

    # Load the certificate and extract the public key
    cert = X509.load_cert_string(str(urllib2.urlopen(content["SigningCertURL"]).read()))
    pubkey = cert.get_pubkey()
    pubkey.reset_context(md='sha1')
    pubkey.verify_init()

    # Feed the canonical message to the verifier, which checks it against the public key from the certificate
    pubkey.verify_update(canonical_message)

    # M2Crypto uses EVP_VerifyFinal() from openssl as the underlying verification function.
    # http://goo.gl/Bk2G36: "EVP_VerifyFinal() returns 1 for a correct signature, 0 for failure and -1
    # if some other error occurred."
    verification_result = pubkey.verify_final(decoded_signature)

    if verification_result == 1:
        return True
    elif verification_result == 0:
        return False
    else:
        raise M2CryptoError("Some error occurred while verifying the signature.")
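
To make the canonical message concrete, here is a minimal sketch of the string that gets verified for a `Notification` that has no `Subject`. The payload values are made up for illustration, and `build_canonical_message` is a hypothetical Python 3 stand-in for `canonical_message_builder` above; it returns bytes because Python 3 signature APIs generally expect bytes.

```python
NOTIFICATION_FIELDS = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]


def build_canonical_message(content, fields):
    """Join every present field as name, newline, value, newline, in sorted field order."""
    parts = []
    for field in sorted(fields):
        if field in content:  # optional fields such as Subject may be absent
            parts.append(field + "\n" + content[field] + "\n")
    return "".join(parts).encode("utf-8")


# Made-up payload for illustration only; a real one comes from the POST body.
sample = {
    "Type": "Notification",
    "MessageId": "example-id",
    "TopicArn": "arn:aws:sns:us-east-1:123456789012:example-topic",
    "Message": "hello",
    "Timestamp": "2016-10-20T00:00:00.000Z",
    "Signature": "not-part-of-the-canonical-message",
}

canonical = build_canonical_message(sample, NOTIFICATION_FIELDS)
print(canonical.decode("utf-8"))
```

Only the listed fields contribute, so `Signature`, `SigningCertURL` and the absent `Subject` never appear in the string that is checked.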
@daremon

daremon commented Oct 20, 2016

Thanks for that Mert!

Amazon docs describing the verification process: http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.verify.signature.html

@slavaatsig

Thanks! But you are missing one step in the verification.
The Amazon documentation states that you must verify that the notification arrived from Amazon (specifically, by checking the certificate URL):

Quote:

To help prevent spoofing attacks, you should do the following when verifying messages sent by Amazon SNS:

  • Always use HTTPS when getting the certificate from Amazon SNS.
  • Validate the authenticity of the certificate.
  • Verify the certificate was received from Amazon SNS.
  • When possible, use one of the supported AWS SDKs for Amazon SNS to validate and verify messages. For example, with the AWS SDK for PHP you would use the isValid method from the MessageValidator class.

(emphasis added)

Otherwise, anyone can spoof a request by supplying their own certificate: the signature check passes even though the message never came from Amazon but from the evil Eve.
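
A minimal sketch of that missing check, using only the Python 3 standard library. The function name `is_trusted_signing_cert_url`, the host pattern and the `.pem` path requirement are assumptions for illustration. The pattern accepts only `sns.<region>.amazonaws.com` (optionally with `.cn`) rather than any `amazonaws.com` subdomain, because other AWS services, S3 buckets for example, serve user-controlled content under that domain. Check the pattern against the regions and partitions you actually use.

```python
import re
from urllib.parse import urlparse

# Assumed host pattern: sns.<region>.amazonaws.com, optionally with the .cn suffix.
_SNS_HOST_RE = re.compile(r"sns\.[a-z0-9-]+\.amazonaws\.com(\.cn)?")


def is_trusted_signing_cert_url(url):
    """Return True only for HTTPS URLs on an SNS host that point at a .pem file."""
    parsed = urlparse(url)
    host = parsed.hostname or ""  # hostname is None for malformed URLs
    return (
        parsed.scheme == "https"
        and _SNS_HOST_RE.fullmatch(host) is not None
        and parsed.path.endswith(".pem")
    )


print(is_trusted_signing_cert_url("https://sns.us-east-1.amazonaws.com/SimpleNotificationService-example.pem"))
print(is_trusted_signing_cert_url("https://evil.example.com/cert.pem"))
```

Run this before downloading the certificate, and reject the request if it returns False.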

@samuelcolvin

samuelcolvin commented Mar 12, 2019

pretty ugly python, with python3.7, aiohttp and cryptography, that whole mess can be rewritten as

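import base64
import hashlib
import logging

from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from yarl import URL  # assumed: the URL class that aiohttp itself uses

logger = logging.getLogger(__name__)
# JsonErrors is a helper from the commenter's own project and is not defined in this snippet.


async def verify_sns(request, data):
    msg_type = request.headers.get('X-Amz-Sns-Message-Type')
    if msg_type == 'Notification':
        fields = 'Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type'
    else:
        fields = 'Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type'

    try:
        # Subject is optional in notifications, so fields missing from the payload are skipped
        canonical_msg = ''.join(f'{f}\n{data[f]}\n' for f in fields if f in data).encode()
        sign_url = data['SigningCertURL']
        decoded_signature = base64.b64decode(data['Signature'])
    except (ValueError, KeyError) as e:
        raise JsonErrors.HTTPForbidden(f'invalid request, error: {e}')

    cache_key = 'sns-signing-url:' + hashlib.md5(sign_url.encode()).hexdigest()
    sign_url = URL(sign_url)
    # host is None for URLs without a host, so fall back to an empty string
    if sign_url.scheme != 'https' or not (sign_url.host or '').endswith('.amazonaws.com'):
        logger.warning('invalid signing url "%s"', sign_url)
        raise JsonErrors.HTTPForbidden('invalid signing cert url')

    # Reuse the cached certificate if there is one, otherwise download it and cache it for a day
    pem_data = await request.app['redis'].get(cache_key, encoding=None)
    if not pem_data:
        async with request.app['http_client'].get(sign_url, raise_for_status=True) as r:
            pem_data = await r.read()
        await request.app['redis'].setex(cache_key, 86400, pem_data)

    cert = x509.load_pem_x509_certificate(pem_data, default_backend())
    pubkey: rsa.RSAPublicKey = cert.public_key()
    try:
        pubkey.verify(decoded_signature, canonical_msg, padding.PKCS1v15(), hashes.SHA1())
    except InvalidSignature:
        raise JsonErrors.HTTPForbidden('invalid signature')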

I've also added caching of the signing certificate and a check of the URL.
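
For readers without Redis, the caching idea can be sketched with an in-process dictionary. `CertCache`, its `fetch` callback and the injectable `clock` are hypothetical names chosen for illustration, and the default TTL mirrors the 86400 seconds used above. `fetch` stands for whatever downloads the PEM bytes once the URL has been validated.

```python
import time


class CertCache:
    """Tiny in-memory cache mapping a signing cert URL to its PEM bytes."""

    def __init__(self, fetch, ttl_seconds=86400, clock=time.monotonic):
        self._fetch = fetch      # callable: url -> PEM bytes
        self._ttl = ttl_seconds
        self._clock = clock      # injectable so expiry can be tested
        self._entries = {}       # url -> (expires_at, pem_bytes)

    def get(self, url):
        now = self._clock()
        entry = self._entries.get(url)
        if entry is not None and entry[0] > now:
            return entry[1]      # still fresh: no network call
        pem = self._fetch(url)
        self._entries[url] = (now + self._ttl, pem)
        return pem


# Usage with a stand-in fetcher that only records its calls.
calls = []


def fake_fetch(url):
    calls.append(url)
    return b"placeholder PEM bytes"


cache = CertCache(fake_fetch)
cache.get("https://sns.us-east-1.amazonaws.com/example.pem")
cache.get("https://sns.us-east-1.amazonaws.com/example.pem")
print(len(calls))  # the second lookup is served from the cache
```

An in-process cache is lost on restart and is not shared between workers, which is the trade-off against the Redis approach.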

@aleGpereira

I don't think it is necessary to say "pretty ugly code", @samuelcolvin. Why the negativity? @amertkara is just sharing their code.

@youngfeldt

I prefer amertkara's code. It's more readable and maintainable IMHO.
Taste is subjective, of course, but I prefer code that is simple to follow and maintain over code that is compact.

@iftahio

iftahio commented Mar 13, 2024

2024 version

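import base64
from typing import Dict
from urllib.parse import urlparse

import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15


def authenticate_sns_request(request_data: Dict):
    """
    Verify the signature of an Amazon SNS message.

    Raises ValueError for an invalid SigningCertURL or an unsupported SignatureVersion,
    and cryptography's InvalidSignature if the signature does not match.
    """
    # Get the X509 certificate that Amazon SNS used to sign the message.
    sign_url = request_data['SigningCertURL']
    parsed_url = urlparse(sign_url)
    hostname = parsed_url.hostname or ''  # hostname is None for malformed URLs
    if parsed_url.scheme != 'https' or not hostname.endswith('.amazonaws.com'):
        raise ValueError(f'Invalid SigningCertURL: {sign_url}')
    response = requests.get(sign_url, timeout=10)
    response.raise_for_status()
    certificate = response.content
    # Extract the public key from the certificate.
    public_key = x509.load_pem_x509_certificate(certificate, default_backend()).public_key()
    # Create the string to sign.
    if request_data['Type'] == 'Notification':
        fields = 'Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type'
    else:
        fields = 'Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type'
    # Subject is optional in notifications, so fields missing from the payload are skipped.
    canonical_msg = ''.join(f'{f}\n{request_data[f]}\n' for f in fields if f in request_data).encode()
    # Decode the Signature value from Base64 format.
    decoded_signature = base64.b64decode(request_data['Signature'])
    # Verify the signature with the public key and the canonical message.
    if request_data['SignatureVersion'] == '1':
        public_key.verify(decoded_signature, canonical_msg, PKCS1v15(), hashes.SHA1())
    elif request_data['SignatureVersion'] == '2':
        public_key.verify(decoded_signature, canonical_msg, PKCS1v15(), hashes.SHA256())
    else:
        # Never fall through without verifying anything.
        raise ValueError(f"Unsupported SignatureVersion: {request_data['SignatureVersion']}")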
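
The version-to-hash mapping can also be kept in a small table, so that any unexpected `SignatureVersion` is rejected rather than silently skipping verification. A minimal sketch: `digest_name_for_signature_version` is a hypothetical helper, and the names it returns are the ones `hashlib` uses. With `cryptography` you would map them to `hashes.SHA1()` and `hashes.SHA256()`.

```python
# SignatureVersion -> digest name: version 1 uses SHA1, version 2 uses SHA256.
_DIGEST_BY_SIGNATURE_VERSION = {"1": "sha1", "2": "sha256"}


def digest_name_for_signature_version(version):
    """Return the digest name for an SNS SignatureVersion, or raise ValueError."""
    try:
        return _DIGEST_BY_SIGNATURE_VERSION[version]
    except KeyError:
        raise ValueError(f"Unsupported SignatureVersion: {version!r}") from None


print(digest_name_for_signature_version("2"))
```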
