@enachb
Last active February 3, 2026 23:45
Design: Synchronous gRPC Pipeline via Dapr Service Invocation for SenseMesh


Summary

Replace the async NATS pub/sub chain after step 4 (detection + tracking + categorization) with synchronous gRPC calls using Dapr Service Invocation. The detection-agent becomes the orchestrator: it invokes downstream agents via gRPC, waits for responses, aggregates all enrichment results into a single AnnotatedImage, and publishes one final message per frame to NATS for persistence.

Steps 1-4 remain unchanged (NATS ingestion -> YOLO -> ByteTrack -> categorize). NATS is also retained at the output edge for persistence fan-out.


Current Architecture (Async NATS Pub/Sub)

Each agent subscribes to a NATS topic, processes the message, and publishes to the next topic. No agent knows when the full pipeline is complete for a given frame.

flowchart TD
    NATS_IN["NATS: Image.diffed.frame"] --> DET["Detection Agent<br/>(YOLO + ByteTrack)"]

    DET -->|"js.publish(person.detected)"| P_TOPIC["NATS: person.detected"]
    DET -->|"js.publish(vehicle.detected)"| V_TOPIC["NATS: vehicle.detected"]
    DET -->|"js.publish(objects.detected)"| O_TOPIC["NATS: objects.detected"]

    P_TOPIC --> REC["Recognition Agent"]
    P_TOPIC --> WPN["Weapon Detection Agent"]

    REC -->|"js.publish(person.recognized)"| PR_TOPIC["NATS: person.recognized"]
    WPN -->|"js.publish(weapon.detected)"| WD_TOPIC["NATS: weapon.detected"]

    PR_TOPIC --> PROF["Person Profiling Agent<br/>(LLM)"]
    WD_TOPIC --> WINT["Weapon Intent Agent<br/>(LLM)"]

    PROF -->|"js.publish(person.profiled)"| PP_TOPIC["NATS: person.profiled"]
    WINT -->|"js.publish(weapon.classified)"| WC_TOPIC["NATS: weapon.classified"]

    PP_TOPIC --> POSE["Pose Agent"]
    POSE -->|"js.publish(person.posed)"| POSED_TOPIC["NATS: person.posed"]

    V_TOPIC --> ALPR["ALPR Agent"]
    ALPR -->|"js.publish(vehicle.recognized)"| VR_TOPIC["NATS: vehicle.recognized"]
    VR_TOPIC --> VPROF["Vehicle Profiling Agent<br/>(LLM)"]
    VPROF -->|"js.publish(vehicle.profiled)"| VP_TOPIC["NATS: vehicle.profiled"]

    O_TOPIC --> PERSIST_O["Persistence"]
    POSED_TOPIC --> PERSIST_P["Persistence"]
    WC_TOPIC --> PERSIST_W["Persistence"]
    VP_TOPIC --> PERSIST_V["Persistence"]

    style NATS_IN fill:#2d6a4f,color:#fff
    style O_TOPIC fill:#264653,color:#fff
    style PERSIST_O fill:#6c757d,color:#fff
    style PERSIST_P fill:#6c757d,color:#fff
    style PERSIST_W fill:#6c757d,color:#fff
    style PERSIST_V fill:#6c757d,color:#fff

Problems with current approach:

  • Persistence subscribes to 10+ separate NATS topics, receiving partial data at different times
  • No way to know when all enrichment for a frame is "done"
  • Silent message drops (no error propagation back to detection-agent)
  • Difficult to correlate/merge person, vehicle, weapon, and object data per frame

Proposed Architecture (Synchronous gRPC via Dapr)

The detection-agent orchestrates all downstream processing synchronously. Each agent exposes a gRPC (or HTTP) endpoint invoked via Dapr Service Invocation. Results flow back to the caller.

flowchart TD
    NATS_IN["NATS: Image.diffed.frame"] --> DET["Detection Agent<br/>(YOLO + ByteTrack)<br/>== ORCHESTRATOR =="]

    DET -->|"gRPC invoke<br/>(parallel)"| REC["Recognition Agent"]
    DET -->|"gRPC invoke<br/>(parallel)"| WPN["Weapon Detection Agent"]
    DET -->|"gRPC invoke<br/>(parallel)"| ALPR["ALPR Agent"]

    REC -->|"return enriched persons"| DET
    WPN -->|"return weapons"| DET

    REC -.->|"gRPC invoke"| PROF["Person Profiling<br/>(LLM)"]
    PROF -.->|"return profiles"| REC
    REC -.->|"gRPC invoke"| POSE["Pose Agent"]
    POSE -.->|"return poses"| REC

    WPN -.->|"gRPC invoke"| WINT["Weapon Intent<br/>(LLM)"]
    WINT -.->|"return classifications"| WPN

    ALPR -->|"return plates"| DET
    ALPR -.->|"gRPC invoke"| VPROF["Vehicle Profiling<br/>(LLM)"]
    VPROF -.->|"return profiles"| ALPR

    DET -->|"js.publish<br/>(single aggregated message)"| FINAL["NATS: frame.enriched.annotated_frame"]
    FINAL --> PERSIST["Persistence Service"]

    style NATS_IN fill:#2d6a4f,color:#fff
    style DET fill:#e76f51,color:#fff
    style FINAL fill:#264653,color:#fff
    style PERSIST fill:#6c757d,color:#fff

Key difference: Detection-agent fans out 3 parallel gRPC calls, each chain completes synchronously, and detection-agent aggregates all results before publishing one NATS message to persistence.


Detailed Flow

Step-by-step for a single frame

  1. Detection-agent receives Image from NATS (unchanged)

  2. Runs YOLO + ByteTrack, categorizes into persons/vehicles/objects (unchanged)

  3. Fan-out via asyncio.gather() - three parallel Dapr service invocations:

    | Branch | Chain | Returns |
    | --- | --- | --- |
    | Person | detection-agent -> recognition-agent -> person-profiling -> pose-agent | List[Person] with face, profile, pose populated |
    | Weapon | detection-agent -> weapon-detection -> weapon-intent | List[Weapon] with alert + description |
    | Vehicle | detection-agent -> alpr-agent -> vehicle-profiling | List[Vehicle] with plates + VehicleTrack |
  4. Detection-agent merges all responses into a single AnnotatedImage:

    • persons (with face, profile, pose)
    • vehicles (with plates)
    • weapons (with intent)
    • objects (unchanged, no downstream enrichment)
  5. Publishes one message to frame.enriched.annotated_frame

  6. Persistence subscribes to one topic instead of 10+
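
The fan-out/merge shape of steps 3-4 can be sketched with plain asyncio. The stub coroutines below stand in for the real Dapr invocations, and the `enrich_*` names and fake enrichment values are illustrative only:

```python
import asyncio

# Stand-ins for the three Dapr service invocations; in the real
# orchestrator each of these is a dapr.invoke_method() round trip.
async def enrich_persons(persons):
    await asyncio.sleep(0.01)  # simulated network + inference latency
    return [p | {"face": "known", "pose": "standing"} for p in persons]

async def enrich_weapons(persons):
    await asyncio.sleep(0.01)
    return [{"kind": "rifle", "alert": True}] if persons else []

async def enrich_vehicles(vehicles):
    await asyncio.sleep(0.01)
    return [v | {"plate": "ABC123"} for v in vehicles]

async def process_frame(persons, vehicles, objects):
    # Fan out the three chains in parallel, then merge into one record,
    # mirroring steps 3 and 4 of the flow above.
    p, w, v = await asyncio.gather(
        enrich_persons(persons),
        enrich_weapons(persons),  # weapons branch also needs person context
        enrich_vehicles(vehicles),
    )
    return {"persons": p, "weapons": w, "vehicles": v, "objects": objects}

frame = asyncio.run(process_frame(
    persons=[{"track_id": 1}], vehicles=[{"track_id": 7}], objects=["chair"]))
print(frame["vehicles"])  # [{'track_id': 7, 'plate': 'ABC123'}]
```

The total latency of the gather is the slowest branch, not the sum of the three, which is what makes the synchronous design viable per-frame.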

Vehicle Profiling Exception

The vehicle-profiling agent currently collects multiple frames per track before making an LLM call (5s timeout, up to 30 frames). This stateful, multi-frame behavior doesn't fit a pure request-response model.

Options:

  • Option A: Keep vehicle profiling as async NATS (hybrid). ALPR returns synchronously, vehicle profiling stays as a background NATS consumer that publishes VehicleTrack separately.
  • Option B: Move frame collection into the detection-agent or a dedicated buffer. Invoke vehicle-profiling synchronously once enough frames are collected.
  • Option C (selected): ALPR returns synchronously within the gRPC chain. Vehicle profiling remains async via NATS since it needs temporal data across frames. Persistence gets the per-frame AnnotatedImage immediately (with plate data) and VehicleTrack later when the track finalizes. Vehicle profiling runs as a single instance using Python asyncio for concurrent frame reception and LLM calls. See Appendix A for the full analysis of scaling approaches (including why Dapr Actors are not suitable) and the single-instance design.

Changes Required

1. Proto Definitions

Add gRPC service definitions to proto/api/api.proto. The message types already exist.

// New service definitions for synchronous agent invocation
service PersonEnrichmentService {
    rpc EnrichPersons (AnnotatedImage) returns (AnnotatedImage) {}
}

service WeaponEnrichmentService {
    rpc DetectWeapons (AnnotatedImage) returns (AnnotatedImage) {}
}

service VehicleEnrichmentService {
    rpc EnrichVehicles (AnnotatedImage) returns (AnnotatedImage) {}
}

// Individual agent services (called within chains)
service RecognitionService {
    rpc Recognize (AnnotatedImage) returns (AnnotatedImage) {}
}

service PoseService {
    rpc DetectPose (AnnotatedImage) returns (AnnotatedImage) {}
}

service PersonProfilingService {
    rpc Profile (AnnotatedImage) returns (AnnotatedImage) {}
}

service WeaponDetectionService {
    rpc Detect (AnnotatedImage) returns (AnnotatedImage) {}
}

service WeaponIntentService {
    rpc Classify (AnnotatedImage) returns (AnnotatedImage) {}
}

service ALPRService {
    rpc ReadPlates (AnnotatedImage) returns (AnnotatedImage) {}
}

Note: Using AnnotatedImage as both request and response keeps it simple - each agent enriches the same message and passes it along. Alternatively, define specific request/response types per agent for tighter contracts.

2. Agent Changes

Each agent needs to change from a NATS subscriber loop to a gRPC server (or HTTP server invoked by Dapr).

| Agent | Current | New |
| --- | --- | --- |
| detection-agent | NATS sub -> process -> js.publish() x3 | NATS sub -> process -> dapr.invoke() x3 parallel -> merge -> js.publish() x1 |
| recognition-agent | NATS sub loop, js.publish() | gRPC server: Recognize(AnnotatedImage) -> AnnotatedImage. Calls person-profiling and pose via Dapr internally, or detection-agent chains them. |
| pose-agent | NATS sub loop, js.publish() | gRPC server: DetectPose(AnnotatedImage) -> AnnotatedImage |
| person-profiling | NATS sub loop, js.publish() | gRPC server: Profile(AnnotatedImage) -> AnnotatedImage |
| weapon-detection | NATS sub loop, js.publish() | gRPC server: Detect(AnnotatedImage) -> AnnotatedImage. Calls weapon-intent via Dapr internally. |
| weapon-intent | NATS sub loop, js.publish() | gRPC server: Classify(AnnotatedImage) -> AnnotatedImage |
| alpr-agent | NATS sub loop, js.publish() | gRPC server: ReadPlates(AnnotatedImage) -> AnnotatedImage |
| vehicle-profiling | NATS sub loop (stateful, multi-frame) | Keep as NATS (see Vehicle Profiling Exception above) |

Per-agent implementation pattern (Python):

# Before (NATS subscriber)
async def process_message(js, msg):
    annotated_msg = parse(msg.data)
    result = do_inference(annotated_msg)
    await js.publish("next.topic", result.SerializeToString())

# After (gRPC server via grpcio or Dapr HTTP)
class RecognitionServicer(api_pb2_grpc.RecognitionServiceServicer):
    def Recognize(self, request, context):
        # request is AnnotatedImage
        enriched = do_inference(request)
        return enriched  # AnnotatedImage with face data populated

3. Detection-Agent Orchestration Logic

Replace the three js.publish() calls with parallel gRPC invocations:

# Pseudocode for detection-agent after step 4
import asyncio
from dapr.aio.clients import DaprClient  # async client, so the calls can be gathered

import api_pb2  # generated from proto/api/api.proto

async def orchestrate(js, person_data, vehicle_data, object_data):
    async with DaprClient() as dapr:
        # Fan out the three chains in parallel; each call resolves to an
        # InvokeMethodResponse carrying a serialized AnnotatedImage.
        person_resp, weapon_resp, vehicle_resp = await asyncio.gather(
            dapr.invoke_method(
                app_id="recognition-agent",
                method_name="Recognize",
                data=person_data.SerializeToString(),
                content_type="application/protobuf",
            ),
            dapr.invoke_method(
                app_id="weapon-detection-agent",
                method_name="Detect",
                data=person_data.SerializeToString(),  # weapons need person context
                content_type="application/protobuf",
            ),
            dapr.invoke_method(
                app_id="alpr-agent",
                method_name="ReadPlates",
                data=vehicle_data.SerializeToString(),
                content_type="application/protobuf",
            ),
        )

    # Deserialize each response back into a message
    person_result = api_pb2.AnnotatedImage.FromString(person_resp.data)
    weapon_result = api_pb2.AnnotatedImage.FromString(weapon_resp.data)
    vehicle_result = api_pb2.AnnotatedImage.FromString(vehicle_resp.data)

    # Merge into a single AnnotatedImage
    final = api_pb2.AnnotatedImage()
    final.persons.extend(person_result.persons)
    final.weapons.extend(weapon_result.weapons)
    final.vehicles.extend(vehicle_result.vehicles)
    final.objects.extend(object_data.objects)
    # ... copy frame_id, device_id, data, timestamps

    # Single publish to persistence
    await js.publish("frame.enriched.annotated_frame", final.SerializeToString())

4. Dapr Sidecar Configuration

Each agent needs a Dapr sidecar with an app-id. The existing docker-compose-dapr.yml already has sidecars for detection, recognition, and pose agents. Extend it for the new agents.

New sidecars needed:

  • weapon-detection-agent-dapr (app-id: weapon-detection-agent)
  • weapon-intent-agent-dapr (app-id: weapon-intent-agent)
  • person-profiling-agent-dapr (app-id: person-profiling-agent)
  • alpr-agent-dapr (app-id: alpr-agent)
  • vehicle-profiling-agent-dapr (app-id: vehicle-profiling-agent) - only if converting to gRPC

Each agent container must expose an app-port for the Dapr sidecar to forward requests to:

weapon-detection-agent-dapr:
    image: daprio/daprd:latest
    command: [
      "./daprd",
      "-app-id", "weapon-detection-agent",
      "-app-port", "50051",          # gRPC port on the agent
      "-app-protocol", "grpc",       # tell Dapr the app speaks gRPC
      "-placement-host-address", "placement:50006",
      "-dapr-http-port", "3500",
      "-dapr-grpc-port", "50001",
      "-components-path", "/components",
      "-config", "/dapr/config.yaml"
    ]
    network_mode: "service:weapon-detection-agent"

5. NATS Topic Simplification

Remove (no longer needed for inter-agent communication):

  • person.detected.annotated_frame
  • person.recognized.annotated_frame
  • person.profiled.annotated_frame
  • person.posed.annotated_frame
  • weapon.detected.annotated_frame
  • weapon.classified.annotated_frame
  • vehicle.detected.annotated_frame
  • vehicle.recognized.annotated_frame

Keep:

  • Image.diffed.frame (ingestion input)
  • frame.enriched.annotated_frame (single output to persistence)
  • objects.detected.annotated_frame (if objects branch has no enrichment, can merge into the single output)
  • vehicle.profiled.vehicle_track (async, multi-frame - see exception above)

Persistence drops from subscribing to 10+ topics to 2-3.

6. Resiliency

Add a Dapr resiliency configuration to handle slow agents (especially LLM calls):

# .devcontainer/dapr/resiliency.yaml
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: agent-resiliency
spec:
  policies:
    timeouts:
      agentTimeout: 30s        # per-agent call timeout
      llmAgentTimeout: 60s     # LLM agents get more time
    retries:
      agentRetry:
        policy: constant
        maxRetries: 2
        duration: 1s
    circuitBreakers:
      agentCB:
        maxRequests: 1
        interval: 30s
        timeout: 60s
        trip: consecutiveFailures > 3
  targets:
    apps:
      recognition-agent:
        timeout: agentTimeout
        retry: agentRetry
        circuitBreaker: agentCB
      person-profiling-agent:
        timeout: llmAgentTimeout
        retry: agentRetry
        circuitBreaker: agentCB
      weapon-intent-agent:
        timeout: llmAgentTimeout
        retry: agentRetry
        circuitBreaker: agentCB
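
These policies run in the Dapr sidecar. The orchestrator can add one more application-level guard so that a single slow or failing branch degrades the frame instead of losing it entirely. A minimal sketch with asyncio (the branch names, fallbacks, and timeout are illustrative, not part of the design above):

```python
import asyncio

async def guarded(coro, fallback, timeout_s: float):
    """Return the branch result, or a fallback if it times out or errors."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except Exception:
        # Timeout, circuit-breaker rejection, or agent error: keep the frame,
        # just without this branch's enrichment.
        return fallback

async def demo():
    async def slow_branch():          # e.g. an LLM agent that blew its budget
        await asyncio.sleep(10)
        return ["enriched"]
    async def fast_branch():          # e.g. ALPR returning promptly
        return ["plates"]
    persons, vehicles = await asyncio.gather(
        guarded(slow_branch(), fallback=[], timeout_s=0.05),
        guarded(fast_branch(), fallback=[], timeout_s=0.05),
    )
    return persons, vehicles

print(asyncio.run(demo()))  # ([], ['plates'])
```

Whether a partially enriched frame is acceptable (versus retrying the whole frame) is a product decision; the sketch only shows the mechanism.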

7. Dependencies

Add to each Python agent's requirements.txt:

grpcio>=1.60.0
grpcio-tools>=1.60.0
dapr>=1.12.0
dapr-ext-grpc>=1.12.0

Migration Strategy

  1. Phase 1: Add gRPC server endpoints to each agent alongside existing NATS subscribers. Both paths work simultaneously.
  2. Phase 2: Update detection-agent to use Dapr service invocation. Disable NATS publishing for inter-agent topics.
  3. Phase 3: Remove NATS subscriber code from downstream agents. Clean up unused NATS streams.
  4. Phase 4: Update persistence to subscribe to the single frame.enriched.annotated_frame topic.
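
During Phases 1-2, the cut-over in the detection-agent can be guarded by a simple flag so both paths coexist and rollback is a config change. A minimal sketch, where the `PIPELINE_MODE` variable name and the handler wiring are hypothetical:

```python
# Hypothetical feature flag: "grpc" routes through Dapr service invocation,
# anything else keeps the legacy NATS fan-out.
def pick_publish_path(env: dict) -> str:
    return "grpc" if env.get("PIPELINE_MODE", "nats") == "grpc" else "nats"

def dispatch(frame, env, *, via_grpc, via_nats):
    # Both code paths stay wired up during the migration.
    if pick_publish_path(env) == "grpc":
        return via_grpc(frame)   # dapr.invoke_method(...) x3 + merge + publish x1
    return via_nats(frame)       # legacy js.publish(...) x3

# Example wiring with stub handlers:
result = dispatch(
    {"frame_id": 42},
    {"PIPELINE_MODE": "grpc"},
    via_grpc=lambda f: ("grpc", f["frame_id"]),
    via_nats=lambda f: ("nats", f["frame_id"]),
)
print(result)  # ('grpc', 42)
```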

Trade-offs

| Aspect | NATS Pub/Sub (Current) | gRPC via Dapr (Proposed) |
| --- | --- | --- |
| Coupling | Loose - agents are independent | Tighter - detection-agent depends on all downstream agents |
| Error handling | Silent drops, no backpressure | Immediate errors, timeouts, circuit breakers |
| Frame completeness | Persistence gets partial data at different times | Persistence gets one complete message per frame |
| Latency | Lower per-hop (fire-and-forget) | Higher per-frame (waits for all chains) |
| Scalability | Easy to scale consumers independently | Still scales independently, but orchestrator is bottleneck |
| Observability | Must correlate across 10+ topics | Single request trace through Zipkin via Dapr |
| Complexity | Simple per-agent, complex at system level | Complex orchestrator, simple persistence |

Appendix A: Vehicle Profiling Scaling — Why Single Instance Works

The Problem

Vehicle profiling is stateful: it collects frames for the same track_id over a 5-second window (up to 25-30 frames at 5 FPS) before making a single LLM call to classify make/model/color/direction. When scaling horizontally with NATS queue groups, frames for the same track get round-robin distributed across instances, splitting the frame buffer and producing incomplete or duplicate LLM calls.
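
The splitting effect is easy to see in miniature: round-robin delivery, which is how a NATS queue group balances a subject, scatters one track's frames across workers (worker names here are illustrative):

```python
from collections import defaultdict
from itertools import cycle

# 25 frames for a single track, delivered round-robin to 3 queue-group workers
frames = [("track-7", seq) for seq in range(25)]
workers = cycle(["worker-0", "worker-1", "worker-2"])

buffers = defaultdict(list)
for frame in frames:
    buffers[next(workers)].append(frame)

# No worker ever sees the full 25-frame window, so each would either
# fire an LLM call on a partial buffer or wait forever for missing frames.
sizes = {w: len(b) for w, b in sorted(buffers.items())}
print(sizes)  # {'worker-0': 9, 'worker-1': 8, 'worker-2': 8}
```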

Approaches Evaluated

| Approach | Verdict | Why |
| --- | --- | --- |
| Single instance | Recommended (Phase 1) | Simple, correct, sufficient throughput |
| NATS subject partitioning | Future scale-out | Consistent hashing on track_id pins all frames to the same consumer |
| Dapr Virtual Actors | Not suitable | Turn-based locking blocks during LLM calls |
| Redis shared state | Viable but complex | Shared frame buffer in Redis, any instance can finalize |

Why Dapr Actors Don't Work Here

Dapr Virtual Actors use a turn-based, single-threaded execution model — only one message (method call, timer, or reminder) can execute at a time per actor instance. This is fundamentally incompatible with vehicle profiling:

  1. Turn-based locking blocks frame ingestion during LLM calls. When the actor triggers a Gemini/GPT call (30-60s), the actor is locked. All incoming AddFrame() calls queue up and time out. New vehicle detections for that track are silently dropped.

  2. The "fire-and-forget reminder" workaround doesn't help. Setting dueTime=0s on a reminder to offload the LLM call still executes the reminder callback as a turn, which locks the actor for the entire duration of the LLM inference.

  3. Split-brain under pod eviction. Dapr actor placement uses a hashing ring. During Kubernetes pod evictions or rolling updates, there's a window where two instances believe they own the same actor. This causes duplicate LLM calls and corrupted state. (See dapr/dapr#4451)

  4. Dapr Workflows have the same limitation. Workflows are built on top of actors and inherit the turn-based model. Additionally, storing 50MB of JPEG buffers per track in workflow state is impractical.

  5. Semantic Kernel team hit the same wall. The SK team found Dapr Actors unsuitable for orchestrating long-running LLM operations due to the reentrancy and locking constraints. (See microsoft/semantic-kernel#9953)

Why Single Instance Is Sufficient

A single vehicle-profiling instance using Python asyncio can handle the expected load:

Throughput model:
- 10 cameras × 5 concurrent vehicle tracks = 50 active tracks
- Each track: 5 FPS × 5s = 25 frames → 1 LLM call
- LLM call latency: ~2-5s (Gemini Flash)
- At steady state: ~10 LLM calls/sec (50 tracks finalizing over 5s)
- asyncio handles this trivially — frame reception and LLM calls are all async I/O

The key insight: the LLM API is the throughput bottleneck, not the instance count. Adding more instances doesn't increase LLM throughput — it just spreads the same API rate limit across more processes. A single asyncio event loop can:

  • Receive frames from NATS concurrently (non-blocking)
  • Maintain per-track frame buffers in a dict[bytes, list[bytes]]
  • Fire off LLM calls as asyncio.Task when a track's 5s window expires
  • Continue receiving frames for other tracks while LLM calls are in flight
# Simplified single-instance vehicle profiler
import asyncio
import time

class TrackBuffer:
    """Per-track frame buffer: ready after a 5s window or 30 frames."""
    def __init__(self, window_s: float = 5.0, max_frames: int = 30):
        self.frames: list = []
        self.started = time.monotonic()
        self.window_s = window_s
        self.max_frames = max_frames

    def add_frame(self, msg):
        self.frames.append(msg)

    def is_ready(self) -> bool:
        return (time.monotonic() - self.started >= self.window_s
                or len(self.frames) >= self.max_frames)

class VehicleProfiler:
    def __init__(self):
        self.tracks: dict[bytes, TrackBuffer] = {}

    async def on_frame(self, msg):
        """NATS callback — non-blocking, runs concurrently."""
        track_id = msg.vehicle.track_id
        buf = self.tracks.setdefault(track_id, TrackBuffer())
        buf.add_frame(msg)
        if buf.is_ready():  # 5s elapsed or 30 frames
            asyncio.create_task(self.finalize_track(track_id))

    async def finalize_track(self, track_id):
        """Runs as a separate task — doesn't block frame reception."""
        buf = self.tracks.pop(track_id)
        profile = await self.llm_client.classify(buf.frames)  # async HTTP call
        vehicle_track = build_vehicle_track(buf, profile)
        await self.js.publish("vehicle.profiled.vehicle_track",
                              vehicle_track.SerializeToString())

Memory Budget

| Scenario | Tracks | Frames/Track | Frame Size | Total |
| --- | --- | --- | --- | --- |
| 10 cameras, 5 vehicles each | 50 | 25 (5fps × 5s) | ~2MB (4K JPEG) | 2.5 GB |
| 10 cameras, 2 vehicles each | 20 | 25 | ~2MB | 1.0 GB |
| 5 cameras, 3 vehicles each | 15 | 25 | ~500KB (1080p JPEG) | 187 MB |

At 4K resolution, budget 4-8 GB for the vehicle-profiling container. At 1080p, 1-2 GB is sufficient.
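
The table rows are a straightforward product of tracks, frames per track, and frame size; a quick check of the worst-case row:

```python
def buffer_budget_bytes(tracks: int, frames_per_track: int, frame_bytes: int) -> int:
    """Peak frame-buffer memory if every active track is full at once."""
    return tracks * frames_per_track * frame_bytes

MB = 1024 * 1024
# 10 cameras x 5 vehicles = 50 tracks, 25 frames each, ~2MB per 4K JPEG
worst = buffer_budget_bytes(tracks=50, frames_per_track=25, frame_bytes=2 * MB)
print(worst / (1024 ** 3))  # 2.44140625 GiB, i.e. the "2.5 GB" row
```

Note this is the peak if every buffer fills simultaneously; in steady state tracks finalize and free their frames continuously, so typical usage sits well below the budget.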

Future Scale-Out: NATS Subject Partitioning

When a single instance is no longer sufficient (e.g., 50+ cameras), use NATS subject-based partitioning:

# Instead of one subject:
vehicle.detected.annotated_frame

# Partition by track_id hash:
vehicle.detected.partition.0
vehicle.detected.partition.1
vehicle.detected.partition.2
...

# Detection-agent publishes to:
partition = hash(track_id) % NUM_PARTITIONS
await js.publish(f"vehicle.detected.partition.{partition}", data)

Each vehicle-profiling instance subscribes to a specific partition. All frames for a given track_id always hash to the same partition, ensuring correct frame collection without shared state. This is the same pattern used by Kafka consumer groups and NATS Key-Value partitioning.
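
One caveat when implementing this in Python: the built-in `hash()` is salted per process (`PYTHONHASHSEED`), so the publisher and any other component computing partitions would disagree across restarts. A stable hash such as CRC32 keeps the mapping deterministic; a sketch (the partition count is illustrative):

```python
import zlib

NUM_PARTITIONS = 4

def partition_for(track_id: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    # zlib.crc32 is deterministic across processes and platforms,
    # unlike Python's salted built-in hash() on bytes/str.
    return zlib.crc32(track_id) % num_partitions

# Every frame for the same track lands on the same partition subject:
subject = f"vehicle.detected.partition.{partition_for(b'track-7')}"
print(subject)
```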
