-
-
Save amertkara/e294562759ff2755486e to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*- | |
import json | |
import urllib2 | |
from M2Crypto import X509 | |
from base64 import b64decode | |
from M2Crypto.Err import M2CryptoError | |
SNS_MESSAGE_TYPE_SUB_NOTIFICATION = "SubscriptionConfirmation" | |
SNS_MESSAGE_TYPE_NOTIFICATION = "Notification" | |
SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION = "UnsubscribeConfirmation" | |
def canonical_message_builder(content, format): | |
""" Builds the canonical message to be verified. | |
Sorts the fields as a requirement from AWS | |
Args: | |
content (dict): Parsed body of the response | |
format (list): List of the fields that need to go into the message | |
Returns (str): | |
canonical message | |
""" | |
m = "" | |
for field in sorted(format): | |
try: | |
m += field + "\n" + content[field] + "\n" | |
except KeyError: | |
# Build with what you have | |
pass | |
return str(m) | |
def verify_sns_notification(request): | |
""" Takes a notification request from Amazon push service SNS and verifies the origin of the notification. | |
Kudos to Artur Rodrigues for suggesting M2Crypto: http://goo.gl/KAgPPc | |
Args: | |
request (HTTPRequest): The request object that is passed to the view function | |
Returns (bool): | |
True if he message passes the verification, False otherwise | |
Raises: | |
ValueError: If the body of the response couldn't be parsed | |
M2CryptoError: If an error raises during the verification process | |
URLError: If the SigningCertURL couldn't be opened | |
""" | |
cert = None | |
pubkey = None | |
canonical_message = None | |
canonical_sub_unsub_format = ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"] | |
canonical_notification_format = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"] | |
content = json.loads(request.body) | |
decoded_signature = b64decode(content["Signature"]) | |
# Depending on the message type, canonical message format varies: http://goo.gl/oSrJl8 | |
if request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None) == SNS_MESSAGE_TYPE_SUB_NOTIFICATION or \ | |
request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None) == SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION: | |
canonical_message = canonical_message_builder(content, canonical_sub_unsub_format) | |
elif request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None) == SNS_MESSAGE_TYPE_NOTIFICATION: | |
canonical_message = canonical_message_builder(content, canonical_notification_format) | |
else: | |
raise ValueError("Message Type (%s) is not recognized" % request.META.get("HTTP_X_AMZ_SNS_MESSAGE_TYPE", None)) | |
# Load the certificate and extract the public key | |
cert = X509.load_cert_string(str(urllib2.urlopen(content["SigningCertURL"]).read())) | |
pubkey = cert.get_pubkey() | |
pubkey.reset_context(md='sha1') | |
pubkey.verify_init() | |
# Feed the canonical message to sign it with the public key from the certificate | |
pubkey.verify_update(canonical_message) | |
# M2Crypto uses EVP_VerifyFinal() from openssl as the underlying verification function. | |
# http://goo.gl/Bk2G36: "EVP_VerifyFinal() returns 1 for a correct signature, 0 for failure and -1 | |
# if some other error occurred." | |
verification_result = pubkey.verify_final(decoded_signature) | |
if verification_result == 1: | |
return True | |
elif verification_result == 0: | |
return False | |
else: | |
raise M2CryptoError("Some error occured while verifying the signature.") |
Thanks! But you missing one step in verification.
Amazon documentation states that you must verify notification arrived from Amazon (specifically certificate URL):
Quote:
To help prevent spoofing attacks, you should do the following when verifying messages sent by Amazon SNS:
- Always use HTTPS when getting the certificate from Amazon SNS.
- Validate the authenticity of the certificate.
- Verify the certificate was received from Amazon SNS.
- When possible, use one of the supported AWS SDKs for Amazon SNS to validate and verify messages. For example, with the AWS SDK for PHP you would use the isValid method from the MessageValidator class.
(emphasize added)
Otherwise, anyone can spoof a request supplying their own certificate making the verification valid while it never arrived from the Amazon but from the evil Eve.
pretty ugly python, with python3.7, aiohttp and cryptograph, that whole mess can be rewritten as
async def verify_sns(request, data):
msg_type = request.headers.get('X-Amz-Sns-Message-Type')
if msg_type == 'Notification':
fields = 'Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type'
else:
fields = 'Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type'
try:
canonical_msg = ''.join(f'{f}\n{data[f]}\n' for f in fields).encode()
sign_url = data['SigningCertURL']
decoded_signature = base64.b64decode(data['Signature'])
except (ValueError, KeyError) as e:
raise JsonErrors.HTTPForbidden(f'invalid request, error: {e}')
cache_key = 'sns-signing-url:' + hashlib.md5(sign_url.encode()).hexdigest()
sign_url = URL(sign_url)
if sign_url.scheme != 'https' or not sign_url.host.endswith('.amazonaws.com'):
logger.warning('invalid signing url "%s"', sign_url)
raise JsonErrors.HTTPForbidden('invalid signing cert url')
pem_data = await request.app['redis'].get(cache_key, encoding=None)
if not pem_data:
async with request.app['http_client'].get(sign_url, raise_for_status=True) as r:
pem_data = await r.read()
await request.app['redis'].setex(cache_key, 86400, pem_data)
cert = x509.load_pem_x509_certificate(pem_data, default_backend())
pubkey: rsa.RSAPublicKey = cert.public_key()
try:
pubkey.verify(decoded_signature, canonical_msg, padding.PKCS1v15(), hashes.SHA1())
except InvalidSignature:
raise JsonErrors.HTTPForbidden('invalid signature')
I've also adding signature caching and checked the url.
I don't think is necessary to say "pretty ugly code" @samuelcolvin. Why the negativity? @amertkara is just sharing their code.
I prefer amertkara's code. It's more readable and maintainable IMHO.
Taste is subjective, of course, but I prefer code that is simple to follow and maintain over code that is compact.
2024 version
def authenticate_sns_request(request_data: Dict):
"""
Verifying the signatures of Amazon SNS messages
"""
# Get the X509 certificate that Amazon SNS used to sign the message.
sign_url = request_data['SigningCertURL']
parsed_url = urlparse(sign_url)
if parsed_url.scheme != 'https' or not parsed_url.hostname.endswith('.amazonaws.com'):
raise ValueError(f'Invalid SigningCertURL: {sign_url}')
certificate = requests.get(sign_url).text.encode('utf-8')
# Extract the public key from the certificate.
public_key = x509.load_pem_x509_certificate(certificate, default_backend()).public_key()
# Create the string to sign.
if request_data['Type'] == 'Notification':
fields = 'Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type'
else:
fields = 'Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type'
canonical_msg = ''.join(f'{f}\n{request_data[f]}\n' for f in fields).encode()
# Decode the Signature value from Base64 format.
decoded_signature = base64.b64decode(request_data['Signature'])
# Verify the signature with the public key and the canonical message.
if request_data['SignatureVersion'] == '1':
public_key.verify(decoded_signature, canonical_msg, PKCS1v15(), hashes.SHA1())
elif request_data['SignatureVersion'] == '2':
public_key.verify(decoded_signature, canonical_msg, PKCS1v15(), hashes.SHA256())
Thanks for that Mert!
Amazon docs describing the verification process: http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.verify.signature.html