Recommendation: Enhance Mission Service with CEL-based rule evaluation by incorporating proven code from Alerts Service.
Key Insight: Mission service is the correct home for mission automation. We'll add CEL evaluation capability, SMS/Email delivery, and event processing to mission service by copying proven implementations from alerts service.
Schema Approach: Extend AutomationPolicy to support both CEL expressions (Week 3) and natural language instructions (future AI). This allows simple rules now while preserving the path to AI-driven automation.
Alerts Service (cloud/alerts/):
- CEL rule engine fully operational with flexible JSON evaluation
- NATS JetStream integration complete
- PostgreSQL schema complete (6 tables)
- gRPC APIs for alert definition CRUD
- Event routing and rule matching logic
- Database tables exist: alert_instances, alert_deliveries
Object Detector (object-detector/):
- Publishes to: *.*.*.*.objects.detected.annotated_frame (DETECTIONS stream)
- Supports: 80 YOLO classes, face recognition, pose estimation
- Message format: AnnotatedImage protobuf with confidence scores
Mission Service (cloud/mission_service/):
- Timeline tracking operational (history already works!)
- Can view mission execution history via the StreamMissionTimeline RPC
- Mission service needs detection event consumption (subscribe to DETECTIONS stream)
- Mission service needs CEL evaluation engine (copy from alerts service)
- Mission service needs SMS/Email delivery (copy from alerts service)
- Mission service needs action history tracking (use mission_timeline or new tables)
- No UI components for creating rules or viewing alert history
Goal: Add CEL expression support and gRPC service for AutomationPolicy.
A. Extend AutomationPolicy proto:
Modify proto/mission/automation.proto:
message AutomationPolicySpec {
// ... existing fields ...
string instructions = 10; // Natural language (for future AI)
optional string cel_expression = 11; // CEL rule (for Week 3)
NotificationPreference notification_pref = 20;
repeated AllowedAction allowed_actions = 21;
string ai_guidance = 30;
}
// Add gRPC service
service AutomationPolicyService {
rpc CreateAutomationPolicy(CreateAutomationPolicyRequest) returns (AutomationPolicySpec);
rpc GetAutomationPolicy(GetByIDRequest) returns (AutomationPolicySpec);
rpc ListAutomationPolicies(ListAutomationPoliciesRequest) returns (ListAutomationPoliciesResponse);
rpc UpdateAutomationPolicy(AutomationPolicySpec) returns (AutomationPolicySpec);
rpc DeleteAutomationPolicy(GetByIDRequest) returns (google.protobuf.Empty);
}
message CreateAutomationPolicyRequest {
bytes mission_id = 1;
string cel_expression = 2; // For Week 3 simple rules
NotificationPreference notification_pref = 3;
}
message ListAutomationPoliciesRequest {
optional bytes mission_id = 1;
optional bytes customer_id = 2;
int32 page = 10;
int32 page_size = 11;
}
message ListAutomationPoliciesResponse {
repeated AutomationPolicySpec policies = 1;
int32 total_count = 2;
}
B. Regenerate proto:
cd cloud && make proto-build
C. Update Ent schema:
Modify cloud/mission_service/ent/schema/automation.go:
field.String("cel_expression").Optional(),
D. Regenerate Ent:
cd cloud/mission_service && go generate ./...
This allows:
- Week 3: Use cel_expression for simple "if X then Y" rules
- Future: Use instructions for AI-driven complex automation
- Validation: Require at least one of cel_expression OR instructions
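The "at least one of" rule above could be enforced with a small check before a policy is saved. The following is a minimal sketch, not the service's actual code: `policyFields` is a hypothetical stand-in for the generated AutomationPolicySpec type, and treating whitespace-only values as empty is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// policyFields is a hypothetical stand-in for the two AutomationPolicySpec
// fields involved; the generated proto type would be used in practice.
type policyFields struct {
	CelExpression string
	Instructions  string
}

var errNoRule = errors.New("automation policy needs cel_expression or instructions")

// validatePolicyFields rejects a policy that sets neither field.
// Whitespace-only values are treated as empty (an assumption).
func validatePolicyFields(p policyFields) error {
	if strings.TrimSpace(p.CelExpression) == "" && strings.TrimSpace(p.Instructions) == "" {
		return errNoRule
	}
	return nil
}

func main() {
	fmt.Println(validatePolicyFields(policyFields{CelExpression: "label == 'person'"}))
	fmt.Println(validatePolicyFields(policyFields{}))
}
```

In a gRPC handler the returned error would likely be wrapped in a codes.InvalidArgument status.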
Goal: Bring proven CEL engine, SMS/Email delivery from alerts service into mission service.
Files to create in cloud/mission_service/:
- automation_processor.go (adapted from alerts event_processor.go)
  - CEL evaluation logic
  - Rule matching against events
  - Action execution orchestration
  - Transform AnnotatedImage → internal event format
- cel_engine.go (copied from alerts)
  - CEL environment initialization
  - Expression compilation and caching
  - Evaluation with context
- sms_delivery.go (new, following alerts pattern)
  - Twilio SMS integration
  - Template rendering
- email_delivery.go (new, following alerts pattern)
  - SendGrid email integration
  - Template rendering
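The "expression compilation and caching" item for cel_engine.go can be pictured as a map from expression text to compiled program. The sketch below is illustrative only and does not reproduce the alerts service's code: `compiledRule` and `compileFunc` are hypothetical placeholders for a cel-go program and for the env.Compile plus env.Program steps, so that the example runs with the standard library alone.

```go
package main

import (
	"fmt"
	"sync"
)

// compiledRule is a placeholder for a compiled CEL program.
type compiledRule func(vars map[string]any) (bool, error)

// compileFunc stands in for the compile step (env.Compile + env.Program in cel-go).
type compileFunc func(expr string) (compiledRule, error)

// ruleCache memoizes compiled rules by expression text.
type ruleCache struct {
	mu      sync.Mutex
	compile compileFunc
	rules   map[string]compiledRule
}

func newRuleCache(c compileFunc) *ruleCache {
	return &ruleCache{compile: c, rules: make(map[string]compiledRule)}
}

// get returns the cached rule, compiling it on first use.
// Compile errors are returned to the caller and not cached.
func (c *ruleCache) get(expr string) (compiledRule, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if r, ok := c.rules[expr]; ok {
		return r, nil
	}
	r, err := c.compile(expr)
	if err != nil {
		return nil, err
	}
	c.rules[expr] = r
	return r, nil
}

func main() {
	compiles := 0
	cache := newRuleCache(func(expr string) (compiledRule, error) {
		compiles++
		return func(map[string]any) (bool, error) { return true, nil }, nil
	})
	cache.get("label == 'person'")
	cache.get("label == 'person'") // served from the cache
	fmt.Println("compiles:", compiles)
}
```

Keying on the expression text means an edited policy naturally compiles a fresh entry; evicting stale entries is left out of this sketch.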
Key adaptations:
- Extend AutomationPolicy schema to include optional cel_expression field
- Keep instructions for future AI use
- For Week 3: populate cel_expression, leave instructions empty
- Use mission_timeline for action history (RULE_TRIGGERED, ACTION_EXECUTED events)
- Keep CEL engine setup identical
- Adapt NotificationPreference to drive SMS/Email directly
File: Create cloud/mission_service/detection_consumer.go
Initialize NATS in mission service:
First, ensure mission service has NATS connection in cmd/server/main.go:
// Connect to NATS
nc, err := nats.Connect(natsURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
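// Start the mission rule processor and its detection consumer
processor := NewMissionRuleProcessor(entClient, nc)
if err := StartMissionRuleConsumer(context.Background(), nc, processor); err != nil {
log.Fatalf("Failed to start mission rule consumer: %v", err)
}
Add NATS consumer for DETECTIONS stream:
func StartMissionRuleConsumer(ctx context.Context, nc *nats.Conn, processor *MissionRuleProcessor) error {
// CreateOrUpdateConsumer belongs to the github.com/nats-io/nats.go/jetstream
// package, so create the context with jetstream.New rather than nc.JetStream()
js, err := jetstream.New(nc)
if err != nil {
return fmt.Errorf("failed to create JetStream context: %w", err)
}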
// Subscribe to DETECTIONS stream
consumer, err := js.CreateOrUpdateConsumer(ctx, "DETECTIONS", jetstream.ConsumerConfig{
Name: "mission-rule-processor",
FilterSubjects: []string{"us-west.*.*.*.objects.detected.annotated_frame"},
DeliverPolicy: jetstream.DeliverNewPolicy,
AckPolicy: jetstream.AckExplicitPolicy,
})
if err != nil {
return fmt.Errorf("failed to create consumer: %w", err)
}
// Process detection messages
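go func() {
for {
// Stop fetching once the caller cancels ctx so the goroutine does not leak
if ctx.Err() != nil {
return
}
batch, err := consumer.Fetch(10, jetstream.FetchMaxWait(30*time.Second))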
if err != nil {
log.Printf("Failed to fetch detection messages: %v", err)
time.Sleep(5 * time.Second)
continue
}
for msg := range batch.Messages() {
if err := processDetectionForMissionRules(processor, msg); err != nil {
log.Printf("Failed to process detection: %v", err)
}
}
}
}()
log.Println("Mission rule consumer started")
return nil
}
func processDetectionForMissionRules(processor *MissionRuleProcessor, msg jetstream.Msg) error {
var annotated pb.AnnotatedImage
if err := proto.Unmarshal(msg.Data(), &annotated); err != nil {
log.Printf("Failed to unmarshal AnnotatedImage: %v", err)
msg.Ack()
return err
}
// Transform to internal event format
event := processor.TransformDetectionToEvent(&annotated, msg.Subject())
// Process through mission rules
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := processor.ProcessEvent(ctx, event); err != nil {
log.Printf("Failed to process event: %v", err)
msg.Nak()
return err
}
msg.Ack()
return nil
}
Transformation function in mission_rule_processor.go:
func (p *MissionRuleProcessor) TransformDetectionToEvent(
	img *pb.AnnotatedImage, subject string) *MissionEvent {
	// Subject layout: region.customer.facility.hardware.objects.detected.annotated_frame
	parts := strings.Split(subject, ".")
	// Pad so a malformed subject yields empty IDs instead of an index-out-of-range panic
	for len(parts) < 4 {
		parts = append(parts, "")
	}
	var label string
	facts := map[string]interface{}{
		"object_count": len(img.Objects),
		"person_count": len(img.Person),
	}
	if len(img.Objects) > 0 {
		// Use the highest-confidence object as the event's primary label
		primary := img.Objects[0]
		for _, obj := range img.Objects {
			if obj.Confidence > primary.Confidence {
				primary = obj
			}
		}
		label = strings.ToLower(pb.ObjectClass_name[int32(primary.ObjectClass)])
		facts["confidence"] = primary.Confidence
		facts["object_class"] = label
	}
	return &MissionEvent{
		EventId:    uuid.New().String(),
		Region:     parts[0],
		CustomerId: parts[1],
		FacilityId: parts[2],
		HardwareId: parts[3],
		Label:      label,
		Facts:      facts,
		OccurredAt: img.CapturedAtMs,
	}
}
Create in cloud/mission_service/:
1. sms_delivery.go:
type TwilioSMSService struct {
client *twilio.RestClient
fromPhone string
}
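func (s *TwilioSMSService) SendSMS(to, message string) (messageID string, err error) {
	params := &twilioApi.CreateMessageParams{}
	params.SetTo(to)
	params.SetFrom(s.fromPhone)
	params.SetBody(message)
	resp, err := s.client.Api.CreateMessage(params)
	if err != nil {
		return "", fmt.Errorf("twilio send failed: %w", err)
	}
	// resp.Sid is a *string; check for nil before dereferencing
	if resp == nil || resp.Sid == nil {
		return "", fmt.Errorf("twilio response missing message SID")
	}
	return *resp.Sid, nil
}
2. email_delivery.go: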
type SendGridEmailService struct {
client *sendgrid.Client
fromEmail string
}
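func (s *SendGridEmailService) SendEmail(to, subject, body string) (messageID string, err error) {
	from := mail.NewEmail("SkyDaddy Alerts", s.fromEmail)
	message := mail.NewSingleEmail(from, subject, mail.NewEmail("", to), body, body)
	response, err := s.client.Send(message)
	if err != nil {
		return "", fmt.Errorf("sendgrid send failed: %w", err)
	}
	// SendGrid reports API-level failures through the status code, not the Go error
	if response.StatusCode >= 300 {
		return "", fmt.Errorf("sendgrid returned status %d: %s", response.StatusCode, response.Body)
	}
	// The header may be absent; avoid indexing an empty slice
	if ids := response.Headers["X-Message-Id"]; len(ids) > 0 {
		return ids[0], nil
	}
	return "", nil
}
Environment variables required: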
TWILIO_ACCOUNT_SID=ACxxxxx
TWILIO_AUTH_TOKEN=xxxxx
TWILIO_FROM_PHONE=+1234567890
SENDGRID_API_KEY=SG.xxxxx
[email protected]
Go dependencies:
go get github.com/twilio/twilio-go
go get github.com/sendgrid/sendgrid-go
Two options:
Option A: Use existing mission_timeline table
- Add new timeline entry type: RULE_TRIGGERED, RULE_ACTION_EXECUTED
- Store rule evaluation results in timeline
- Simplest approach, reuses existing infrastructure
Option B: Create new tables in mission service
Create: cloud/mission_service/sql/queries/mission_rule_instances.sql
-- name: CreateAlertInstance :one
INSERT INTO alert_instances (
id, alert_id, definition_id, rule_id,
trigger_event_id, matched_topic, matched_pattern,
trigger_data, cel_context, cel_result,
status, delivery_attempts, triggered_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
RETURNING *;
-- name: ListAlertInstances :many
SELECT * FROM alert_instances
WHERE (sqlc.narg('alert_id')::uuid IS NULL OR alert_id = sqlc.narg('alert_id'))
AND (sqlc.narg('status')::int IS NULL OR status = sqlc.narg('status'))
ORDER BY triggered_at DESC
LIMIT sqlc.arg('limit_val') OFFSET sqlc.arg('offset_val');
Create: cloud/sql/queries/alert_deliveries.sql
-- name: CreateAlertDelivery :one
INSERT INTO alert_deliveries (
id, instance_id, channel_id, status,
message_sent, error_message, external_id,
template_result, template_context,
attempted_at, completed_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
RETURNING *;
Update: cloud/alerts/event_processor.go function handleTriggeredRules (lines 229-258)
- Create alert_instance record when rule triggers
- Create alert_delivery record for each action executed
- Store Twilio SID or SendGrid message ID in external_id field
- Update status: PENDING → DELIVERED or FAILED
Regenerate sqlc:
cd cloud && sqlc generate
Recommendation: Use Option A (mission_timeline) for Week 3 simplicity.
Week 3 Constraint: ONE condition per mission, ONE action per mission.
Create: ui/src/routes/missions/rules.create.tsx
Simple form with:
- Single dropdown: Condition type (person_detected, vehicle_detected, confidence_threshold)
- Single dropdown: Action type (send_sms, send_email)
- Text inputs: Phone/email, message template
- No "Add" buttons - enforces one condition, one action
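The message template entered in the form has to be filled with detection data before delivery. One way to do that is Go's text/template, sketched below; the alerts service's actual rendering logic is not shown in this plan, and the field names `Label` and `Confidence` are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// renderMessage fills a user-supplied message template with detection data.
// Unknown placeholders produce an error instead of rendering "<no value>".
func renderMessage(tmpl string, data map[string]any) (string, error) {
	t, err := template.New("msg").Option("missingkey=error").Parse(tmpl)
	if err != nil {
		return "", fmt.Errorf("parse template: %w", err)
	}
	var sb strings.Builder
	if err := t.Execute(&sb, data); err != nil {
		return "", fmt.Errorf("render template: %w", err)
	}
	return sb.String(), nil
}

func main() {
	// Label and Confidence are hypothetical keys for this example.
	data := map[string]any{"Label": "person", "Confidence": 0.95}
	msg, err := renderMessage(`Detected {{.Label}} ({{printf "%.2f" .Confidence}})`, data)
	fmt.Println(msg, err)
}
```

Parsing the template when the rule is created, not only at send time, would surface syntax errors to the user while the form is still open.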
Example condition → CEL mapping:
- Person detected → label == 'person'
- Vehicle detected → label == 'car' || label == 'truck'
- High confidence → facts.confidence > 0.9
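The mapping above could be kept in one server-side function so the UI only sends a condition type. This is a sketch under assumptions: the function name `conditionToCEL` is hypothetical, the condition identifiers are taken from the dropdown values listed earlier, and restricting the threshold to the open interval (0, 1) is a choice made here so the generated literal always contains a decimal point.

```go
package main

import (
	"fmt"
	"strconv"
)

// conditionToCEL maps a UI condition type to a CEL expression string.
// threshold is only used for "confidence_threshold".
func conditionToCEL(condition string, threshold float64) (string, error) {
	switch condition {
	case "person_detected":
		return "label == 'person'", nil
	case "vehicle_detected":
		return "label == 'car' || label == 'truck'", nil
	case "confidence_threshold":
		if threshold <= 0 || threshold >= 1 {
			return "", fmt.Errorf("threshold %v must be between 0 and 1 (exclusive)", threshold)
		}
		return "facts.confidence > " + strconv.FormatFloat(threshold, 'f', -1, 64), nil
	default:
		return "", fmt.Errorf("unknown condition type %q", condition)
	}
}

func main() {
	for _, c := range []string{"person_detected", "vehicle_detected", "confidence_threshold"} {
		expr, err := conditionToCEL(c, 0.9)
		fmt.Println(c, "=>", expr, err)
	}
}
```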
Create: ui/src/routes/alerts/history.tsx
Table showing:
- Triggered timestamp
- Alert name
- Trigger event (what was detected)
- Status badge (PENDING, DELIVERED, FAILED)
- Deliveries count
- Click row → expand to show delivery details
Query alert instances:
const { data } = useQuery({
queryKey: ["alert-instances"],
queryFn: () => alertClient.listAlertInstances({
page: 0,
pageSize: 50,
}),
});
Existing mission timeline: ui/src/routes/missions.detail.$id.tsx likely already shows execution history via StreamMissionTimeline RPC.
Copy CEL initialization from alerts service:
File: cloud/mission_service/cel_engine.go
import (
	"fmt"

	"github.com/google/cel-go/cel"
)

// NewCELEngine declares the variables that rule expressions may reference.
func NewCELEngine() (*cel.Env, error) {
	return cel.NewEnv(
		cel.Variable("event", cel.MapType(cel.StringType, cel.DynType)),
		cel.Variable("label", cel.StringType),
		cel.Variable("facts", cel.MapType(cel.StringType, cel.DynType)),
		cel.Variable("confidence", cel.DoubleType),
		cel.Variable("object_class", cel.StringType),
	)
}

// EvaluateCEL compiles and evaluates expression against vars.
// It compiles on every call; caching compiled programs is a separate step.
func EvaluateCEL(env *cel.Env, expression string, vars map[string]interface{}) (bool, error) {
	ast, issues := env.Compile(expression)
	if issues != nil && issues.Err() != nil {
		return false, issues.Err()
	}
	prg, err := env.Program(ast)
	if err != nil {
		return false, err
	}
	result, _, err := prg.Eval(vars)
	if err != nil {
		return false, err
	}
	// A rule must yield a bool; the checked assertion avoids a panic on e.g. "1 + 1"
	matched, ok := result.Value().(bool)
	if !ok {
		return false, fmt.Errorf("expression %q returned %T, want bool", expression, result.Value())
	}
	return matched, nil
}
File: cloud/mission_service/service.go (in CreateMissionRule function)
Add validation:
func validateWeek3Constraints(req *pb.CreateAlertDefinitionRequest) error {
if len(req.Rules) > 1 {
return status.Error(codes.InvalidArgument,
"Week 3 limitation: only one rule per alert definition")
}
for _, rule := range req.Rules {
if len(rule.Actions) > 1 {
return status.Error(codes.InvalidArgument,
"Week 3 limitation: only one action per rule")
}
}
return nil
}
This prevents power users from bypassing UI constraints via API.
End-to-end test flow:
1. Start all services:
cd cloud && make dev-up
cd detection_bridge && go run main.go
cd object-detector && docker compose up -d
2. Send test image:
cd object-detector
python clients_subscribers/nats_client.py -f test_images/person.jpg
3. Verify detection bridge logs:
Subscribed to: *.*.*.*.objects.detected.annotated_frame
Received AnnotatedImage with 1 person detected
Transformed to TriggerEvent: label=person, confidence=0.95
Published to: us-west.cust123.fac456.cam001.alerts.trigger.person
4. Verify alerts service logs:
Received TriggerEvent on subject: alerts.trigger.person
Matched alert definition: "Person Detection Rule"
CEL rule 'Detect Person' evaluated to: true
Executing SMS action to +15555551234
SMS sent successfully: Twilio SID SM12345678
5. Check database:
SELECT ai.id, ai.triggered_at, ai.status, ad.message_sent, ad.external_id
FROM alert_instances ai
JOIN alert_deliveries ad ON ad.instance_id = ai.id
ORDER BY ai.triggered_at DESC
LIMIT 5;
6. Verify UI:
- Navigate to /alerts/history
- Confirm alert instance appears with "Delivered" status
- Click row to expand and see Twilio SID
Must create in cloud/mission_service/:
- automation_processor.go - CEL evaluation and rule processing (adapted from alerts event_processor.go)
- detection_consumer.go - NATS consumer for DETECTIONS stream
- cel_engine.go - CEL environment setup (copied from alerts)
- sms_delivery.go - Twilio integration (new implementation)
- email_delivery.go - SendGrid integration (new implementation)
- automation_service.go - gRPC service for AutomationPolicy CRUD operations
Must create in ui/:
- ui/src/routes/missions/automation/create.tsx - Create automation rule UI
- ui/src/routes/missions/automation/list.tsx - List automation rules UI
- ui/src/routes/missions/history.tsx - Mission action execution history (may already exist)
Must modify:
- proto/mission/automation.proto - Add cel_expression field and AutomationPolicyService gRPC service
- cloud/mission_service/ent/schema/automation.go - Add optional cel_expression field
- cloud/mission_service/cmd/server/main.go - Initialize NATS, start detection consumer
- cloud/mission_service/service.go - Register AutomationPolicyService gRPC endpoints
- cloud/mission_service/go.mod - Add dependencies: google/cel-go, twilio-go, sendgrid-go, nats.go
Code to reference/adapt from alerts service:
- cloud/alerts/event_processor.go → CEL evaluation patterns and logic structure
- cloud/alerts/alerting.go → NATS consumer setup and message handling
- Alerts CEL environment setup → Variable declarations and evaluation
- Template rendering logic → Message template processing
Mission service advantages over alerts:
- ✅ Conceptually correct: automation belongs with missions
- ✅ AutomationPolicy schema already exists with notification preferences
- ✅ Mission timeline already tracks mission events (extend for rule execution)
- ✅ Mission context: can access mission_id, device_id, facility_id naturally
- ✅ Future-ready: instructions field enables AI-driven automation later
Why this works for Week 3:
- Simple CEL expressions in cel_expression field
- NotificationPreference already has channel configs and cooldown
- AllowedAction enum already includes NOTIFY type
- Mission timeline records RULE_TRIGGERED and ACTION_EXECUTED events
- Clean path to AI when ready (just populate instructions field instead)
Risk: High detection event volume overwhelms system
- Mitigation: Rate limit in detection bridge (max 10 events/sec per camera)
- Use NATS consumer with manual acks for backpressure
Risk: Twilio/SendGrid API failures
- Mitigation: Retry logic with exponential backoff (3 attempts)
- Store error messages in alert_deliveries.error_message field
Risk: Alert fatigue (too many notifications)
- Short-term: Users can disable rules via UI toggle
- Week 4: Add cooldown periods (e.g., max 1 SMS per hour)
Why Mission Service is Correct:
1. Conceptual fit: AutomationPolicy is designed for mission automation
  - Already has mission_id, device_id, facility_id relationships
  - NotificationPreference schema matches Week 3 needs exactly
  - Timeline tracking fits mission execution history
2. Schema flexibility: AutomationPolicy supports both approaches
  - instructions field → Future AI-driven automation
  - cel_expression field → Week 3 simple rules
  - Both can coexist, use what fits the use case
3. Proven patterns from alerts: Copy working code without rewriting
  - CEL engine setup (variable declarations, compilation, caching)
  - NATS consumer patterns (batch fetching, ack handling)
  - Template rendering for SMS/Email messages
4. Natural mission integration:
  - Rules scoped to specific missions (mission_id)
  - Action history in mission timeline (RULE_TRIGGERED, ACTION_EXECUTED)
  - UI naturally fits in missions section
5. Future-ready architecture:
  - Week 3: Simple CEL rules (if person detected, send SMS)
  - Future: Complex AI automation (natural language instructions → action plans)
  - Both use same AutomationPolicy entity, different fields
This is the way: Mission service owns automation, leverages proven alerts code.
Before marking Week 3 complete:
- AutomationPolicy can be created with cel_expression
- Mission service consumes DETECTIONS stream
- CEL rules evaluate correctly (test: label == 'person')
- SMS delivery works (test with real phone number)
- Email delivery works (test with real email)
- NotificationPreference cooldown respected
- Mission timeline shows RULE_TRIGGERED and ACTION_EXECUTED events
- UI shows automation rule creation form (one condition, one action)
- UI shows automation rule list filtered by mission_id
- UI shows execution history in mission timeline
- End-to-end test: image → detection → CEL evaluation → SMS received
- Verify no conflicts with future AI instructions field