IoT water-leak detection pipeline with Terraform — streams audio to Kinesis, runs inference on SageMaker, and delivers leak alerts via SNS/SQS (dev stage with admin access).

# IoT Water-Leak Detection (Audio): AWS (Kinesis → Lambda → SageMaker → SNS/SQS)

This project deploys a lean, secure, and production-minded pipeline for detecting water leaks (e.g., a continuously running toilet) using short audio snippets analyzed by a pre-deployed SageMaker inference endpoint.

## How it works

  1. Edge device (Pi/PC/Phone) records a 1–2s audio snippet when a trigger fires (timer, sound level, manual), base64-encodes it, and sends a JSON payload to Kinesis Data Streams.
  2. Lambda consumer reads Kinesis records, calls your SageMaker endpoint with the audio, and parses the model’s response.
  3. If the model predicts your alert label (default: toilet) with confidence ≥ threshold (default: 0.5), Lambda publishes a JSON alert to SNS.
  4. SQS subscribes to the topic so a Streamlit dashboard (or any worker) can read alerts reliably.

    Edge ──▶ Kinesis ──▶ Lambda (invoke_endpoint) ──▶ SNS ──▶ SQS (UI/worker)
                           │
                           └──▶ SageMaker Endpoint (already deployed)
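
To make the first two hops of the flow concrete, here is a minimal local sketch of how a producer payload looks by the time the Lambda consumer sees it. The helper names `make_kinesis_event` and `decode_records` are hypothetical, and the event is trimmed to the fields the consumer reads; a real Kinesis event carries more metadata.

```python
import base64
import json


def make_kinesis_event(payloads):
    """Wrap producer payloads roughly the way a Kinesis trigger hands them to Lambda."""
    records = []
    for payload in payloads:
        raw = json.dumps(payload).encode("utf-8")
        records.append({
            "eventSource": "aws:kinesis",
            "kinesis": {
                "partitionKey": payload.get("device_id", "unknown"),
                # Lambda delivers the record's data blob base64-encoded.
                "data": base64.b64encode(raw).decode("ascii"),
            },
        })
    return {"Records": records}


def decode_records(event):
    """Mirror the consumer's first step: base64-decode each record, then parse JSON."""
    return [
        json.loads(base64.b64decode(rec["kinesis"]["data"]))
        for rec in event.get("Records", [])
    ]


# Placeholder bytes stand in for a real audio snippet.
audio = base64.b64encode(b"\x00\x01fake-pcm").decode("ascii")
event = make_kinesis_event([{"device_id": "pi-01", "audio_data": audio}])
decoded = decode_records(event)
print(decoded[0]["device_id"])  # pi-01
```

Note the two layers of base64: the audio is encoded once by the producer, and the whole JSON record is encoded again in the Lambda event.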

## What’s included

* Terraform `main.tf` that creates:
  * Kinesis stream (KMS-encrypted)
  * SNS topic + SQS queue (KMS-encrypted) with a tight queue policy
  * Lambda consumer (Python 3.12) with:
    * Least-privilege IAM (InvokeEndpoint, Publish, Logs)
    * DLQ, reserved concurrency, KMS-encrypted env vars, log retention, optional X-Ray
  * Kinesis → Lambda event source mapping
  * Outputs for client integration

## Model contract (expected I/O)

* **Input to endpoint (JSON)**: `{ "audio": "<base64-encoded-audio>" }`

* **Output from endpoint (JSON)**:

  ```json
  { "label": "toilet", "confidence": 0.87 }
  ```
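
Given that contract, the alert decision reduces to a label match plus a confidence threshold. The function below is a small sketch of that check; the name `should_alert` is an assumption for illustration, and the defaults match the documented ones (`toilet`, `0.5`).

```python
def should_alert(result, alert_label="toilet", min_confidence=0.5):
    """Return True when the endpoint result matches the alert label at or above the threshold."""
    label = result.get("label", "")
    try:
        confidence = float(result.get("confidence", 0))
    except (TypeError, ValueError):
        # A missing or malformed confidence is treated as zero, so no alert fires.
        confidence = 0.0
    return label == alert_label and confidence >= min_confidence


print(should_alert({"label": "toilet", "confidence": 0.87}))    # True
print(should_alert({"label": "dog_bark", "confidence": 0.99}))  # False
```

The threshold comparison is inclusive, so a confidence of exactly `0.5` triggers an alert under the defaults.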

## Edge producer payload (to Kinesis)

Send **this JSON** (base64 audio under `audio_data`) to the Kinesis stream:

```json
{
  "device_id": "pi-01",
  "audio_data": "<base64-encoded-audio>"
}
```

* **Partition key**: use `device_id` (ensures ordering per device)
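* **Size**: keep the whole JSON record under the Kinesis per-record limit (1 MiB by default); the consumer also drops base64 strings longer than 2,000,000 characters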
* **Duration**: 1–2 seconds is typically enough
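
A producer could assemble the record along these lines. This is a sketch, not the project's client: `build_record`, `send`, and `MAX_RECORD_BYTES` are hypothetical names, the 1 MiB figure assumes the stream's default record-size limit, and the client is passed in so the builder can be exercised without AWS credentials.

```python
import base64
import json

# Assumption: the stream uses the default Kinesis per-record limit of 1 MiB.
MAX_RECORD_BYTES = 1024 * 1024


def build_record(device_id, audio_bytes):
    """Build put_record keyword arguments for one audio snippet."""
    payload = {
        "device_id": device_id,
        "audio_data": base64.b64encode(audio_bytes).decode("ascii"),
    }
    data = json.dumps(payload).encode("utf-8")
    if len(data) > MAX_RECORD_BYTES:
        raise ValueError(f"record is {len(data)} bytes, over the {MAX_RECORD_BYTES}-byte limit")
    # Partitioning by device_id keeps each device's records ordered within a shard.
    return {"Data": data, "PartitionKey": device_id}


def send(kinesis_client, stream_name, record):
    """Send one record; kinesis_client is expected to be a boto3 Kinesis client."""
    return kinesis_client.put_record(StreamName=stream_name, **record)


record = build_record("pi-01", b"abc")
print(record["PartitionKey"], len(record["Data"]))
```

With boto3 installed, usage would look like `send(boto3.client("kinesis", region_name="us-east-2"), "urbansound-audio-stream", record)`, using the stream name from the `kinesis_stream_name` output.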

## Quick start

```bash
terraform init
terraform apply -auto-approve \
  -var="region=us-east-2" \
  -var="sagemaker_endpoint_name=urbansound-audio-staging" \
  -var="alert_label=toilet" \
  -var="min_confidence=0.5"
```

Grab outputs for your client producer:

* `kinesis_stream_name`
* `sns_topic_arn`
* `sqs_queue_url`
* `lambda_consumer_name`
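
On the consuming side, a dashboard or worker reading from `sqs_queue_url` has to unwrap the SNS envelope, because the subscription in this stack does not enable raw message delivery. The sketch below shows one way to do that; `parse_alert` and `poll_once` are hypothetical helper names, and the client is injected so the parsing logic can be tried without AWS access.

```python
import json


def parse_alert(sqs_body):
    """Return the alert dict carried by one SQS message body."""
    body = json.loads(sqs_body)
    # Without raw message delivery, SNS wraps the published string in a JSON envelope.
    if isinstance(body, dict) and body.get("Type") == "Notification":
        return json.loads(body["Message"])
    return body


def poll_once(sqs_client, queue_url):
    """Long-poll once, parse each alert, and delete the messages that were read."""
    resp = sqs_client.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=20
    )
    alerts = []
    for msg in resp.get("Messages", []):
        alerts.append(parse_alert(msg["Body"]))
        sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
    return alerts


envelope = json.dumps({
    "Type": "Notification",
    "Message": json.dumps({"device_id": "pi-01", "label": "toilet", "confidence": 0.87}),
})
print(parse_alert(envelope)["device_id"])  # pi-01
```

Because the queue is KMS-encrypted, whatever role runs the reader would presumably also need `kms:Decrypt` on the SQS key in addition to `sqs:ReceiveMessage` and `sqs:DeleteMessage`.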

## Security & Ops defaults

* No `AdministratorAccess` on runtime roles (least privilege only)
* KMS on **Kinesis**, **SNS**, **SQS**, and Lambda **env vars**
* Tight **SQS** policy (`SourceArn` + `SourceAccount`)
* **DLQs**, **reserved concurrency**, **14-day log retention**
* Optional **X-Ray** tracing enabled

> Tip: For defense-in-depth, place Lambda in a VPC with interface endpoints for KMS/Logs/SNS/SQS/SageMaker Runtime.

## Variables

```hcl
variable "region"                  { default = "us-east-2" }
variable "sagemaker_endpoint_name" { description = "Existing SageMaker endpoint name" }
variable "alert_label"             { default = "toilet" }
variable "min_confidence"          { default = 0.5 }
```

## Destroy

```bash
terraform destroy
```


main.tf

terraform {
  required_version = ">= 1.1.0"
  required_providers {
    aws     = { source = "hashicorp/aws",     version = ">= 5.0" }
    archive = { source = "hashicorp/archive", version = ">= 2.4.0" }
  }
}

provider "aws" {
  region = var.region
}

########################
# VARIABLES & DATA
########################

# Single-line blocks may hold only one argument, so multi-argument variables are expanded.
variable "region" {
  type    = string
  default = "us-east-2"
}
variable "sagemaker_endpoint_name" { type = string } # e.g., urbansound-audio-staging
variable "alert_label" {
  type    = string
  default = "toilet"
}
variable "min_confidence" {
  type    = number
  default = 0.5
}

data "aws_caller_identity" "this" {}

########################
# KMS KEYS
########################

resource "aws_kms_key" "kinesis_kms" {
  description         = "KMS for Kinesis (urbansound)"
  enable_key_rotation = true
}

resource "aws_kms_key" "sns_kms" {
  description         = "KMS for SNS (urbansound)"
  enable_key_rotation = true
}

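resource "aws_kms_key" "sqs_kms" {
  description         = "KMS for SQS (urbansound)"
  enable_key_rotation = true
  # NOTE: SNS can deliver to the encrypted alerts queue only if this key's policy
  # lets sns.amazonaws.com call kms:GenerateDataKey and kms:Decrypt.
}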

resource "aws_kms_key" "lambda_env_kms" {
  description         = "KMS for Lambda environment encryption (urbansound)"
  enable_key_rotation = true
}

########################
# CORE RESOURCES
########################

# Kinesis Stream
resource "aws_kinesis_stream" "audio_stream" {
  name                  = "urbansound-audio-stream"
  shard_count           = 1
  retention_period      = 24
  encryption_type       = "KMS"
  kms_key_id            = aws_kms_key.kinesis_kms.arn
  tags = { Environment = "dev" }
}

# SNS Topic (alerts)
resource "aws_sns_topic" "alerts_topic" {
  name              = "urbansound-alerts"
  kms_master_key_id = aws_kms_key.sns_kms.arn
}

# SQS (alerts sink for UI)
resource "aws_sqs_queue" "alerts_queue" {
  name                              = "urbansound-alerts-queue"
  visibility_timeout_seconds        = 30
  message_retention_seconds         = 1209600
  kms_master_key_id                 = aws_kms_key.sqs_kms.arn
  kms_data_key_reuse_period_seconds = 300
}

# DLQ for SQS alerts
resource "aws_sqs_queue" "alerts_dlq" {
  name                              = "urbansound-alerts-dlq"
  kms_master_key_id                 = aws_kms_key.sqs_kms.arn
  kms_data_key_reuse_period_seconds = 300
}

resource "aws_sqs_queue_redrive_policy" "alerts_redrive" {
  queue_url = aws_sqs_queue.alerts_queue.id
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.alerts_dlq.arn,
    maxReceiveCount     = 5
  })
}

# SQS policy: allow only our SNS topic in this account
resource "aws_sqs_queue_policy" "alerts_queue_policy" {
  queue_url = aws_sqs_queue.alerts_queue.id
  policy = jsonencode({
    Version : "2012-10-17",
    Statement : [{
      Sid       : "AllowSNSSendMessage",
      Effect    : "Allow",
      Principal : { Service : "sns.amazonaws.com" },
      Action    : "sqs:SendMessage",
      Resource  : aws_sqs_queue.alerts_queue.arn,
      Condition : {
        ArnEquals    : { "aws:SourceArn"    : aws_sns_topic.alerts_topic.arn },
        StringEquals : { "aws:SourceAccount": data.aws_caller_identity.this.account_id }
      }
    }]
  })
}

# SNS → SQS subscription
resource "aws_sns_topic_subscription" "sqs_subscription" {
  topic_arn = aws_sns_topic.alerts_topic.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.alerts_queue.arn
}

########################
# IAM (Least Privilege)
########################

# Lambda execution role
resource "aws_iam_role" "lambda_execution_role" {
  name = "urbansound-lambda-execution-role"
  assume_role_policy = jsonencode({
    Version : "2012-10-17",
    Statement : [{
      Effect : "Allow",
      Principal : { Service : "lambda.amazonaws.com" },
      Action : "sts:AssumeRole"
    }]
  })
}

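data "aws_iam_policy_document" "lambda_permissions" {
  statement {
    sid     = "SageMakerInvoke"
    actions = ["sagemaker:InvokeEndpoint"]
    resources = [
      "arn:aws:sagemaker:${var.region}:${data.aws_caller_identity.this.account_id}:endpoint/${var.sagemaker_endpoint_name}"
    ]
  }
  statement {
    sid = "KinesisRead" # required by the Kinesis event source mapping
    actions = [
      "kinesis:DescribeStream", "kinesis:DescribeStreamSummary",
      "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:ListShards",
    ]
    resources = [aws_kinesis_stream.audio_stream.arn]
  }
  statement {
    sid       = "KinesisList" # ListStreams has no resource-level scoping
    actions   = ["kinesis:ListStreams"]
    resources = ["*"]
  }
  statement {
    sid       = "KmsUse" # decrypt stream records; encrypt SNS and DLQ messages
    actions   = ["kms:Decrypt", "kms:GenerateDataKey"]
    resources = [aws_kms_key.kinesis_kms.arn, aws_kms_key.sns_kms.arn, aws_kms_key.sqs_kms.arn]
  }
  statement {
    sid     = "SnsPublish"
    actions = ["sns:Publish"]
    resources = [aws_sns_topic.alerts_topic.arn]
  }
  statement {
    sid       = "DlqSend" # needed for dead_letter_config on the function
    actions   = ["sqs:SendMessage"]
    resources = [aws_sqs_queue.lambda_dlq.arn]
  }
  statement {
    sid     = "Logs"
    actions = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"]
    resources = ["arn:aws:logs:${var.region}:${data.aws_caller_identity.this.account_id}:*"]
  }
  statement {
    sid     = "XRay" # optional tracing
    actions = ["xray:PutTraceSegments", "xray:PutTelemetryRecords"]
    resources = ["*"]
  }
}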

resource "aws_iam_policy" "lambda_policy" {
  name   = "urbansound-lambda-policy"
  policy = data.aws_iam_policy_document.lambda_permissions.json
}

resource "aws_iam_role_policy_attachment" "lambda_attach_lp" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

########################
# LAMBDA (Consumer)
########################

# DLQ for Lambda
resource "aws_sqs_queue" "lambda_dlq" {
  name                              = "urbansound-lambda-dlq"
  kms_master_key_id                 = aws_kms_key.sqs_kms.arn
  kms_data_key_reuse_period_seconds = 300
}

data "archive_file" "consumer_zip" {
  type        = "zip"
  output_path = "consumer_lambda.zip"

  source {
    filename = "lambda_function.py"
    content  = <<PY
import json
import base64
import boto3
import os
from datetime import datetime

print("Lambda starting…")
sagemaker_runtime = boto3.client("sagemaker-runtime")
sns = boto3.client("sns")

ALERT_LABEL = os.getenv("ALERT_LABEL", "toilet")
MIN_CONF = float(os.getenv("MIN_CONFIDENCE", "0.5"))

def lambda_handler(event, context):
    records = event.get("Records", [])
    processed = 0
    alerts = 0

    for rec in records:
        try:
            payload = base64.b64decode(rec["kinesis"]["data"])
            data = json.loads(payload)

            device_id = data.get("device_id", "unknown")
            audio_b64 = data.get("audio_data") or data.get("audio")
            if not audio_b64:
                print(f"No audio data for device {device_id}; keys={list(data.keys())}")
                continue

            if len(audio_b64) > 2_000_000:
                print(f"Dropping oversized payload from {device_id}")
                continue

            sm_payload = {"audio": audio_b64}
            resp = sagemaker_runtime.invoke_endpoint(
                EndpointName=os.environ["SAGEMAKER_ENDPOINT"],
                ContentType="application/json",
                Body=json.dumps(sm_payload),
            )
            result = json.loads(resp["Body"].read().decode())
            label = result.get("label", "")
            try:
                confidence = float(result.get("confidence", 0))
            except Exception:
                confidence = 0.0

            print(f"{device_id}: label={label}, conf={confidence:.3f}")

            if label == ALERT_LABEL and confidence >= MIN_CONF:
                msg = {
                    "device_id": device_id,
                    "label": label,
                    "confidence": confidence,
                    "timestamp": datetime.now().isoformat(),
                    "alert_type": f"{ALERT_LABEL}_leak_detected",
                    "message": f"{ALERT_LABEL.capitalize()} leak detected from {device_id} with {confidence:.1%} confidence",
                    "sagemaker_result": result,
                }
                sns.publish(
                    TopicArn=os.environ["SNS_TOPIC_ARN"],
                    Message=json.dumps(msg),
                    # SNS subjects must be ASCII and shorter than 100 characters.
                    Subject=f"Leak Alert - Device {device_id}"[:99],
                    MessageAttributes={
                        "device_id": {"DataType": "String", "StringValue": device_id},
                        "label": {"DataType": "String", "StringValue": label},
                    },
                )
                alerts += 1

            processed += 1

        except Exception as e:
            print(f"Error: {e}")

    print(f"done: processed={processed}, alerts={alerts}")
    return {"statusCode": 200, "processed": processed, "alerts_sent": alerts}
PY
  }
}

resource "aws_lambda_function" "consumer_lambda" {
  function_name = "urbansound-consumer"
  role          = aws_iam_role.lambda_execution_role.arn
  runtime       = "python3.12"
  handler       = "lambda_function.lambda_handler"

  filename         = data.archive_file.consumer_zip.output_path
  source_code_hash = data.archive_file.consumer_zip.output_base64sha256

  timeout      = 180
  memory_size  = 512
  kms_key_arn  = aws_kms_key.lambda_env_kms.arn
  reserved_concurrent_executions = 10

  dead_letter_config {
    target_arn = aws_sqs_queue.lambda_dlq.arn
  }

  tracing_config {
    mode = "Active"
  }

  environment {
    variables = {
      SAGEMAKER_ENDPOINT = var.sagemaker_endpoint_name
      SNS_TOPIC_ARN      = aws_sns_topic.alerts_topic.arn
      ALERT_LABEL        = var.alert_label
      MIN_CONFIDENCE     = tostring(var.min_confidence)
    }
  }
}

# Log retention
resource "aws_cloudwatch_log_group" "lambda_logs" {
  name              = "/aws/lambda/${aws_lambda_function.consumer_lambda.function_name}"
  retention_in_days = 14
}

# Kinesis → Lambda mapping
resource "aws_lambda_event_source_mapping" "kinesis_trigger" {
  event_source_arn                    = aws_kinesis_stream.audio_stream.arn
  function_name                       = aws_lambda_function.consumer_lambda.arn
  starting_position                   = "LATEST"
  batch_size                          = 10
  maximum_batching_window_in_seconds  = 5
  parallelization_factor              = 1
  enabled                             = true
}

########################
# OUTPUTS
########################

output "kinesis_stream_name" {
  value = aws_kinesis_stream.audio_stream.name
}

output "sns_topic_arn" {
  value = aws_sns_topic.alerts_topic.arn
}

output "sqs_queue_url" {
  value = aws_sqs_queue.alerts_queue.url
}

output "lambda_consumer_name" {
  value = aws_lambda_function.consumer_lambda.function_name
}

terraform {
  required_version = ">= 1.1.0"
  required_providers {
    aws = { source = "hashicorp/aws", version = ">= 4.0" }
  }
}

provider "aws" {
  region = "us-east-2"
}

########################
# ADMIN IAM ROLES (DEV MODE)
########################

resource "aws_iam_role" "lambda_execution_role" {
  name = "urbansound-lambda-execution-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_admin" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = "arn:aws:iam::aws:policy/AdministratorAccess"
}

resource "aws_iam_role" "sagemaker_execution_role" {
  name = "urbansound-sagemaker-execution-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "sagemaker.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role_policy_attachment" "sagemaker_admin" {
  role       = aws_iam_role.sagemaker_execution_role.name
  policy_arn = "arn:aws:iam::aws:policy/AdministratorAccess"
}

########################
# KINESIS DATA STREAM
########################

resource "aws_kinesis_stream" "audio_stream" {
  name             = "urbansound-audio-stream"
  shard_count      = 1
  retention_period = 24
  tags = {
    Environment = "dev"
  }
}

########################
# SNS TOPIC FOR ALERTS
########################

resource "aws_sns_topic" "alerts_topic" {
  name = "urbansound-alerts"
}

########################
# SQS QUEUE FOR STREAMLIT
########################

resource "aws_sqs_queue" "alerts_queue" {
  name                       = "urbansound-alerts-queue"
  visibility_timeout_seconds = 30
  message_retention_seconds  = 1209600
}

resource "aws_sns_topic_subscription" "sqs_subscription" {
  topic_arn = aws_sns_topic.alerts_topic.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.alerts_queue.arn
}

resource "aws_sqs_queue_policy" "alerts_queue_policy" {
  queue_url = aws_sqs_queue.alerts_queue.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Sid       = "AllowSNSSendMessage"
      Effect    = "Allow"
      Principal = "*"
      Action    = "sqs:SendMessage"
      Resource  = aws_sqs_queue.alerts_queue.arn
      Condition = {
        ArnEquals = {
          "aws:SourceArn" = aws_sns_topic.alerts_topic.arn
        }
      }
    }]
  })
}

########################
# LAMBDA CONSUMER - ONLY REFERENCES EXISTING SAGEMAKER ENDPOINT
########################

resource "aws_lambda_function" "consumer_lambda" {
  function_name    = "urbansound-consumer"
  runtime          = "python3.9"
  handler          = "lambda_function.lambda_handler"
  role             = aws_iam_role.lambda_execution_role.arn
  timeout          = 180
  memory_size      = 512
  filename         = data.archive_file.consumer_zip.output_path
  source_code_hash = data.archive_file.consumer_zip.output_base64sha256

  environment {
    variables = {
      SAGEMAKER_ENDPOINT = "urbansound-audio-staging"
      SNS_TOPIC_ARN      = aws_sns_topic.alerts_topic.arn
    }
  }
}

data "archive_file" "consumer_zip" {
  type        = "zip"
  output_path = "consumer_lambda.zip"

  source {
    content  = <<EOF
import json
import base64
import boto3
import os
from datetime import datetime

print("Lambda function starting...")
sagemaker_runtime = boto3.client('sagemaker-runtime')
sns = boto3.client('sns')

def lambda_handler(event, context):
    print(f"Lambda triggered with {len(event.get('Records', []))} records")
    processed_count = 0
    alerts_sent = 0

    for record in event['Records']:
        try:
            print(f"Processing record: {record['eventSource']}")
            # Kinesis delivers the record data base64-encoded.
            payload = base64.b64decode(record['kinesis']['data'])
            audio_data = json.loads(payload)
            print(f"Decoded payload keys: {list(audio_data.keys())}")

            device_id = audio_data.get('device_id', 'unknown')
            audio_b64 = audio_data.get('audio_data') or audio_data.get('audio')
            if not audio_b64:
                print(f"No audio data in record from {device_id}")
                print(f"Available keys: {list(audio_data.keys())}")
                continue

            print(f"Processing audio from device {device_id}, audio size: {len(audio_b64)} chars")
            sagemaker_payload = {"audio": audio_b64}
            response = sagemaker_runtime.invoke_endpoint(
                EndpointName=os.environ['SAGEMAKER_ENDPOINT'],
                ContentType='application/json',
                Body=json.dumps(sagemaker_payload)
            )
            result = json.loads(response['Body'].read().decode())
            label = result.get('label', '')
            confidence = float(result.get('confidence', 0))
            print(f"Device {device_id}: Label={label}, Confidence={confidence:.3f}")

            if label == 'toilet' and confidence >= 0.5:
                alert_message = {
                    'device_id': device_id,
                    'label': label,
                    'confidence': confidence,
                    'timestamp': datetime.now().isoformat(),
                    'alert_type': 'toilet_leak_detected',
                    'message': f'Toilet leak detected from {device_id} with {confidence:.1%} confidence',
                    'sagemaker_result': result
                }
                sns.publish(
                    TopicArn=os.environ['SNS_TOPIC_ARN'],
                    Message=json.dumps(alert_message),
                    # SNS subjects must be ASCII and shorter than 100 characters.
                    Subject=f'Toilet Leak Alert - Device {device_id}'[:99]
                )
                alerts_sent += 1
                print(f"ALERT SENT: {alert_message}")
            else:
                print(f"No alert - Label: {label}, Confidence: {confidence:.3f}")

            processed_count += 1

        except Exception as e:
            print(f"Error processing record: {str(e)}")
            import traceback
            traceback.print_exc()

    print(f"Lambda completed: {processed_count} processed, {alerts_sent} alerts sent")
    return {
        'statusCode': 200,
        'processed': processed_count,
        'alerts_sent': alerts_sent
    }
EOF
    filename = "lambda_function.py"
  }
}

########################
# KINESIS EVENT SOURCE MAPPING
########################

resource "aws_lambda_event_source_mapping" "kinesis_trigger" {
  event_source_arn                   = aws_kinesis_stream.audio_stream.arn
  function_name                      = aws_lambda_function.consumer_lambda.arn
  starting_position                  = "LATEST"
  batch_size                         = 10
  enabled                            = true
  maximum_batching_window_in_seconds = 5
  parallelization_factor             = 1
}

########################
# OUTPUTS
########################

output "kinesis_stream_name" {
  value = aws_kinesis_stream.audio_stream.name
}

output "sns_topic_arn" {
  value = aws_sns_topic.alerts_topic.arn
}

output "sqs_queue_url" {
  value = aws_sqs_queue.alerts_queue.url
}

output "lambda_consumer_name" {
  value = aws_lambda_function.consumer_lambda.function_name
}