This project deploys a lean, secure, and production-minded pipeline for detecting water leaks (e.g., a continuously running toilet) using short audio snippets analyzed by a pre-deployed SageMaker inference endpoint.
- Edge device (Pi/PC/Phone) records a 1–2s audio snippet when a trigger fires (timer, sound level, manual), base64-encodes it, and sends a JSON payload to Kinesis Data Streams.
- Lambda consumer reads Kinesis records, calls your SageMaker endpoint with the audio, and parses the model’s response.
- If the model predicts your alert label (default: `toilet`) with confidence ≥ threshold (default: `0.5`), Lambda publishes a JSON alert to SNS.
- SQS subscribes to the topic so a Streamlit dashboard (or any worker) can read alerts reliably.
Data flow: Edge ▶ Kinesis ──▶ Lambda ──▶ SNS ──▶ SQS (UI/worker). For each record, Lambda calls the already-deployed SageMaker endpoint via `invoke_endpoint`.
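On the SQS side, here is a minimal sketch of how a dashboard worker might unwrap one alert. It assumes raw message delivery stays off, since the `aws_sns_topic_subscription` in `main.tf` does not set `raw_message_delivery` and that defaults to false. In that case each SQS message body is the SNS notification envelope, and the JSON the Lambda published sits as a string in its `Message` field. `parse_alert` is a hypothetical helper name.

```python
import json
from typing import Any


def parse_alert(sqs_body: str) -> dict[str, Any]:
    """Return the alert dict published by the consumer Lambda.

    Assumes SNS -> SQS delivery without raw message delivery, so the SQS body is
    the SNS notification envelope and the alert JSON is the string in "Message".
    """
    envelope = json.loads(sqs_body)
    if envelope.get("Type") == "Notification" and "Message" in envelope:
        return json.loads(envelope["Message"])
    # Otherwise treat the body as the alert itself (raw message delivery enabled).
    return envelope
```

A worker would typically long-poll with boto3's `receive_message(QueueUrl=..., WaitTimeSeconds=20)`, pass each message's `Body` to `parse_alert`, and then call `delete_message` with the message's `ReceiptHandle` once the alert is handled. Otherwise the message reappears after the queue's 30 s visibility timeout, and after 5 receives it moves to the alerts DLQ.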
- Terraform `main.tf` that creates:
  - Kinesis stream (KMS-encrypted)
  - SNS topic + SQS queue (KMS-encrypted) with tight queue policy
  - Lambda consumer (Python 3.12) with:
    - Least-privilege IAM (InvokeEndpoint, Publish, Logs)
    - DLQ, reserved concurrency, KMS-encrypted env vars, log retention, optional X-Ray
  - Kinesis → Lambda event source mapping
  - Outputs for client integration
* **Input to endpoint (JSON)**: `{ "audio": "<base64-encoded-audio>" }`
* **Output from endpoint (JSON)**:
```json
{ "label": "toilet", "confidence": 0.87 }
```
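Here is a minimal sketch of the decision the consumer makes on this output, pulled out as a pure function so it can be unit-tested without AWS. `should_alert` is a hypothetical name. Its defaults mirror the `alert_label` and `min_confidence` variables, and, as in the Lambda, a missing or non-numeric `confidence` is treated as 0.

```python
import json
from typing import Any


def should_alert(result: dict[str, Any], alert_label: str = "toilet",
                 min_confidence: float = 0.5) -> bool:
    """Return True when the endpoint's label matches and confidence >= threshold."""
    try:
        confidence = float(result.get("confidence", 0))
    except (TypeError, ValueError):
        # Non-numeric confidence is treated as 0.0, as in the Lambda.
        confidence = 0.0
    return result.get("label", "") == alert_label and confidence >= min_confidence


# The endpoint returns a JSON body shaped like the example above.
body = b'{ "label": "toilet", "confidence": 0.87 }'
print(should_alert(json.loads(body)))  # True
```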
## Edge producer payload (to Kinesis)
Send **this JSON** (base64 audio under `audio_data`) to the Kinesis stream:
```json
{
  "device_id": "pi-01",
  "audio_data": "<base64-encoded-audio>"
}
```
* **Partition key**: use `device_id` (ensures ordering per device)
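* **Size**: keep the whole JSON record under the Kinesis per-record limit (1 MiB by default, including the partition key). Base64 inflates the audio by about a third.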
* **Duration**: 1–2 seconds is typically enough
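Below is a minimal producer-side sketch. It assumes the device already holds the clip as encoded bytes (e.g. a WAV file). `build_record` and `KINESIS_MAX_RECORD_BYTES` are hypothetical names. The returned dict holds the `Data` and `PartitionKey` arguments of boto3's Kinesis `put_record`, and the size check assumes the stream keeps the default 1 MiB per-record limit. For scale, a 2 s clip of uncompressed 16 kHz, 16-bit mono PCM is 64,000 bytes of samples, or about 85 KB after base64, well under that limit.

```python
import base64
import json

# Assumed default Kinesis per-record limit (data blob + partition key).
KINESIS_MAX_RECORD_BYTES = 1024 * 1024


def build_record(device_id: str, audio_bytes: bytes) -> dict:
    """Build the Data/PartitionKey arguments for one audio snippet.

    Field names match the payload the consumer Lambda expects.
    """
    payload = {
        "device_id": device_id,
        "audio_data": base64.b64encode(audio_bytes).decode("ascii"),
    }
    data = json.dumps(payload).encode("utf-8")
    if len(data) + len(device_id.encode("utf-8")) > KINESIS_MAX_RECORD_BYTES:
        raise ValueError(f"record is {len(data)} bytes; trim or compress the clip")
    # Partition by device so records from one device stay ordered within a shard.
    return {"Data": data, "PartitionKey": device_id}
```

With boto3 this would be `boto3.client("kinesis").put_record(StreamName=stream_name, **build_record("pi-01", wav_bytes))`. Here `stream_name` is the `kinesis_stream_name` output and `wav_bytes` is the recorded clip.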
## Quick start
```bash
terraform init
terraform apply -auto-approve \
-var="region=us-east-2" \
-var="sagemaker_endpoint_name=urbansound-audio-staging" \
-var="alert_label=toilet" \
-var="min_confidence=0.5"
```
Read the outputs with `terraform output` to wire up the edge producer and the dashboard:
* `kinesis_stream_name`
* `sns_topic_arn`
* `sqs_queue_url`
* `lambda_consumer_name`
## Security & Ops defaults
* No `AdministratorAccess` on runtime roles (least privilege only)
* KMS on **Kinesis**, **SNS**, **SQS**, and Lambda **env vars**
* Tight **SQS** policy (`SourceArn` + `SourceAccount`)
* **DLQs**, **reserved concurrency**, **14-day log retention**
* **X-Ray** active tracing enabled (optional; set the Lambda `tracing_config` mode to `PassThrough` to stop active tracing)
> Tip: For defense-in-depth, place Lambda in a VPC with interface endpoints for KMS/Logs/SNS/SQS/SageMaker Runtime.
## Variables
```hcl
variable "region" { default = "us-east-2" }
variable "sagemaker_endpoint_name" { description = "Existing SageMaker endpoint name" }
variable "alert_label" { default = "toilet" }
variable "min_confidence" { default = 0.5 }
```
## Destroy
```bash
terraform destroy
```
terraform {
  required_version = ">= 1.1.0"
  required_providers {
    aws     = { source = "hashicorp/aws", version = ">= 5.0" }
    archive = { source = "hashicorp/archive", version = ">= 2.4.0" }
  }
}
provider "aws" {
  region = var.region
}
########################
# VARIABLES & DATA
########################
# A one-line block may hold only one argument, so each variable gets its own lines.
variable "region" {
  type    = string
  default = "us-east-2"
}
variable "sagemaker_endpoint_name" {
  type        = string
  description = "Existing SageMaker endpoint name, e.g. urbansound-audio-staging"
}
variable "alert_label" {
  type    = string
  default = "toilet"
}
variable "min_confidence" {
  type    = number
  default = 0.5
}
data "aws_caller_identity" "this" {}
########################
# KMS KEYS
########################
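resource "aws_kms_key" "kinesis_kms" {
  description         = "KMS for Kinesis (urbansound)"
  enable_key_rotation = true
}
resource "aws_kms_key" "sns_kms" {
  description         = "KMS for SNS (urbansound)"
  enable_key_rotation = true
}
resource "aws_kms_key" "sqs_kms" {
  description         = "KMS for SQS (urbansound)"
  enable_key_rotation = true
  # SNS must be allowed to use this key, or delivery to the encrypted queue fails.
  # The queue policy already limits senders to our topic.
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Sid       = "EnableIamPolicies",
        Effect    = "Allow",
        Principal = { AWS = "arn:aws:iam::${data.aws_caller_identity.this.account_id}:root" },
        Action    = "kms:*",
        Resource  = "*"
      },
      {
        Sid       = "AllowSnsDelivery",
        Effect    = "Allow",
        Principal = { Service = "sns.amazonaws.com" },
        Action    = ["kms:GenerateDataKey*", "kms:Decrypt"],
        Resource  = "*"
      }
    ]
  })
}
resource "aws_kms_key" "lambda_env_kms" {
  description         = "KMS for Lambda environment encryption (urbansound)"
  enable_key_rotation = true
}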
########################
# CORE RESOURCES
########################
# Kinesis Stream
resource "aws_kinesis_stream" "audio_stream" {
  name             = "urbansound-audio-stream"
  shard_count      = 1
  retention_period = 24
  encryption_type  = "KMS"
  kms_key_id       = aws_kms_key.kinesis_kms.arn
  tags             = { Environment = "dev" }
}
# SNS Topic (alerts)
resource "aws_sns_topic" "alerts_topic" {
  name              = "urbansound-alerts"
  kms_master_key_id = aws_kms_key.sns_kms.arn
}
# SQS (alerts sink for UI)
resource "aws_sqs_queue" "alerts_queue" {
  name                              = "urbansound-alerts-queue"
  visibility_timeout_seconds        = 30
  message_retention_seconds         = 1209600
  kms_master_key_id                 = aws_kms_key.sqs_kms.arn
  kms_data_key_reuse_period_seconds = 300
}
# DLQ for SQS alerts
resource "aws_sqs_queue" "alerts_dlq" {
  name                              = "urbansound-alerts-dlq"
  kms_master_key_id                 = aws_kms_key.sqs_kms.arn
  kms_data_key_reuse_period_seconds = 300
}
resource "aws_sqs_queue_redrive_policy" "alerts_redrive" {
  queue_url = aws_sqs_queue.alerts_queue.id
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.alerts_dlq.arn,
    maxReceiveCount     = 5
  })
}
# SQS policy: allow only our SNS topic in this account
resource "aws_sqs_queue_policy" "alerts_queue_policy" {
  queue_url = aws_sqs_queue.alerts_queue.id
  policy = jsonencode({
    Version : "2012-10-17",
    Statement : [{
      Sid : "AllowSNSSendMessage",
      Effect : "Allow",
      Principal : { Service : "sns.amazonaws.com" },
      Action : "sqs:SendMessage",
      Resource : aws_sqs_queue.alerts_queue.arn,
      Condition : {
        ArnEquals : { "aws:SourceArn" : aws_sns_topic.alerts_topic.arn },
        StringEquals : { "aws:SourceAccount" : data.aws_caller_identity.this.account_id }
      }
    }]
  })
}
# SNS → SQS subscription
resource "aws_sns_topic_subscription" "sqs_subscription" {
  topic_arn = aws_sns_topic.alerts_topic.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.alerts_queue.arn
}
########################
# IAM (Least Privilege)
########################
# Lambda execution role
resource "aws_iam_role" "lambda_execution_role" {
  name = "urbansound-lambda-execution-role"
  assume_role_policy = jsonencode({
    Version : "2012-10-17",
    Statement : [{
      Effect : "Allow",
      Principal : { Service : "lambda.amazonaws.com" },
      Action : "sts:AssumeRole"
    }]
  })
}
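data "aws_iam_policy_document" "lambda_permissions" {
  statement {
    sid = "KinesisRead" # required by the Kinesis event source mapping
    actions = [
      "kinesis:DescribeStream",
      "kinesis:DescribeStreamSummary",
      "kinesis:GetRecords",
      "kinesis:GetShardIterator",
      "kinesis:ListShards",
      "kinesis:SubscribeToShard",
    ]
    resources = [aws_kinesis_stream.audio_stream.arn]
  }
  statement {
    sid     = "SageMakerInvoke"
    actions = ["sagemaker:InvokeEndpoint"]
    resources = [
      "arn:aws:sagemaker:${var.region}:${data.aws_caller_identity.this.account_id}:endpoint/${var.sagemaker_endpoint_name}"
    ]
  }
  statement {
    sid       = "SnsPublish"
    actions   = ["sns:Publish"]
    resources = [aws_sns_topic.alerts_topic.arn]
  }
  statement {
    sid       = "DlqSend" # Lambda rejects a DLQ/failure destination it cannot write to
    actions   = ["sqs:SendMessage"]
    resources = [aws_sqs_queue.lambda_dlq.arn]
  }
  statement {
    sid     = "KmsUse" # customer-managed keys on the stream, topic, DLQ, and env vars
    actions = ["kms:Decrypt", "kms:GenerateDataKey"]
    resources = [
      aws_kms_key.kinesis_kms.arn,
      aws_kms_key.sns_kms.arn,
      aws_kms_key.sqs_kms.arn,
      aws_kms_key.lambda_env_kms.arn,
    ]
  }
  statement {
    sid       = "Logs"
    actions   = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"]
    resources = ["arn:aws:logs:${var.region}:${data.aws_caller_identity.this.account_id}:*"]
  }
  statement {
    sid       = "XRay" # optional tracing
    actions   = ["xray:PutTraceSegments", "xray:PutTelemetryRecords"]
    resources = ["*"]
  }
}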
resource "aws_iam_policy" "lambda_policy" {
  name   = "urbansound-lambda-policy"
  policy = data.aws_iam_policy_document.lambda_permissions.json
}
resource "aws_iam_role_policy_attachment" "lambda_attach_lp" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}
########################
# LAMBDA (Consumer)
########################
# DLQ for Lambda
resource "aws_sqs_queue" "lambda_dlq" {
  name                              = "urbansound-lambda-dlq"
  kms_master_key_id                 = aws_kms_key.sqs_kms.arn
  kms_data_key_reuse_period_seconds = 300
}
data "archive_file" "consumer_zip" {
  type        = "zip"
  output_path = "consumer_lambda.zip"
  source {
    filename = "lambda_function.py"
    content  = <<PY
import base64
import json
import os
from datetime import datetime, timezone

import boto3

print("Lambda starting…")
sagemaker_runtime = boto3.client("sagemaker-runtime")
sns = boto3.client("sns")
ALERT_LABEL = os.getenv("ALERT_LABEL", "toilet")
MIN_CONF = float(os.getenv("MIN_CONFIDENCE", "0.5"))
# Kinesis caps a record at 1 MiB by default, so longer audio strings cannot be valid.
MAX_AUDIO_B64_CHARS = 1_000_000


def lambda_handler(event, context):
    processed = 0
    alerts = 0
    for rec in event.get("Records", []):
        seq = rec["kinesis"]["sequenceNumber"]
        # Malformed records will never succeed, so log and skip them rather than retry.
        try:
            data = json.loads(base64.b64decode(rec["kinesis"]["data"]))
            if not isinstance(data, dict):
                raise ValueError("payload is not a JSON object")
        except ValueError as e:
            print(f"Skipping malformed record {seq}: {e}")
            continue
        device_id = str(data.get("device_id", "unknown"))
        audio_b64 = data.get("audio_data") or data.get("audio")
        if not audio_b64:
            print(f"No audio data for device {device_id}; keys={list(data.keys())}")
            continue
        if len(audio_b64) > MAX_AUDIO_B64_CHARS:
            print(f"Dropping oversized payload from {device_id}")
            continue
        try:
            resp = sagemaker_runtime.invoke_endpoint(
                EndpointName=os.environ["SAGEMAKER_ENDPOINT"],
                ContentType="application/json",
                Body=json.dumps({"audio": audio_b64}),
            )
            result = json.loads(resp["Body"].read().decode())
            label = result.get("label", "")
            try:
                confidence = float(result.get("confidence", 0))
            except (TypeError, ValueError):
                confidence = 0.0
            print(f"{device_id}: label={label}, conf={confidence:.3f}")
            if label == ALERT_LABEL and confidence >= MIN_CONF:
                msg = {
                    "device_id": device_id,
                    "label": label,
                    "confidence": confidence,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "alert_type": f"{ALERT_LABEL}_leak_detected",
                    "message": f"{ALERT_LABEL.capitalize()} leak detected from {device_id} with {confidence:.1%} confidence",
                    "sagemaker_result": result,
                }
                sns.publish(
                    TopicArn=os.environ["SNS_TOPIC_ARN"],
                    Message=json.dumps(msg),
                    # SNS subjects must be ASCII (no emoji) and under 100 characters.
                    Subject=f"Leak Alert - Device {device_id}"[:99],
                    MessageAttributes={
                        "device_id": {"DataType": "String", "StringValue": device_id},
                        "label": {"DataType": "String", "StringValue": label},
                    },
                )
                alerts += 1
            processed += 1
        except Exception as e:
            # Transient failure (throttling, endpoint error): report this record so the
            # event source mapping can retry from here instead of silently dropping it.
            print(f"Error on record {seq} from {device_id}: {e}; processed={processed}, alerts={alerts}")
            return {"batchItemFailures": [{"itemIdentifier": seq}]}
    print(f"done: processed={processed}, alerts={alerts}")
    return {"batchItemFailures": []}
PY
  }
}
resource "aws_lambda_function" "consumer_lambda" {
  function_name                  = "urbansound-consumer"
  role                           = aws_iam_role.lambda_execution_role.arn
  runtime                        = "python3.12"
  handler                        = "lambda_function.lambda_handler"
  filename                       = data.archive_file.consumer_zip.output_path
  source_code_hash               = data.archive_file.consumer_zip.output_base64sha256
  timeout                        = 180
  memory_size                    = 512
  kms_key_arn                    = aws_kms_key.lambda_env_kms.arn
  reserved_concurrent_executions = 10
  # Used only for asynchronous invocations; Kinesis-triggered batches are synchronous
  # and need an on-failure destination on the event source mapping instead.
  dead_letter_config {
    target_arn = aws_sqs_queue.lambda_dlq.arn
  }
  tracing_config {
    mode = "Active"
  }
  environment {
    variables = {
      SAGEMAKER_ENDPOINT = var.sagemaker_endpoint_name
      SNS_TOPIC_ARN      = aws_sns_topic.alerts_topic.arn
      ALERT_LABEL        = var.alert_label
      MIN_CONFIDENCE     = tostring(var.min_confidence)
    }
  }
  # Attach IAM and create the log group first, so creation-time permission checks pass
  # and Lambda never auto-creates a log group without retention.
  depends_on = [
    aws_iam_role_policy_attachment.lambda_attach_lp,
    aws_cloudwatch_log_group.lambda_logs,
  ]
}
# Log retention (the name must match function_name above)
resource "aws_cloudwatch_log_group" "lambda_logs" {
  name              = "/aws/lambda/urbansound-consumer"
  retention_in_days = 14
}
# Kinesis → Lambda mapping
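resource "aws_lambda_event_source_mapping" "kinesis_trigger" {
  event_source_arn                   = aws_kinesis_stream.audio_stream.arn
  function_name                      = aws_lambda_function.consumer_lambda.arn
  starting_position                  = "LATEST"
  batch_size                         = 10
  maximum_batching_window_in_seconds = 5
  parallelization_factor             = 1
  enabled                            = true
  # Retry a failed batch a bounded number of times, then send its metadata (not the
  # audio) to the Lambda DLQ instead of blocking the shard until records expire.
  maximum_retry_attempts         = 3
  bisect_batch_on_function_error = true
  function_response_types        = ["ReportBatchItemFailures"]
  destination_config {
    on_failure {
      destination_arn = aws_sqs_queue.lambda_dlq.arn
    }
  }
}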
########################
# OUTPUTS
########################
output "kinesis_stream_name" {
  value = aws_kinesis_stream.audio_stream.name
}
output "sns_topic_arn" {
  value = aws_sns_topic.alerts_topic.arn
}
output "sqs_queue_url" {
  value = aws_sqs_queue.alerts_queue.url
}
output "lambda_consumer_name" {
  value = aws_lambda_function.consumer_lambda.function_name
}
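To exercise the consumer code above locally, here is a minimal sketch of a Kinesis-shaped Lambda event. `make_kinesis_event` is a hypothetical helper. It fills in only `eventSource`, `kinesis.partitionKey`, `kinesis.sequenceNumber` and `kinesis.data`, and the sequence numbers are placeholders. Real events carry more metadata, such as `eventSourceARN` and `eventID`.

```python
import base64
import json


def make_kinesis_event(payloads: list[dict], partition_key: str = "pi-01") -> dict:
    """Wrap producer payloads the way a Kinesis trigger delivers them to Lambda."""
    records = []
    for i, payload in enumerate(payloads):
        records.append({
            "eventSource": "aws:kinesis",
            "kinesis": {
                "partitionKey": partition_key,
                # Placeholder, not a real Kinesis sequence number.
                "sequenceNumber": str(i + 1),
                # Kinesis hands the record data to Lambda base64-encoded.
                "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii"),
            },
        })
    return {"Records": records}
```

Importing `lambda_function.py` creates boto3 clients at module load. A local test would therefore set `AWS_DEFAULT_REGION`, `SAGEMAKER_ENDPOINT` and `SNS_TOPIC_ARN`, and replace the module's `sagemaker_runtime` and `sns` with `unittest.mock.MagicMock` objects before calling `lambda_handler(make_kinesis_event([...]), None)`.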