
@enachb
Last active January 13, 2026 21:28
Week 3: Missions & Alerts - Mission Service with Alerts Logic (CORRECTED)

Week 3 Implementation Plan: Missions & Alerts

Executive Summary

Recommendation: Enhance Mission Service with CEL-based rule evaluation by incorporating proven code from Alerts Service.

Key Insight: Mission service is the correct home for mission automation. We'll add CEL evaluation capability, SMS/Email delivery, and event processing to mission service by copying proven implementations from alerts service.

Schema Approach: Extend AutomationPolicy to support both CEL expressions (Week 3) and natural language instructions (future AI). This allows simple rules now while preserving the path to AI-driven automation.


Current State Assessment

✅ What's Working

Alerts Service (cloud/alerts/):

  • CEL rule engine fully operational with flexible JSON evaluation
  • NATS JetStream integration complete
  • PostgreSQL schema complete (6 tables)
  • gRPC APIs for alert definition CRUD
  • Event routing and rule matching logic
  • Database tables exist: alert_instances, alert_deliveries

Object Detector (object-detector/):

  • Publishes to: *.*.*.*.objects.detected.annotated_frame (DETECTIONS stream)
  • Supports: 80 YOLO classes, face recognition, pose estimation
  • Message format: AnnotatedImage protobuf with confidence scores

Mission Service (cloud/mission_service/):

  • Timeline tracking operational (history already works!)
  • Can view mission execution history via StreamMissionTimeline RPC

⚠️ What Needs Work

  1. Mission service needs detection event consumption (subscribe to DETECTIONS stream)
  2. Mission service needs CEL evaluation engine (copy from alerts service)
  3. Mission service needs SMS/Email delivery (copy from alerts service)
  4. Mission service needs action history tracking (use mission_timeline or new tables)
  5. No UI components for creating rules or viewing alert history

Implementation Phases

Phase 0: Extend Schema and Proto

Goal: Add CEL expression support and gRPC service for AutomationPolicy.

A. Extend AutomationPolicy proto:

Modify proto/mission/automation.proto:

message AutomationPolicySpec {
  // ... existing fields ...
  string instructions = 10;  // Natural language (for future AI)
  optional string cel_expression = 11;  // CEL rule (for Week 3)
  NotificationPreference notification_pref = 20;
  repeated AllowedAction allowed_actions = 21;
  string ai_guidance = 30;
}

// Add gRPC service (google.protobuf.Empty needs: import "google/protobuf/empty.proto"; at the top of the file)
service AutomationPolicyService {
  rpc CreateAutomationPolicy(CreateAutomationPolicyRequest) returns (AutomationPolicySpec);
  rpc GetAutomationPolicy(GetByIDRequest) returns (AutomationPolicySpec);
  rpc ListAutomationPolicies(ListAutomationPoliciesRequest) returns (ListAutomationPoliciesResponse);
  rpc UpdateAutomationPolicy(AutomationPolicySpec) returns (AutomationPolicySpec);
  rpc DeleteAutomationPolicy(GetByIDRequest) returns (google.protobuf.Empty);
}

message CreateAutomationPolicyRequest {
  bytes mission_id = 1;
  string cel_expression = 2;  // For Week 3 simple rules
  NotificationPreference notification_pref = 3;
}

message ListAutomationPoliciesRequest {
  optional bytes mission_id = 1;
  optional bytes customer_id = 2;
  int32 page = 10;
  int32 page_size = 11;
}

message ListAutomationPoliciesResponse {
  repeated AutomationPolicySpec policies = 1;
  int32 total_count = 2;
}

B. Regenerate proto:

cd cloud && make proto-build

C. Update Ent schema:

Modify cloud/mission_service/ent/schema/automation.go:

field.String("cel_expression").Optional(),

D. Regenerate Ent:

cd cloud/mission_service && go generate ./...

This allows:

  • Week 3: Use cel_expression for simple "if X then Y" rules
  • Future: Use instructions for AI-driven complex automation
  • Validation: Require at least one of cel_expression OR instructions
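The "at least one of" rule can be checked before a policy is persisted. A minimal sketch, assuming a hypothetical `PolicyInput` struct that mirrors the two AutomationPolicySpec fields (the proto3 `optional` cel_expression is modeled as a pointer); the real check would read the generated proto or Ent types instead:

```go
package main

import (
	"errors"
	"strings"
)

// PolicyInput is a hypothetical stand-in for the two AutomationPolicySpec
// fields this check cares about. cel_expression is proto3 `optional`,
// so it is modeled as a pointer.
type PolicyInput struct {
	CelExpression *string
	Instructions  string
}

var ErrNoRule = errors.New("automation policy needs cel_expression or instructions")

// validatePolicyRule rejects a policy when both fields are missing or blank.
func validatePolicyRule(p PolicyInput) error {
	hasCEL := p.CelExpression != nil && strings.TrimSpace(*p.CelExpression) != ""
	hasInstructions := strings.TrimSpace(p.Instructions) != ""
	if !hasCEL && !hasInstructions {
		return ErrNoRule
	}
	return nil
}
```

Treating whitespace-only strings as empty keeps a blank form field from counting as a rule.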

Phase 1: Copy Core Alerting Logic to Mission Service

Goal: Bring proven CEL engine, SMS/Email delivery from alerts service into mission service.

Files to create in cloud/mission_service/:

  1. automation_processor.go (adapted from alerts event_processor.go)

    • CEL evaluation logic
    • Rule matching against events
    • Action execution orchestration
    • Transform AnnotatedImage → internal event format
  2. cel_engine.go (copied from alerts)

    • CEL environment initialization
    • Expression compilation and caching
    • Evaluation with context
  3. sms_delivery.go (new, following alerts pattern)

    • Twilio SMS integration
    • Template rendering
  4. email_delivery.go (new, following alerts pattern)

    • SendGrid email integration
    • Template rendering
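Template rendering for the SMS and email bodies could use Go's standard `text/template`. A sketch, assuming a hypothetical `AlertContext` struct whose fields echo the MissionEvent built by the detection transform; the alerts service's actual template variables are not shown in this plan:

```go
package main

import (
	"strings"
	"text/template"
)

// AlertContext is a hypothetical set of values exposed to message templates.
type AlertContext struct {
	Label      string
	Confidence float64
	FacilityID string
	HardwareID string
}

// renderMessage parses tmpl and executes it against ctx. Referencing a field
// that AlertContext does not have is reported as an execution error.
func renderMessage(tmpl string, ctx AlertContext) (string, error) {
	t, err := template.New("alert").Parse(tmpl)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := t.Execute(&sb, ctx); err != nil {
		return "", err
	}
	return sb.String(), nil
}
```

For HTML email bodies, `html/template` would be the safer choice because it escapes interpolated values.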

Key adaptations:

  • Extend AutomationPolicy schema to include optional cel_expression field
  • Keep instructions for future AI use
  • For Week 3: populate cel_expression, leave instructions empty
  • Use mission_timeline for action history (RULE_TRIGGERED, ACTION_EXECUTED events)
  • Keep CEL engine setup identical
  • Adapt NotificationPreference to drive SMS/Email directly

Phase 2: Detection Event Integration

File: Create cloud/mission_service/detection_consumer.go

Initialize NATS in mission service:

First, ensure mission service has NATS connection in cmd/server/main.go:

// Connect to NATS
nc, err := nats.Connect(natsURL)
if err != nil {
    log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()

// Start automation processor
processor := NewAutomationProcessor(entClient, nc)
if err := StartDetectionConsumer(context.Background(), nc, processor); err != nil {
    log.Fatalf("Failed to start detection consumer: %v", err)
}

Add NATS consumer for DETECTIONS stream:

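import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
    "google.golang.org/protobuf/proto"
    // plus the generated object-detector protobuf package, imported as pb
)

// StartDetectionConsumer attaches a durable pull consumer to the DETECTIONS
// stream and feeds every AnnotatedImage into the automation processor.
func StartDetectionConsumer(ctx context.Context, nc *nats.Conn, processor *AutomationProcessor) error {
    // Use the jetstream package; the legacy nc.JetStream() context has no CreateOrUpdateConsumer
    js, err := jetstream.New(nc)
    if err != nil {
        return fmt.Errorf("failed to create JetStream context: %w", err)
    }

    // Durable consumer, so the processing position survives restarts. The region
    // token is a wildcard to match the object detector's publish subject.
    consumer, err := js.CreateOrUpdateConsumer(ctx, "DETECTIONS", jetstream.ConsumerConfig{
        Durable:        "mission-rule-processor",
        FilterSubjects: []string{"*.*.*.*.objects.detected.annotated_frame"},
        DeliverPolicy:  jetstream.DeliverNewPolicy,
        AckPolicy:      jetstream.AckExplicitPolicy,
    })
    if err != nil {
        return fmt.Errorf("failed to create consumer: %w", err)
    }

    // Fetch detection messages in batches until ctx is cancelled
    go func() {
        for ctx.Err() == nil {
            batch, err := consumer.Fetch(10, jetstream.FetchMaxWait(30*time.Second))
            if err != nil {
                log.Printf("Failed to fetch detection messages: %v", err)
                time.Sleep(5 * time.Second)
                continue
            }

            for msg := range batch.Messages() {
                if err := processDetection(ctx, processor, msg); err != nil {
                    log.Printf("Failed to process detection: %v", err)
                }
            }
        }
    }()

    log.Println("Detection consumer started")
    return nil
}

// processDetection decodes one message, runs it through the automation
// policies, and acks or naks it.
func processDetection(ctx context.Context, processor *AutomationProcessor, msg jetstream.Msg) error {
    var annotated pb.AnnotatedImage
    if err := proto.Unmarshal(msg.Data(), &annotated); err != nil {
        // A malformed payload will never succeed; ack it so it is not redelivered forever
        _ = msg.Ack()
        return fmt.Errorf("unmarshal AnnotatedImage: %w", err)
    }

    // Transform to internal event format
    event, err := processor.TransformDetectionToEvent(&annotated, msg.Subject())
    if err != nil {
        _ = msg.Ack() // unexpected subject shape is also unrecoverable
        return err
    }

    // Process through mission automation policies with a per-message timeout
    evalCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    if err := processor.ProcessEvent(evalCtx, event); err != nil {
        _ = msg.Nak() // ask JetStream to redeliver later
        return fmt.Errorf("process event: %w", err)
    }

    return msg.Ack()
}

Transformation function in automation_processor.go (uses fmt, strings, and github.com/google/uuid):

func (p *AutomationProcessor) TransformDetectionToEvent(
    img *pb.AnnotatedImage, subject string) (*MissionEvent, error) {

    // Subject layout: <region>.<customer>.<facility>.<hardware>.objects.detected.annotated_frame
    parts := strings.Split(subject, ".")
    if len(parts) < 4 {
        return nil, fmt.Errorf("unexpected detection subject %q", subject)
    }

    var label string
    facts := map[string]interface{}{
        "object_count": len(img.Objects),
        "person_count": len(img.Person),
    }

    // The highest-confidence object supplies the event's label and confidence
    if len(img.Objects) > 0 {
        primary := img.Objects[0]
        for _, obj := range img.Objects {
            if obj.Confidence > primary.Confidence {
                primary = obj
            }
        }
        label = strings.ToLower(pb.ObjectClass_name[int32(primary.ObjectClass)])
        facts["confidence"] = primary.Confidence
        facts["object_class"] = label
    }

    return &MissionEvent{
        EventId:    uuid.New().String(),
        Region:     parts[0],
        CustomerId: parts[1],
        FacilityId: parts[2],
        HardwareId: parts[3],
        Label:      label,
        Facts:      facts,
        OccurredAt: img.CapturedAtMs,
    }, nil
}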
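The transform relies on the first four subject tokens being region, customer, facility, and hardware. A stricter standalone parser, sketched here with a hypothetical `DetectionSource` type, also checks the `objects.detected.annotated_frame` suffix, so subjects from other streams (such as the older `...alerts.trigger.person` form) are rejected rather than misparsed:

```go
package main

import (
	"fmt"
	"strings"
)

// DetectionSource holds the four routing tokens at the front of a detection subject.
type DetectionSource struct {
	Region, CustomerID, FacilityID, HardwareID string
}

const detectionSuffix = "objects.detected.annotated_frame"

// parseDetectionSubject splits
// "<region>.<customer>.<facility>.<hardware>.objects.detected.annotated_frame"
// and rejects subjects with the wrong suffix or an empty routing token.
func parseDetectionSubject(subject string) (DetectionSource, error) {
	parts := strings.SplitN(subject, ".", 5)
	if len(parts) != 5 || parts[4] != detectionSuffix {
		return DetectionSource{}, fmt.Errorf("not a detection subject: %q", subject)
	}
	for _, p := range parts[:4] {
		if p == "" {
			return DetectionSource{}, fmt.Errorf("empty token in subject %q", subject)
		}
	}
	return DetectionSource{parts[0], parts[1], parts[2], parts[3]}, nil
}
```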

Phase 3: SMS & Email Delivery in Mission Service

Create in cloud/mission_service/:

1. sms_delivery.go:

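import (
    "fmt"

    "github.com/twilio/twilio-go"
    twilioApi "github.com/twilio/twilio-go/rest/api/v2010"
)

type TwilioSMSService struct {
    client    *twilio.RestClient
    fromPhone string
}

// SendSMS sends message to an E.164 number and returns the Twilio message SID.
func (s *TwilioSMSService) SendSMS(to, message string) (messageID string, err error) {
    params := &twilioApi.CreateMessageParams{}
    params.SetTo(to)
    params.SetFrom(s.fromPhone)
    params.SetBody(message)

    resp, err := s.client.Api.CreateMessage(params)
    if err != nil {
        // resp is nil on error, so it must not be dereferenced
        return "", fmt.Errorf("twilio create message: %w", err)
    }
    if resp.Sid == nil {
        return "", fmt.Errorf("twilio response has no message SID")
    }
    return *resp.Sid, nil
}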

2. email_delivery.go:

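import (
    "fmt"
    "net/http"

    "github.com/sendgrid/sendgrid-go"
    "github.com/sendgrid/sendgrid-go/helpers/mail"
)

type SendGridEmailService struct {
    client    *sendgrid.Client
    fromEmail string
}

func (s *SendGridEmailService) SendEmail(to, subject, body string) (messageID string, err error) {
    from := mail.NewEmail("SkyDaddy Alerts", s.fromEmail)
    message := mail.NewSingleEmail(from, subject, mail.NewEmail("", to), body, body)
    response, err := s.client.Send(message)
    if err != nil {
        return "", err
    }
    if response.StatusCode >= 300 { // API failures arrive as a status code, not as err
        return "", fmt.Errorf("sendgrid: status %d: %s", response.StatusCode, response.Body)
    }
    return http.Header(response.Headers).Get("X-Message-Id"), nil // "" if absent, no index panic
}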

Environment variables required:

TWILIO_ACCOUNT_SID=ACxxxxx
TWILIO_AUTH_TOKEN=xxxxx
TWILIO_FROM_PHONE=+1234567890
SENDGRID_API_KEY=SG.xxxxx
[email protected]

Go dependencies:

go get github.com/twilio/twilio-go
go get github.com/sendgrid/sendgrid-go

Phase 4: Mission Rule Execution History

Two options:

Option A: Use existing mission_timeline table

  • Add new timeline entry types: RULE_TRIGGERED, ACTION_EXECUTED
  • Store rule evaluation results in timeline
  • Simplest approach, reuses existing infrastructure

Option B: Create dedicated history tables (modeled on the alerts service's alert_instances and alert_deliveries tables)

Create: cloud/mission_service/sql/queries/mission_rule_instances.sql

-- name: CreateAlertInstance :one
INSERT INTO alert_instances (
    id, alert_id, definition_id, rule_id,
    trigger_event_id, matched_topic, matched_pattern,
    trigger_data, cel_context, cel_result,
    status, delivery_attempts, triggered_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
RETURNING *;

-- name: ListAlertInstances :many
SELECT * FROM alert_instances
WHERE (sqlc.narg('alert_id')::uuid IS NULL OR alert_id = sqlc.narg('alert_id'))
  AND (sqlc.narg('status')::int IS NULL OR status = sqlc.narg('status'))
ORDER BY triggered_at DESC
LIMIT sqlc.arg('limit_val') OFFSET sqlc.arg('offset_val');

Create: cloud/sql/queries/alert_deliveries.sql

-- name: CreateAlertDelivery :one
INSERT INTO alert_deliveries (
    id, instance_id, channel_id, status,
    message_sent, error_message, external_id,
    template_result, template_context,
    attempted_at, completed_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
RETURNING *;

Update: cloud/alerts/event_processor.go function handleTriggeredRules (lines 229-258)

  • Create alert_instance record when rule triggers
  • Create alert_delivery record for each action executed
  • Store Twilio SID or SendGrid message ID in external_id field
  • Update status: PENDING → DELIVERED or FAILED
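The PENDING → DELIVERED or FAILED update can be guarded so only forward transitions are accepted. A minimal sketch with hypothetical `DeliveryStatus` constants; the alerts schema stores status as an integer (per the `status::int` filter above), so the string values here are illustrative:

```go
package main

import "fmt"

// DeliveryStatus follows the PENDING -> DELIVERED | FAILED lifecycle.
type DeliveryStatus string

const (
	StatusPending   DeliveryStatus = "PENDING"
	StatusDelivered DeliveryStatus = "DELIVERED"
	StatusFailed    DeliveryStatus = "FAILED"
)

// nextStatus allows only the two transitions out of PENDING. DELIVERED and
// FAILED are terminal, so a late duplicate update cannot flip a final result;
// on rejection the current status is returned unchanged.
func nextStatus(from, to DeliveryStatus) (DeliveryStatus, error) {
	if from == StatusPending && (to == StatusDelivered || to == StatusFailed) {
		return to, nil
	}
	return from, fmt.Errorf("invalid status transition %s -> %s", from, to)
}
```

Under this model, retries happen while the record is still PENDING, and FAILED is written only once the last attempt is exhausted.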

Regenerate sqlc:

cd cloud && sqlc generate

Recommendation: Use Option A (mission_timeline) for Week 3 simplicity.


Phase 5: UI Components (Priority: MEDIUM)

Week 3 Constraint: ONE condition per mission, ONE action per mission.

Create: ui/src/routes/missions/rules.create.tsx

Simple form with:

  • Single dropdown: Condition type (person_detected, vehicle_detected, confidence_threshold)
  • Single dropdown: Action type (send_sms, send_email)
  • Text inputs: Phone/email, message template
  • No "Add" buttons - enforces one condition, one action

Example condition → CEL mapping:

  • Person detected → label == 'person'
  • Vehicle detected → label == 'car' || label == 'truck'
  • High confidence → facts.confidence > 0.9
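The dropdown-to-CEL mapping is small enough to live in a single switch. A sketch assuming the condition keys from the form above and a hypothetical `threshold` parameter for `confidence_threshold` (0.9 in the example); the plan does not say whether this mapping runs in the UI or the backend:

```go
package main

import "fmt"

// conditionToCEL maps the single condition dropdown onto a CEL expression.
// threshold is only used for confidence_threshold and must lie in (0, 1).
func conditionToCEL(condition string, threshold float64) (string, error) {
	switch condition {
	case "person_detected":
		return "label == 'person'", nil
	case "vehicle_detected":
		return "label == 'car' || label == 'truck'", nil
	case "confidence_threshold":
		if threshold <= 0 || threshold >= 1 {
			return "", fmt.Errorf("confidence threshold must be in (0, 1), got %v", threshold)
		}
		return fmt.Sprintf("facts.confidence > %g", threshold), nil
	default:
		return "", fmt.Errorf("unknown condition type %q", condition)
	}
}
```

Generating the expression server-side from a fixed set of keys also means users never type raw CEL in Week 3.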

Create: ui/src/routes/alerts/history.tsx

Table showing:

  • Triggered timestamp
  • Alert name
  • Trigger event (what was detected)
  • Status badge (PENDING, DELIVERED, FAILED)
  • Deliveries count
  • Click row → expand to show delivery details

Query alert instances:

const { data } = useQuery({
  queryKey: ["alert-instances"],
  queryFn: () => alertClient.listAlertInstances({
    page: 0,
    pageSize: 50,
  }),
});

Existing mission timeline: ui/src/routes/missions.detail.$id.tsx likely already shows execution history via StreamMissionTimeline RPC.


Phase 6: CEL Rule Engine Setup in Mission Service

Copy CEL initialization from alerts service:

File: cloud/mission_service/cel_engine.go

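import (
    "fmt"

    "github.com/google/cel-go/cel"
    "github.com/google/cel-go/common/types"
)

// NewCELEngine declares the variables a rule may reference.
func NewCELEngine() (*cel.Env, error) {
    return cel.NewEnv(
        cel.Variable("event", cel.MapType(cel.StringType, cel.DynType)),
        cel.Variable("label", cel.StringType),
        cel.Variable("facts", cel.MapType(cel.StringType, cel.DynType)),
        cel.Variable("confidence", cel.DoubleType),
        cel.Variable("object_class", cel.StringType),
    )
}

// EvaluateCEL compiles expression, evaluates it against vars, and requires a
// boolean result. It compiles on every call (no caching yet).
func EvaluateCEL(env *cel.Env, expression string, vars map[string]interface{}) (bool, error) {
    ast, issues := env.Compile(expression)
    if issues != nil && issues.Err() != nil {
        return false, issues.Err()
    }

    prg, err := env.Program(ast)
    if err != nil {
        return false, err
    }

    result, _, err := prg.Eval(vars)
    if err != nil {
        return false, err
    }

    // A non-boolean rule (e.g. "confidence * 2") would panic on a plain .(bool) assertion
    b, ok := result.(types.Bool)
    if !ok {
        return false, fmt.Errorf("CEL expression %q returned %s, want bool", expression, result.Type().TypeName())
    }
    return bool(b), nil
}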
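EvaluateCEL compiles on every call, while Phase 1 lists "expression compilation and caching". A minimal caching layer, sketched generically so it has no cel-go dependency: in the mission service `P` would be `cel.Program` and `compile` would wrap `env.Compile` plus `env.Program`. `ProgramCache` and `NewProgramCache` are hypothetical names:

```go
package main

import "sync"

// ProgramCache memoizes compiled programs per expression string. Failed
// compilations are not cached, so a corrected rule is picked up next call.
type ProgramCache[P any] struct {
	mu      sync.RWMutex
	compile func(expr string) (P, error)
	progs   map[string]P
}

func NewProgramCache[P any](compile func(string) (P, error)) *ProgramCache[P] {
	return &ProgramCache[P]{compile: compile, progs: make(map[string]P)}
}

// Get returns the cached program for expr, compiling it on first use.
func (c *ProgramCache[P]) Get(expr string) (P, error) {
	c.mu.RLock()
	p, ok := c.progs[expr]
	c.mu.RUnlock()
	if ok {
		return p, nil
	}
	p, err := c.compile(expr)
	if err != nil {
		return p, err
	}
	c.mu.Lock()
	c.progs[expr] = p
	c.mu.Unlock()
	return p, nil
}
```

Two goroutines that miss at the same moment may both compile the same expression; the second write overwrites the first, which is harmless for a pure compile step. Entries are never evicted, so frequently edited rules would need a size bound or invalidation when a policy is updated.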

Phase 7: Backend Validation (Priority: LOW)

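File: cloud/mission_service/service.go (called from the CreateAutomationPolicy handler)

Add validation. This check is still written against the alerts service's CreateAlertDefinitionRequest (rules containing actions), so its field access needs adapting to the mission-service request type: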

func validateWeek3Constraints(req *pb.CreateAlertDefinitionRequest) error {
    if len(req.Rules) > 1 {
        return status.Error(codes.InvalidArgument,
            "Week 3 limitation: only one rule per alert definition")
    }

    for _, rule := range req.Rules {
        if len(rule.Actions) > 1 {
            return status.Error(codes.InvalidArgument,
                "Week 3 limitation: only one action per rule")
        }
    }

    return nil
}

This prevents power users from bypassing UI constraints via API.


Integration Testing

End-to-end test flow:

  1. Start all services:

    cd cloud && make dev-up
    cd detection_bridge && go run main.go
    cd object-detector && docker compose up -d
  2. Send test image:

    cd object-detector
    python clients_subscribers/nats_client.py -f test_images/person.jpg
  3. Verify detection bridge logs:

    Subscribed to: *.*.*.*.objects.detected.annotated_frame
    Received AnnotatedImage with 1 person detected
    Transformed to TriggerEvent: label=person, confidence=0.95
    Published to: us-west.cust123.fac456.cam001.alerts.trigger.person
    
  4. Verify alerts service logs:

    Received TriggerEvent on subject: alerts.trigger.person
    Matched alert definition: "Person Detection Rule"
    CEL rule 'Detect Person' evaluated to: true
    Executing SMS action to +15555551234
    SMS sent successfully: Twilio SID SM12345678
    
  5. Check database:

    SELECT ai.id, ai.triggered_at, ai.status, ad.message_sent, ad.external_id
    FROM alert_instances ai
    JOIN alert_deliveries ad ON ad.instance_id = ai.id
    ORDER BY ai.triggered_at DESC LIMIT 5;
  6. Verify UI:

    • Navigate to /alerts/history
    • Confirm alert instance appears with "Delivered" status
    • Click row to expand and see Twilio SID

Critical Files Summary

Must create in cloud/mission_service/:

  1. automation_processor.go - CEL evaluation and rule processing (adapted from alerts event_processor.go)
  2. detection_consumer.go - NATS consumer for DETECTIONS stream
  3. cel_engine.go - CEL environment setup (copied from alerts)
  4. sms_delivery.go - Twilio integration (new implementation)
  5. email_delivery.go - SendGrid integration (new implementation)
  6. automation_service.go - gRPC service for AutomationPolicy CRUD operations

Must create in ui/:

  1. ui/src/routes/missions/automation/create.tsx - Create automation rule UI
  2. ui/src/routes/missions/automation/list.tsx - List automation rules UI
  3. ui/src/routes/missions/history.tsx - Mission action execution history (may already exist)

Must modify:

  1. proto/mission/automation.proto - Add cel_expression field and AutomationPolicyService gRPC
  2. cloud/mission_service/ent/schema/automation.go - Add cel_expression optional field
  3. cloud/mission_service/cmd/server/main.go - Initialize NATS, start detection consumer
  4. cloud/mission_service/service.go - Register AutomationPolicyService gRPC endpoints
  5. cloud/mission_service/go.mod - Add dependencies: google/cel-go, twilio-go, sendgrid-go, nats.go

Code to reference/adapt from alerts service:

  1. cloud/alerts/event_processor.go → CEL evaluation patterns and logic structure
  2. cloud/alerts/alerting.go → NATS consumer setup and message handling
  3. Alerts CEL environment setup → Variable declarations and evaluation
  4. Template rendering logic → Message template processing

Mission service advantages over alerts:

  1. ✅ Conceptually correct: automation belongs with missions
  2. ✅ AutomationPolicy schema already exists with notification preferences
  3. ✅ Mission timeline already tracks mission events (extend for rule execution)
  4. ✅ Mission context: can access mission_id, device_id, facility_id naturally
  5. ✅ Future-ready: instructions field enables AI-driven automation later

Why this works for Week 3:

  • Simple CEL expressions in cel_expression field
  • NotificationPreference already has channel configs and cooldown
  • AllowedAction enum already includes NOTIFY type
  • Mission timeline records RULE_TRIGGERED and ACTION_EXECUTED events
  • Clean path to AI when ready (just populate instructions field instead)

Risk Mitigation

Risk: High detection event volume overwhelms system

  • Mitigation: Rate limit in detection bridge (max 10 events/sec per camera)
  • Use NATS consumer with manual acks for backpressure

Risk: Twilio/SendGrid API failures

  • Mitigation: Retry logic with exponential backoff (3 attempts)
  • Store error messages in alert_deliveries.error_message field

Risk: Alert fatigue (too many notifications)

  • Short-term: Users can disable rules via UI toggle
  • Week 4: Add cooldown periods (e.g., max 1 SMS per hour)

Architecture Validation

Why Mission Service is Correct:

  1. Conceptual fit: AutomationPolicy is designed for mission automation

    • Already has mission_id, device_id, facility_id relationships
    • NotificationPreference schema matches Week 3 needs exactly
    • Timeline tracking fits mission execution history
  2. Schema flexibility: AutomationPolicy supports both approaches

    • instructions field → Future AI-driven automation
    • cel_expression field → Week 3 simple rules
    • Both can coexist, use what fits the use case
  3. Proven patterns from alerts: Copy working code without rewriting

    • CEL engine setup (variable declarations, compilation, caching)
    • NATS consumer patterns (batch fetching, ack handling)
    • Template rendering for SMS/Email messages
  4. Natural mission integration:

    • Rules scoped to specific missions (mission_id)
    • Action history in mission timeline (RULE_TRIGGERED, ACTION_EXECUTED)
    • UI naturally fits in missions section
  5. Future-ready architecture:

    • Week 3: Simple CEL rules (if person detected, send SMS)
    • Future: Complex AI automation (natural language instructions → action plans)
    • Both use same AutomationPolicy entity, different fields

This is the way: Mission service owns automation, leverages proven alerts code.


Verification Checklist

Before marking Week 3 complete:

  • AutomationPolicy can be created with cel_expression
  • Mission service consumes DETECTIONS stream
  • CEL rules evaluate correctly (test: label == 'person')
  • SMS delivery works (test with real phone number)
  • Email delivery works (test with real email)
  • NotificationPreference cooldown respected
  • Mission timeline shows RULE_TRIGGERED and ACTION_EXECUTED events
  • UI shows automation rule creation form (one condition, one action)
  • UI shows automation rule list filtered by mission_id
  • UI shows execution history in mission timeline
  • End-to-end test: image → detection → CEL evaluation → SMS received
  • Verify no conflicts with future AI instructions field