Skip to content

Instantly share code, notes, and snippets.

@dceoy
Last active May 14, 2025 07:35
Show Gist options
  • Save dceoy/1bab88c71d90a6e8a3bcc9d3cea1701e to your computer and use it in GitHub Desktop.
Save dceoy/1bab88c71d90a6e8a3bcc9d3cea1701e to your computer and use it in GitHub Desktop.
[Python] Twilio signature validator
"""Unit tests for twilio_signature_validator module."""
import base64
import pytest
from aws_lambda_powertools.event_handler.exceptions import (
BadRequestError,
UnauthorizedError,
)
from aws_lambda_powertools.utilities.data_classes import LambdaFunctionUrlEvent
from fastapi import HTTPException
from pytest_mock import MockerFixture
from twilio_signature_validator import (
validate_http_twilio_signature,
validate_websocket_twilio_signature,
)
def test_validate_http_twilio_signature(mocker: MockerFixture) -> None:
    """A valid signature passes, with the base64 form body decoded into params."""
    # urllib's query parsing turns "+" into a space, hence " 1234567890" below.
    encoded_body = base64.b64encode(b"From=+1234567890&To=").decode("utf-8")
    payload = {
        "headers": {"X-Twilio-Signature": "valid"},
        "requestContext": {
            "domainName": "example.com",
            "stage": "$default",
            "http": {"method": "POST", "path": "/incoming-call"},
        },
        "rawPath": "/incoming-call",
        "path": "/incoming-call",
        "body": encoded_body,
        "isBase64Encoded": True,
    }
    event = LambdaFunctionUrlEvent(payload)

    # Replace the Twilio validator class with a double that accepts everything.
    validator_double = mocker.MagicMock()
    validator_double.validate.return_value = True
    mocker.patch(
        "twilio_signature_validator.RequestValidator",
        return_value=validator_double,
    )
    info_spy = mocker.patch("twilio_signature_validator.logger.info")

    validate_http_twilio_signature("test-token", event)

    validator_double.validate.assert_called_once_with(
        uri="https://example.com/incoming-call",
        params={"From": " 1234567890", "To": ""},
        signature="valid",
    )
    info_spy.assert_any_call("Twilio request signature is valid")
def test_validate_http_twilio_signature_missing_signature(
    mocker: MockerFixture,
) -> None:
    """An event without the signature header is rejected with BadRequestError."""
    request_context = {
        "domainName": "example.com",
        "stage": "$default",
        "http": {"method": "POST", "path": "/incoming-call"},
    }
    event = LambdaFunctionUrlEvent(
        {
            "headers": {},
            "requestContext": request_context,
            "rawPath": "/incoming-call",
            "path": "/incoming-call",
            "body": "From=+1234567890&To=",
        }
    )
    # The validator itself is irrelevant here; stub it so no real Twilio code runs.
    mocker.patch(
        "twilio_signature_validator.RequestValidator",
        return_value=mocker.MagicMock(),
    )

    with pytest.raises(BadRequestError, match="Missing X-Twilio-Signature header"):
        validate_http_twilio_signature("test-token", event)
def test_validate_http_twilio_signature_invalid(mocker: MockerFixture) -> None:
    """A signature the validator rejects results in UnauthorizedError."""
    payload = {
        "headers": {"X-Twilio-Signature": "invalid"},
        "requestContext": {
            "domainName": "example.com",
            "stage": "$default",
            "http": {"method": "POST", "path": "/incoming-call"},
        },
        "rawPath": "/incoming-call",
        "path": "/incoming-call",
        "body": "From=+1234567890&To=",
    }
    event = LambdaFunctionUrlEvent(payload)

    # Validator double that refuses every signature.
    rejecting_validator = mocker.MagicMock()
    rejecting_validator.validate.return_value = False
    mocker.patch(
        "twilio_signature_validator.RequestValidator",
        return_value=rejecting_validator,
    )

    with pytest.raises(UnauthorizedError, match="Invalid Twilio request signature"):
        validate_http_twilio_signature("test-token", event)
@pytest.mark.asyncio
async def test_validate_websocket_twilio_signature(mocker: MockerFixture) -> None:
    """A valid handshake signature is accepted and the socket is left open."""
    websocket = mocker.MagicMock()
    websocket.headers = {"x-twilio-signature": "valid-signature"}
    websocket.url = "wss://example.com/websocket"
    websocket.query_params = {"param1": "value1"}

    # Validator double that accepts every signature.
    accepting_validator = mocker.MagicMock()
    accepting_validator.validate.return_value = True
    mocker.patch(
        "twilio_signature_validator.RequestValidator",
        return_value=accepting_validator,
    )

    await validate_websocket_twilio_signature(token="dummy-token", websocket=websocket)

    accepting_validator.validate.assert_called_once_with(
        uri=websocket.url,
        params=websocket.query_params,
        signature=websocket.headers["x-twilio-signature"],
    )
    websocket.close.assert_not_called()
@pytest.mark.asyncio
async def test_validate_websocket_twilio_signature_missing_header(mocker: MockerFixture) -> None:
    """Without the signature header the socket is closed (1008) and a 403 is raised."""
    expected_http_status_code = 403
    expected_websocket_close_code = 1008

    websocket = mocker.MagicMock()
    websocket.headers = {}
    # close() is awaited by the code under test, so it has to be an async mock.
    mocker.patch.object(websocket, "close", new_callable=mocker.AsyncMock)

    with pytest.raises(HTTPException) as raised:
        await validate_websocket_twilio_signature(token="dummy-token", websocket=websocket)

    error = raised.value
    assert error.status_code == expected_http_status_code
    assert "Missing x-twilio-signature header" in error.detail
    websocket.close.assert_called_once_with(code=expected_websocket_close_code)
@pytest.mark.asyncio
async def test_validate_websocket_twilio_signature_invalid_signature(mocker: MockerFixture) -> None:
    """A rejected handshake signature closes the socket (1008) and raises a 403."""
    expected_http_status_code = 403
    expected_websocket_close_code = 1008

    websocket = mocker.MagicMock()
    websocket.headers = {"x-twilio-signature": "invalid-signature"}
    websocket.url = "ws://example.com/websocket"
    websocket.query_params = {"param1": "value1"}
    # close() is awaited by the code under test, so it has to be an async mock.
    mocker.patch.object(websocket, "close", new_callable=mocker.AsyncMock)

    # Validator double that refuses every signature.
    rejecting_validator = mocker.MagicMock()
    rejecting_validator.validate.return_value = False
    mocker.patch(
        "twilio_signature_validator.RequestValidator",
        return_value=rejecting_validator,
    )

    with pytest.raises(HTTPException) as raised:
        await validate_websocket_twilio_signature(token="dummy-token", websocket=websocket)

    error = raised.value
    assert error.status_code == expected_http_status_code
    assert "Invalid Twilio request signature" in error.detail
    websocket.close.assert_called_once_with(code=expected_websocket_close_code)
"""Module for Twilio signature validation."""
from urllib.parse import parse_qsl
from aws_lambda_powertools import Logger
from aws_lambda_powertools.event_handler.exceptions import (
BadRequestError,
UnauthorizedError,
)
from aws_lambda_powertools.utilities.data_classes import LambdaFunctionUrlEvent
from fastapi import HTTPException, WebSocket
from twilio.request_validator import RequestValidator
# Module-level Powertools logger shared by the validator functions in this module.
logger = Logger()
def validate_twilio_http_signature(token: str, event: LambdaFunctionUrlEvent) -> None:
    """Validate the Twilio signature of an HTTP request.

    The signed URI is rebuilt as ``https://<domain><path>[?<query>]`` and the
    form-encoded request body is parsed into the parameter mapping handed to
    Twilio's ``RequestValidator``.

    Args:
        token (str): Twilio auth token.
        event (LambdaFunctionUrlEvent): The event data passed by AWS Lambda.

    Raises:
        BadRequestError: If the request signature is missing.
        UnauthorizedError: If the request signature is invalid.
    """
    logger.info("Validating Twilio signature")

    # HTTP header names are case-insensitive and payload-format-2.0 events may
    # deliver them lowercased, so do not rely on the exact "X-Twilio-Signature"
    # spelling.
    signature = next(
        (
            value
            for name, value in (event.headers or {}).items()
            if name.lower() == "x-twilio-signature"
        ),
        None,
    )
    # Fail fast: nothing below is worth doing (or logging) for unsigned requests.
    if not signature:
        error_message = "Missing X-Twilio-Signature header"
        raise BadRequestError(error_message)

    # Prefer the raw query string: it keeps the original percent-encoding and
    # repeated keys, whereas the parsed mapping is decoded/merged and may not
    # reproduce the URL that was actually signed.
    raw_query_string = event.get("rawQueryString")
    if raw_query_string:
        query_string = f"?{raw_query_string}"
    else:
        query_parameters = event.get("queryStringParameters")
        query_string = (
            "?" + "&".join(f"{k}={v}" for k, v in query_parameters.items())
            if query_parameters
            else ""
        )
    uri = f"https://{event.request_context.domain_name}{event.path}{query_string}"
    logger.info("uri: %s", uri)

    # NOTE(review): the body is logged verbatim at INFO level; it looks like it
    # can contain caller phone numbers -- confirm this is acceptable.
    decoded_body = event.decoded_body or ""
    logger.info("event.decoded_body: %s", decoded_body)
    # keep_blank_values=True so parameters sent with an empty value are kept.
    params = dict(parse_qsl(decoded_body, keep_blank_values=True))
    logger.info("params: %s", params)

    validator = RequestValidator(token)
    if not validator.validate(uri=uri, params=params, signature=signature):
        error_message = "Invalid Twilio request signature"
        raise UnauthorizedError(error_message)
    logger.info("Twilio request signature is valid")


# Alias following the ``validate_websocket_twilio_signature`` naming scheme; the
# accompanying unit tests import the HTTP validator under this name.
validate_http_twilio_signature = validate_twilio_http_signature
async def validate_websocket_twilio_signature(token: str, websocket: WebSocket) -> None:
    """Check the Twilio signature sent with a WebSocket handshake.

    On any failure the connection is closed with code 1008 before the
    exception is raised.

    Args:
        token (str): Twilio auth token.
        websocket (WebSocket): WebSocket connection object.

    Raises:
        HTTPException: With status 403 if the signature header is missing or
            the signature does not validate.
    """
    twilio_signature = websocket.headers.get("x-twilio-signature")
    if not twilio_signature:
        await websocket.close(code=1008)
        raise HTTPException(status_code=403, detail="Missing x-twilio-signature header")

    request_validator = RequestValidator(token)
    logger.info(
        "websocket.url: %s, websocket.query_params: %s",
        websocket.url,
        websocket.query_params,
    )

    # The scheme is always replaced by "wss://" -- presumably because the app
    # may see "ws://" behind a TLS-terminating proxy; confirm against deployment.
    url_without_scheme = str(websocket.url).split("://", maxsplit=1)[1]
    uri = f"wss://{url_without_scheme}"
    logger.info("uri: %s", uri)

    params = dict(websocket.query_params)
    logger.info("params: %s", params)

    if request_validator.validate(uri=uri, params=params, signature=twilio_signature):
        logger.info("Twilio request signature is valid")
        return

    await websocket.close(code=1008)
    raise HTTPException(status_code=403, detail="Invalid Twilio request signature")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment