-
-
Save cdosborn/abc593837313f465f3475891ebbb65af to your computer and use it in GitHub Desktop.
Amazon SNS Notification Verification with Python, M2Crypto. When the SNS pushes a notification, a receiver should verify the origin/integrity of the push notification (AWS) using the signature and certificate provided in the notification data. The function `verify_sns_notification` below takes the request object and verifies the origin/integrity…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import current_app | |
from cryptography import x509 | |
from cryptography.exceptions import InvalidSignature | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives.asymmetric import rsa | |
from cryptography.hazmat.primitives.asymmetric import padding | |
import base64 | |
from urllib.parse import urlparse | |
import logging | |
import hashlib | |
import requests | |
from app import redis_client | |
def verify_sns(msg_type, data): | |
if msg_type == 'Notification': | |
fields = 'Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type' | |
else: | |
fields = 'Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type' | |
try: | |
canonical_msg = ''.join(f'{f}\n{data[f]}\n' for f in fields).encode() | |
sign_url = data['SigningCertURL'] | |
decoded_signature = base64.b64decode(data['Signature']) | |
except KeyError: | |
current_app.logger.exception("Failed to verify signature, the data has an unexpected shape") | |
raise | |
cache_key = 'sns-signing-url:' + hashlib.md5(sign_url.encode()).hexdigest() | |
url = urlparse(sign_url) | |
if url.scheme != 'https' or not url.hostname.endswith('.amazonaws.com'): | |
current_app.logger.warning(f'SNS signature failed, invalid signing url "{sign_url}"') | |
return False | |
pem_data = None | |
current_app.logger.info(f'Checking cache for amazon cert') | |
pem_data = redis_client.get(cache_key) | |
if pem_data is None: | |
current_app.logger.info(f'Fetching amazon cert from "{sign_url}"') | |
response = requests.get(sign_url) | |
response.raise_for_status() | |
pem_data = response.content | |
redis_client.setex(cache_key, 86400, pem_data) | |
cert = x509.load_pem_x509_certificate(pem_data, default_backend()) | |
pubkey: rsa.RSAPublicKey = cert.public_key() | |
try: | |
pubkey.verify(decoded_signature, canonical_msg, padding.PKCS1v15(), hashes.SHA1()) | |
except InvalidSignature: | |
current_app.logger.exception("SNS Signature failed, may want to look into reason") | |
return False | |
return True |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is great, thank you. I just used this code to upgrade django-ses. Would have been a lot harder without this example!