Skip to content

Instantly share code, notes, and snippets.

@cdosborn
Forked from amertkara/aws_utils.py
Last active October 5, 2023 22:31
Show Gist options
  • Save cdosborn/abc593837313f465f3475891ebbb65af to your computer and use it in GitHub Desktop.
Save cdosborn/abc593837313f465f3475891ebbb65af to your computer and use it in GitHub Desktop.
Amazon SNS Notification Verification with Python and the `cryptography` package. When SNS pushes a notification, the receiver should verify the origin/integrity of the push notification (AWS) using the signature and certificate provided in the notification data. The function `verify_sns` below takes the message type and the parsed notification data and verifies the origin/integrity…
import base64
import hashlib
import logging
import re
from urllib.parse import urlparse

import requests
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from flask import current_app

from app import redis_client
# Host names SNS serves signing certificates from: sns.<region>.amazonaws.com,
# optionally followed by .cn. A bare ".amazonaws.com" suffix test is not enough,
# because other AWS services expose customer-controlled content under that domain.
_SNS_CERT_HOST_RE = re.compile(r'sns\.[a-zA-Z0-9\-]{3,}\.amazonaws\.com(\.cn)?')
_SNS_CERT_CACHE_TTL_SECONDS = 86400
_SNS_CERT_FETCH_TIMEOUT_SECONDS = 10


def verify_sns(msg_type, data):
    """Verify the signature of an Amazon SNS message.

    Args:
        msg_type: The SNS message type. ``'Notification'`` selects the
            notification field list; any other value selects the
            subscription/unsubscription confirmation field list.
        data: Mapping holding the parsed SNS message. It must contain
            ``SigningCertURL``, ``Signature`` and the fields that make up
            the string-to-sign for ``msg_type``.

    Returns:
        ``True`` when the signature matches the message, ``False`` when the
        signing URL is not a trusted SNS endpoint, the signature cannot be
        base64-decoded, or the signature does not match.

    Raises:
        KeyError: A required field is missing from ``data``.
        requests.RequestException: The signing certificate could not be
            downloaded.
        ValueError: The certificate (cached or downloaded) cannot be parsed.
    """
    if msg_type == 'Notification':
        fields = 'Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type'
    else:
        fields = 'Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type'

    try:
        # Subject is optional on notifications; when SNS did not send it, it
        # is not part of the signed string and must be skipped.
        canonical_msg = ''.join(
            f'{f}\n{data[f]}\n'
            for f in fields
            if not (f == 'Subject' and data.get('Subject') is None)
        ).encode()
        sign_url = data['SigningCertURL']
        encoded_signature = data['Signature']
    except KeyError:
        current_app.logger.exception("Failed to verify signature, the data has an unexpected shape")
        raise

    try:
        decoded_signature = base64.b64decode(encoded_signature)
    except (TypeError, ValueError):
        # binascii.Error is a ValueError; an undecodable signature can never verify.
        current_app.logger.warning('SNS signature failed, signature is not valid base64')
        return False

    # Only ever download certificates over HTTPS from an SNS endpoint.
    try:
        url = urlparse(sign_url)
        trusted = (
            url.scheme == 'https'
            and _SNS_CERT_HOST_RE.fullmatch(url.hostname or '') is not None
        )
    except ValueError:
        # urlparse rejects some malformed URLs (e.g. a broken IPv6 literal).
        trusted = False
    if not trusted:
        current_app.logger.warning(f'SNS signature failed, invalid signing url "{sign_url}"')
        return False

    # SignatureVersion "2" messages are signed with SHA256; version "1"
    # (and messages without the field) use SHA1.
    if str(data.get('SignatureVersion')) == '2':
        digest = hashes.SHA256()
    else:
        digest = hashes.SHA1()

    cache_key = 'sns-signing-url:' + hashlib.md5(sign_url.encode()).hexdigest()
    current_app.logger.info('Checking cache for amazon cert')
    pem_data = redis_client.get(cache_key)
    fetched = pem_data is None
    if fetched:
        current_app.logger.info(f'Fetching amazon cert from "{sign_url}"')
        response = requests.get(sign_url, timeout=_SNS_CERT_FETCH_TIMEOUT_SECONDS)
        response.raise_for_status()
        pem_data = response.content

    cert = x509.load_pem_x509_certificate(pem_data, default_backend())
    if fetched:
        # Cache only after the download parsed as a certificate, so a bad
        # response is not served from the cache for a whole day.
        redis_client.setex(cache_key, _SNS_CERT_CACHE_TTL_SECONDS, pem_data)

    pubkey: rsa.RSAPublicKey = cert.public_key()
    try:
        pubkey.verify(decoded_signature, canonical_msg, padding.PKCS1v15(), digest)
    except InvalidSignature:
        current_app.logger.exception("SNS Signature failed, may want to look into reason")
        return False
    return True
@ucals
Copy link

ucals commented Oct 5, 2023

Great snippet, thx!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment