Skip to content

Instantly share code, notes, and snippets.

@amertkara
Last active December 3, 2024 14:44
Show Gist options
  • Save amertkara/e294562759ff2755486e to your computer and use it in GitHub Desktop.
Save amertkara/e294562759ff2755486e to your computer and use it in GitHub Desktop.
Amazon SNS Notification Verification with Python, M2Crypto. When the SNS pushes a notification, a receiver should verify the origin/integrity of the push notification (AWS) using the signature and certificate provided in the notification data. The function `verify_sns_notification` below takes the request object and verifies the origin/integrity…
# -*- coding: utf-8 -*-
import json
import urllib2
from M2Crypto import X509
from base64 import b64decode
from M2Crypto.Err import M2CryptoError
# Message type values that verify_sns_notification compares against the
# HTTP_X_AMZ_SNS_MESSAGE_TYPE entry of request.META to pick the canonical
# message format.
SNS_MESSAGE_TYPE_SUB_NOTIFICATION = "SubscriptionConfirmation"
SNS_MESSAGE_TYPE_NOTIFICATION = "Notification"
SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION = "UnsubscribeConfirmation"
def canonical_message_builder(content, format):
    """ Builds the canonical message to be verified.

    Each requested field that is present in ``content`` contributes
    ``<name>\\n<value>\\n``; fields are emitted in sorted name order.
    Fields missing from ``content`` are skipped.

    Args:
        content (dict): Parsed body of the response
        format (list): List of the fields that need to go into the message
            (the name shadows the builtin but is kept for existing callers)

    Returns (str):
        canonical message

    Raises:
        TypeError: If a present field value is not a string
    """
    parts = []
    for field in sorted(format):
        try:
            value = content[field]
        except KeyError:
            # Build with what you have
            continue
        parts.append(field + "\n" + value + "\n")
    message = "".join(parts)
    # On Python 2 json.loads yields unicode, and str() on non-ASCII unicode
    # raises UnicodeEncodeError. Encode explicitly as UTF-8 instead; on
    # Python 3 the joined text is already str and is returned untouched.
    if not isinstance(message, str):
        message = message.encode("utf-8")
    return message
def _is_trusted_cert_url(url):
    """ Tells whether a SigningCertURL points at an Amazon SNS endpoint.

    Accepts only ``https`` URLs whose host has the shape
    ``sns.<region>.amazonaws.com`` or ``sns.<region>.amazonaws.com.cn``.

    Args:
        url (str): The SigningCertURL taken from the notification body
    Returns (bool):
        True if the URL may be fetched, False otherwise
    """
    # Local import keeps the helper self-contained on both Python 2 and 3.
    try:
        from urllib.parse import urlparse
    except ImportError:
        from urlparse import urlparse
    try:
        parsed = urlparse(url)
    except ValueError:
        return False
    if parsed.scheme != "https" or not parsed.hostname:
        return False
    labels = parsed.hostname.split(".")
    if len(labels) < 4 or labels[0] != "sns" or not labels[1]:
        return False
    return labels[2:] in (["amazonaws", "com"], ["amazonaws", "com", "cn"])


def verify_sns_notification(request):
    """ Takes a notification request from Amazon push service SNS and verifies the origin of the notification.

    Kudos to Artur Rodrigues for suggesting M2Crypto: http://goo.gl/KAgPPc

    The signing certificate is only downloaded when SigningCertURL is an
    HTTPS URL on an Amazon SNS host; otherwise the message is rejected,
    because the URL comes from the (untrusted) request body.

    Args:
        request (HTTPRequest): The request object that is passed to the view function
    Returns (bool):
        True if the message passes the verification, False otherwise
        (including when SigningCertURL is not a trusted SNS URL)
    Raises:
        ValueError: If the body of the response couldn't be parsed, or the
            message type or SignatureVersion is not recognized
        KeyError: If "Signature" or "SigningCertURL" is missing from the body
        M2CryptoError: If an error raises during the verification process
        URLError: If the SigningCertURL couldn't be opened
    """
    canonical_sub_unsub_format = ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    canonical_notification_format = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]

    content = json.loads(request.body)
    decoded_signature = b64decode(content["Signature"])

    # Depending on the message type, canonical message format varies: http://goo.gl/oSrJl8
    message_type = request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None)
    if message_type == SNS_MESSAGE_TYPE_SUB_NOTIFICATION or \
            message_type == SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION:
        canonical_message = canonical_message_builder(content, canonical_sub_unsub_format)
    elif message_type == SNS_MESSAGE_TYPE_NOTIFICATION:
        canonical_message = canonical_message_builder(content, canonical_notification_format)
    else:
        raise ValueError("Message Type (%s) is not recognized" % message_type)

    # SignatureVersion 1 is signed with SHA1, version 2 with SHA256.
    # A body without the field is treated as version 1.
    signature_version = content.get("SignatureVersion", "1")
    if signature_version == "1":
        digest = "sha1"
    elif signature_version == "2":
        digest = "sha256"
    else:
        raise ValueError("SignatureVersion (%s) is not supported" % signature_version)

    # Never fetch a certificate from an arbitrary URL: anyone could host
    # their own certificate and sign a forged message with the matching key.
    cert_url = content["SigningCertURL"]
    if not _is_trusted_cert_url(cert_url):
        return False

    # Load the certificate and extract the public key
    response = urllib2.urlopen(cert_url)
    try:
        cert_pem = response.read()
    finally:
        response.close()
    cert = X509.load_cert_string(str(cert_pem))
    pubkey = cert.get_pubkey()
    pubkey.reset_context(md=digest)
    pubkey.verify_init()

    # Feed the canonical message so it can be checked against the signature
    pubkey.verify_update(canonical_message)

    # M2Crypto uses EVP_VerifyFinal() from openssl as the underlying verification function.
    # http://goo.gl/Bk2G36: "EVP_VerifyFinal() returns 1 for a correct signature, 0 for failure and -1
    # if some other error occurred."
    verification_result = pubkey.verify_final(decoded_signature)
    if verification_result == 1:
        return True
    elif verification_result == 0:
        return False
    else:
        raise M2CryptoError("Some error occured while verifying the signature.")
@iftahio
Copy link

iftahio commented Mar 13, 2024

2024 version

def authenticate_sns_request(request_data: Dict):
    """
    Verifying the signatures of Amazon SNS messages

    Args:
        request_data: Parsed JSON body of the SNS message.

    Returns:
        None. Returning normally means the signature check did not raise.

    Raises:
        ValueError: If SigningCertURL is not an HTTPS URL on an Amazon SNS
            host, or SignatureVersion is neither '1' nor '2'.
        KeyError: If a required field is missing from request_data.
        Whatever ``public_key.verify`` raises when the signature does not match.
    """
    # Get the X509 certificate that Amazon SNS used to sign the message.
    # The URL comes from the untrusted body, so only accept
    # https://sns.<region>.amazonaws.com(.cn); a bare '.amazonaws.com'
    # suffix would also match other AWS-hosted, user-controlled hosts.
    sign_url = request_data['SigningCertURL']
    parsed_url = urlparse(sign_url)
    labels = (parsed_url.hostname or '').split('.')
    is_sns_host = (
        len(labels) >= 4
        and labels[0] == 'sns'
        and bool(labels[1])
        and labels[2:] in (['amazonaws', 'com'], ['amazonaws', 'com', 'cn'])
    )
    if parsed_url.scheme != 'https' or not is_sns_host:
        raise ValueError(f'Invalid SigningCertURL: {sign_url}')
    # Reject unknown versions up front; otherwise no verify() call would run
    # and the message would be accepted unverified.
    signature_version = request_data['SignatureVersion']
    if signature_version not in ('1', '2'):
        raise ValueError(f'Unsupported SignatureVersion: {signature_version}')
    # A timeout keeps a stalled download from hanging the caller.
    certificate = requests.get(sign_url, timeout=10).content
    # Extract the public key from the certificate.
    public_key = x509.load_pem_x509_certificate(certificate, default_backend()).public_key()
    # Create the string to sign.
    if request_data['Type'] == 'Notification':
        fields = 'Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type'
    else:
        fields = 'Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type'
    # Subject is optional on notifications and is left out when absent;
    # every other field is required and still raises KeyError if missing.
    canonical_msg = ''.join(
        f'{name}\n{request_data[name]}\n'
        for name in fields
        if name != 'Subject' or 'Subject' in request_data
    ).encode()
    # Decode the Signature value from Base64 format.
    decoded_signature = base64.b64decode(request_data['Signature'])
    # Verify the signature with the public key and the canonical message.
    digest = hashes.SHA1() if signature_version == '1' else hashes.SHA256()
    public_key.verify(decoded_signature, canonical_msg, PKCS1v15(), digest)

@webtweakers
Copy link

Actually M2Crypto's methods are deprecated in favor of the cryptography library, so none of the above is up to date.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment