Last active
March 13, 2024 11:10
-
-
Save amertkara/e294562759ff2755486e to your computer and use it in GitHub Desktop.
Amazon SNS Notification Verification with Python, M2Crypto. When the SNS pushes a notification, a receiver should verify the origin/integrity of the push notification (AWS) using the signature and certificate provided in the notification data. The function `verify_sns_notification` below takes the request object and verifies the origin/integrity…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import json | |
import urllib2 | |
from M2Crypto import X509 | |
from base64 import b64decode | |
from M2Crypto.Err import M2CryptoError | |
# Message type names that verify_sns_notification compares against the
# HTTP_X_AMZ_SNS_MESSAGE_TYPE entry of request.META.
SNS_MESSAGE_TYPE_SUB_NOTIFICATION = "SubscriptionConfirmation" | |
SNS_MESSAGE_TYPE_NOTIFICATION = "Notification" | |
SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION = "UnsubscribeConfirmation" | |
def canonical_message_builder(content, format):
    """ Builds the canonical message to be verified.

    Each field that is present in `content` contributes "<name>\\n<value>\\n";
    fields are emitted in sorted (byte) order of their names.

    Args:
        content (dict): Parsed body of the response
        format (list): List of the fields that need to go into the message
            (the name shadows the builtin but is kept for existing callers)

    Returns (str):
        canonical message. On Python 2 a unicode result is encoded to UTF-8
        bytes so that non-ASCII field values no longer raise
        UnicodeEncodeError.

    Raises:
        TypeError: If a present field's value is not a string
    """
    parts = []
    for field in sorted(format):
        try:
            value = content[field]
        except KeyError:
            # Build with what you have: fields listed in `format` but absent
            # from the body are left out of the canonical message.
            continue
        parts.append(field + "\n" + value + "\n")

    message = "".join(parts)
    if not isinstance(message, str):
        # Python 2 only: json.loads yields unicode, and str(unicode) would use
        # the ASCII codec. Encode explicitly so the verifier receives bytes.
        message = message.encode("utf-8")
    return message
def _is_trusted_signing_cert_url(url):
    """ Returns True only if `url` is an HTTPS URL on an Amazon SNS endpoint host. """
    import re
    try:
        from urlparse import urlparse  # Python 2
    except ImportError:
        from urllib.parse import urlparse  # Python 3

    try:
        parsed = urlparse(url)
        host = parsed.hostname or ""
        return (
            parsed.scheme == "https"
            and re.match(r"sns\.[a-zA-Z0-9-]{3,}\.amazonaws\.com(\.cn)?\Z", host) is not None
        )
    except (AttributeError, TypeError, ValueError):
        # Anything that cannot even be parsed as a URL is not trusted.
        return False


def verify_sns_notification(request):
    """ Takes a notification request from Amazon push service SNS and verifies the origin of the notification.

    Kudos to Artur Rodrigues for suggesting M2Crypto: http://goo.gl/KAgPPc

    Args:
        request (HTTPRequest): The request object that is passed to the view function

    Returns (bool):
        True if the message passes the verification, False otherwise
        (including when SigningCertURL does not point at an SNS endpoint
        over HTTPS, in which case the certificate is never downloaded)

    Raises:
        ValueError: If the body of the response couldn't be parsed or the message type is not recognized
        KeyError: If "Signature" or "SigningCertURL" is missing from the body
        M2CryptoError: If an error raises during the verification process
        URLError: If the SigningCertURL couldn't be opened
    """
    from contextlib import closing

    canonical_sub_unsub_format = ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    canonical_notification_format = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]

    content = json.loads(request.body)
    decoded_signature = b64decode(content["Signature"])

    # The certificate URL comes from the (untrusted) request body. Without this
    # check a sender could point it at a certificate of their own and produce a
    # signature that verifies, so refuse anything that is not an SNS host.
    signing_cert_url = content["SigningCertURL"]
    if not _is_trusted_signing_cert_url(signing_cert_url):
        return False

    # Depending on the message type, canonical message format varies: http://goo.gl/oSrJl8
    message_type = request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None)
    if message_type in (SNS_MESSAGE_TYPE_SUB_NOTIFICATION, SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION):
        canonical_message = canonical_message_builder(content, canonical_sub_unsub_format)
    elif message_type == SNS_MESSAGE_TYPE_NOTIFICATION:
        canonical_message = canonical_message_builder(content, canonical_notification_format)
    else:
        raise ValueError("Message Type (%s) is not recognized" % message_type)

    # Load the certificate and extract the public key; close the HTTP response
    # explicitly instead of leaving it to the garbage collector.
    with closing(urllib2.urlopen(signing_cert_url)) as response:
        pem = response.read()
    cert = X509.load_cert_string(str(pem))
    pubkey = cert.get_pubkey()

    # SignatureVersion "2" messages are signed with SHA256; everything else
    # keeps the previous SHA1 behaviour.
    digest = 'sha256' if content.get("SignatureVersion") == "2" else 'sha1'
    pubkey.reset_context(md=digest)
    pubkey.verify_init()

    # Feed the canonical message to verify it with the public key from the certificate
    pubkey.verify_update(canonical_message)

    # M2Crypto uses EVP_VerifyFinal() from openssl as the underlying verification function.
    # http://goo.gl/Bk2G36: "EVP_VerifyFinal() returns 1 for a correct signature, 0 for failure and -1
    # if some other error occurred."
    verification_result = pubkey.verify_final(decoded_signature)
    if verification_result == 1:
        return True
    if verification_result == 0:
        return False
    raise M2CryptoError("Some error occured while verifying the signature.")
I don't think it is necessary to say "pretty ugly code", @samuelcolvin. Why the negativity? @amertkara is just sharing their code.
I prefer amertkara's code. It's more readable and maintainable IMHO.
Taste is subjective, of course, but I prefer code that is simple to follow and maintain over code that is compact.
2024 version
def authenticate_sns_request(request_data: Dict):
    """
    Verifying the signatures of Amazon SNS messages

    Args:
        request_data: Parsed JSON body of the SNS message.

    Returns:
        None. The function returns normally only when the signature verified.

    Raises:
        ValueError: If SigningCertURL is not an HTTPS URL on an SNS endpoint
            host, or SignatureVersion is neither '1' nor '2'.
        KeyError: If a required key is missing from request_data.
        Any exception raised by requests.get, the certificate loader or
        public_key.verify (the latter on a signature mismatch) propagates.
    """
    import re

    # Get the X509 certificate that Amazon SNS used to sign the message.
    # Only SNS endpoint hosts are accepted: a bare '.amazonaws.com' suffix
    # would also match hosts whose content anyone can control (e.g. S3).
    sign_url = request_data['SigningCertURL']
    parsed_url = urlparse(sign_url)
    hostname = parsed_url.hostname or ''
    if parsed_url.scheme != 'https' or \
            re.fullmatch(r'sns\.[a-z0-9-]{3,}\.amazonaws\.com(\.cn)?', hostname) is None:
        raise ValueError(f'Invalid SigningCertURL: {sign_url}')

    # Reject unknown versions up front; silently skipping verification for
    # them would make the function "succeed" on unauthenticated input.
    signature_version = request_data['SignatureVersion']
    if signature_version not in ('1', '2'):
        raise ValueError(f'Unsupported SignatureVersion: {signature_version}')

    certificate = requests.get(sign_url, timeout=10).text.encode('utf-8')

    # Extract the public key from the certificate.
    public_key = x509.load_pem_x509_certificate(certificate, default_backend()).public_key()

    # Create the string to sign.
    if request_data['Type'] == 'Notification':
        fields = 'Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type'
    else:
        fields = 'Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type'
    # Fields absent from the message (e.g. no Subject) are left out rather
    # than raising KeyError.
    canonical_msg = ''.join(
        f'{f}\n{request_data[f]}\n' for f in fields if f in request_data
    ).encode()

    # Decode the Signature value from Base64 format.
    decoded_signature = base64.b64decode(request_data['Signature'])

    # Verify the signature with the public key and the canonical message.
    digest = hashes.SHA1() if signature_version == '1' else hashes.SHA256()
    public_key.verify(decoded_signature, canonical_msg, PKCS1v15(), digest)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Pretty ugly Python; with Python 3.7, aiohttp and cryptography, that whole mess can be rewritten as
I've also added signature caching and checked the URL.