Last active
May 14, 2025 07:35
-
-
Save dceoy/1bab88c71d90a6e8a3bcc9d3cea1701e to your computer and use it in GitHub Desktop.
[Python] Twilio signature validator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Unit tests for twilio_signature_validator module.""" | |
import base64 | |
import pytest | |
from aws_lambda_powertools.event_handler.exceptions import ( | |
BadRequestError, | |
UnauthorizedError, | |
) | |
from aws_lambda_powertools.utilities.data_classes import LambdaFunctionUrlEvent | |
from fastapi import HTTPException | |
from pytest_mock import MockerFixture | |
from twilio_signature_validator import ( | |
validate_http_twilio_signature, | |
validate_websocket_twilio_signature, | |
) | |
def test_validate_http_twilio_signature(mocker: MockerFixture) -> None: | |
event = LambdaFunctionUrlEvent({ | |
"headers": {"X-Twilio-Signature": "valid"}, | |
"requestContext": { | |
"domainName": "example.com", | |
"stage": "$default", | |
"http": {"method": "POST", "path": "/incoming-call"}, | |
}, | |
"rawPath": "/incoming-call", | |
"path": "/incoming-call", | |
"body": base64.b64encode(b"From=+1234567890&To=").decode("utf-8"), | |
"isBase64Encoded": True, | |
}) | |
mock_validator = mocker.MagicMock() | |
mocker.patch("twilio_signature_validator.RequestValidator", return_value=mock_validator) | |
mock_validator_validate = mocker.patch.object(mock_validator, "validate", return_value=True) | |
mock_logger_info = mocker.patch("twilio_signature_validator.logger.info") | |
validate_http_twilio_signature("test-token", event) | |
mock_validator_validate.assert_called_once_with( | |
uri="https://example.com/incoming-call", | |
params={"From": " 1234567890", "To": ""}, | |
signature="valid", | |
) | |
mock_logger_info.assert_any_call("Twilio request signature is valid") | |
def test_validate_http_twilio_signature_missing_signature( | |
mocker: MockerFixture, | |
) -> None: | |
event = LambdaFunctionUrlEvent({ | |
"headers": {}, | |
"requestContext": { | |
"domainName": "example.com", | |
"stage": "$default", | |
"http": {"method": "POST", "path": "/incoming-call"}, | |
}, | |
"rawPath": "/incoming-call", | |
"path": "/incoming-call", | |
"body": "From=+1234567890&To=", | |
}) | |
mocker.patch("twilio_signature_validator.RequestValidator", return_value=mocker.MagicMock()) | |
with pytest.raises(BadRequestError, match="Missing X-Twilio-Signature header"): | |
validate_http_twilio_signature("test-token", event) | |
def test_validate_http_twilio_signature_invalid(mocker: MockerFixture) -> None: | |
event = LambdaFunctionUrlEvent({ | |
"headers": {"X-Twilio-Signature": "invalid"}, | |
"requestContext": { | |
"domainName": "example.com", | |
"stage": "$default", | |
"http": {"method": "POST", "path": "/incoming-call"}, | |
}, | |
"rawPath": "/incoming-call", | |
"path": "/incoming-call", | |
"body": "From=+1234567890&To=", | |
}) | |
mock_validator = mocker.MagicMock() | |
mocker.patch("twilio_signature_validator.RequestValidator", return_value=mock_validator) | |
mocker.patch.object(mock_validator, "validate", return_value=False) | |
with pytest.raises(UnauthorizedError, match="Invalid Twilio request signature"): | |
validate_http_twilio_signature("test-token", event) | |
@pytest.mark.asyncio | |
async def test_validate_websocket_twilio_signature(mocker: MockerFixture) -> None: | |
websocket = mocker.MagicMock() | |
websocket.headers = {"x-twilio-signature": "valid-signature"} | |
websocket.url = "wss://example.com/websocket" | |
websocket.query_params = {"param1": "value1"} | |
mock_validator = mocker.MagicMock() | |
mocker.patch("twilio_signature_validator.RequestValidator", return_value=mock_validator) | |
mock_validator_validate = mocker.patch.object(mock_validator, "validate", return_value=True) | |
await validate_websocket_twilio_signature(token="dummy-token", websocket=websocket) | |
mock_validator_validate.assert_called_once_with( | |
uri=websocket.url, params=websocket.query_params, signature=websocket.headers["x-twilio-signature"] | |
) | |
websocket.close.assert_not_called() | |
@pytest.mark.asyncio | |
async def test_validate_websocket_twilio_signature_missing_header(mocker: MockerFixture) -> None: | |
websocket = mocker.MagicMock() | |
websocket.headers = {} | |
mocker.patch.object(websocket, "close", new_callable=mocker.AsyncMock) | |
expected_http_status_code = 403 | |
expected_websocket_close_code = 1008 | |
with pytest.raises(HTTPException) as exc_info: | |
await validate_websocket_twilio_signature(token="dummy-token", websocket=websocket) | |
assert exc_info.value.status_code == expected_http_status_code | |
assert "Missing x-twilio-signature header" in exc_info.value.detail | |
websocket.close.assert_called_once_with(code=expected_websocket_close_code) | |
@pytest.mark.asyncio | |
async def test_validate_websocket_twilio_signature_invalid_signature(mocker: MockerFixture) -> None: | |
websocket = mocker.MagicMock() | |
websocket.headers = {"x-twilio-signature": "invalid-signature"} | |
websocket.url = "ws://example.com/websocket" | |
websocket.query_params = {"param1": "value1"} | |
expected_http_status_code = 403 | |
expected_websocket_close_code = 1008 | |
mocker.patch.object(websocket, "close", new_callable=mocker.AsyncMock) | |
mock_validator = mocker.MagicMock() | |
mocker.patch("twilio_signature_validator.RequestValidator", return_value=mock_validator) | |
mocker.patch.object(mock_validator, "validate", return_value=False) | |
with pytest.raises(HTTPException) as exc_info: | |
await validate_websocket_twilio_signature(token="dummy-token", websocket=websocket) | |
assert exc_info.value.status_code == expected_http_status_code | |
assert "Invalid Twilio request signature" in exc_info.value.detail | |
websocket.close.assert_called_once_with(code=expected_websocket_close_code) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Module for Twilio signature validation.""" | |
from urllib.parse import parse_qsl | |
from aws_lambda_powertools import Logger | |
from aws_lambda_powertools.event_handler.exceptions import ( | |
BadRequestError, | |
UnauthorizedError, | |
) | |
from aws_lambda_powertools.utilities.data_classes import LambdaFunctionUrlEvent | |
from fastapi import HTTPException, WebSocket | |
from twilio.request_validator import RequestValidator | |
logger = Logger() | |
def validate_twilio_http_signature(token: str, event: LambdaFunctionUrlEvent) -> None: | |
"""Validate Twilio signature for HTTP request. | |
Args: | |
token (str): Twilio auth token. | |
event (LambdaFunctionUrlEvent): The event data passed by AWS Lambda. | |
Raises: | |
BadRequestError: If the request signature is missing. | |
UnauthorizedError: If the request signature is invalid. | |
""" | |
logger.info("Validating Twilio signature") | |
validator = RequestValidator(token) | |
query_parameters = event.get("queryStringParameters") | |
query_string = ( | |
"?{}".format("&".join([f"{k}={v}" for k, v in query_parameters.items()])) | |
if query_parameters | |
else "" | |
) | |
uri = f"https://{event.request_context.domain_name}{event.path}{query_string}" | |
logger.info("uri: %s", uri) | |
logger.info("event.decoded_body: %s", event.decoded_body) | |
params = dict(parse_qsl(event.decoded_body, keep_blank_values=True)) | |
logger.info("params: %s", params) | |
signature = event.headers.get("X-Twilio-Signature") | |
if not signature: | |
error_message = "Missing X-Twilio-Signature header" | |
raise BadRequestError(error_message) | |
if not validator.validate(uri=uri, params=params, signature=signature): | |
error_message = "Invalid Twilio request signature" | |
raise UnauthorizedError(error_message) | |
logger.info("Twilio request signature is valid") | |
async def validate_websocket_twilio_signature(token: str, websocket: WebSocket) -> None: | |
"""Validate Twilio signature for WebSocket handshake request. | |
Args: | |
token (str): Twilio auth token. | |
websocket (WebSocket): WebSocket connection object. | |
Raises: | |
HTTPException: If the request signature is missing or invalid. | |
""" | |
signature = websocket.headers.get("x-twilio-signature") | |
if not signature: | |
await websocket.close(code=1008) | |
raise HTTPException(status_code=403, detail="Missing x-twilio-signature header") | |
validator = RequestValidator(token) | |
logger.info( | |
"websocket.url: %s, websocket.query_params: %s", | |
websocket.url, | |
websocket.query_params, | |
) | |
uri = "wss://" + str(websocket.url).split("://", maxsplit=1)[1] | |
logger.info("uri: %s", uri) | |
params = dict(websocket.query_params) | |
logger.info("params: %s", params) | |
if not validator.validate(uri=uri, params=params, signature=signature): | |
await websocket.close(code=1008) | |
raise HTTPException(status_code=403, detail="Invalid Twilio request signature") | |
logger.info("Twilio request signature is valid") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment