Replace the async NATS pub/sub chain after step 4 (detection + tracking + categorization) with synchronous gRPC calls using Dapr Service Invocation. The detection-agent becomes the orchestrator: it invokes downstream agents via gRPC, waits for responses, aggregates all enrichment results into a single AnnotatedImage, and publishes one final message per frame to NATS for persistence.
Steps 1-4 remain unchanged (NATS ingestion -> YOLO -> ByteTrack -> categorize). NATS is also retained at the output edge for persistence fan-out.
Each agent subscribes to a NATS topic, processes the message, and publishes to the next topic. No agent knows when the full pipeline is complete for a given frame.
```mermaid
flowchart TD
    NATS_IN["NATS: Image.diffed.frame"] --> DET["Detection Agent<br/>(YOLO + ByteTrack)"]
    DET -->|"js.publish(person.detected)"| P_TOPIC["NATS: person.detected"]
    DET -->|"js.publish(vehicle.detected)"| V_TOPIC["NATS: vehicle.detected"]
    DET -->|"js.publish(objects.detected)"| O_TOPIC["NATS: objects.detected"]
    P_TOPIC --> REC["Recognition Agent"]
    P_TOPIC --> WPN["Weapon Detection Agent"]
    REC -->|"js.publish(person.recognized)"| PR_TOPIC["NATS: person.recognized"]
    WPN -->|"js.publish(weapon.detected)"| WD_TOPIC["NATS: weapon.detected"]
    PR_TOPIC --> PROF["Person Profiling Agent<br/>(LLM)"]
    WD_TOPIC --> WINT["Weapon Intent Agent<br/>(LLM)"]
    PROF -->|"js.publish(person.profiled)"| PP_TOPIC["NATS: person.profiled"]
    WINT -->|"js.publish(weapon.classified)"| WC_TOPIC["NATS: weapon.classified"]
    PP_TOPIC --> POSE["Pose Agent"]
    POSE -->|"js.publish(person.posed)"| POSED_TOPIC["NATS: person.posed"]
    V_TOPIC --> ALPR["ALPR Agent"]
    ALPR -->|"js.publish(vehicle.recognized)"| VR_TOPIC["NATS: vehicle.recognized"]
    VR_TOPIC --> VPROF["Vehicle Profiling Agent<br/>(LLM)"]
    VPROF -->|"js.publish(vehicle.profiled)"| VP_TOPIC["NATS: vehicle.profiled"]
    O_TOPIC --> PERSIST_O["Persistence"]
    POSED_TOPIC --> PERSIST_P["Persistence"]
    WC_TOPIC --> PERSIST_W["Persistence"]
    VP_TOPIC --> PERSIST_V["Persistence"]
    style NATS_IN fill:#2d6a4f,color:#fff
    style O_TOPIC fill:#264653,color:#fff
    style PERSIST_O fill:#6c757d,color:#fff
    style PERSIST_P fill:#6c757d,color:#fff
    style PERSIST_W fill:#6c757d,color:#fff
    style PERSIST_V fill:#6c757d,color:#fff
```
Problems with current approach:
- Persistence subscribes to 10+ separate NATS topics, receiving partial data at different times
- No way to know when all enrichment for a frame is "done"
- Silent message drops (no error propagation back to detection-agent)
- Difficult to correlate/merge person, vehicle, weapon, and object data per frame
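The last two problems compound each other. A toy illustration (all names hypothetical, plain dicts standing in for protobuf messages) of what persistence has to do today: accumulate partial enrichment per `frame_id` with no signal that a frame is complete.

```python
# Hypothetical sketch: partial enrichment arrives per topic at different
# times, keyed only by frame_id, with no completion signal.
from collections import defaultdict

class PartialFrameStore:
    def __init__(self):
        # frame_id -> {topic: payload}
        self.frames = defaultdict(dict)

    def on_message(self, topic: str, frame_id: str, payload: dict) -> None:
        self.frames[frame_id][topic] = payload
        # Nothing here can tell us whether more enrichment is still in
        # flight, so every write may be premature or redundant.

store = PartialFrameStore()
store.on_message("person.posed.annotated_frame", "f1", {"persons": 2})
store.on_message("weapon.classified.annotated_frame", "f1", {"weapons": 0})
# Two partial views of frame "f1"; the vehicle branch may or may not follow.
```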
The detection-agent orchestrates all downstream processing synchronously. Each agent exposes a gRPC (or HTTP) endpoint invoked via Dapr Service Invocation. Results flow back to the caller.
```mermaid
flowchart TD
    NATS_IN["NATS: Image.diffed.frame"] --> DET["Detection Agent<br/>(YOLO + ByteTrack)<br/>== ORCHESTRATOR =="]
    DET -->|"gRPC invoke<br/>(parallel)"| REC["Recognition Agent"]
    DET -->|"gRPC invoke<br/>(parallel)"| WPN["Weapon Detection Agent"]
    DET -->|"gRPC invoke<br/>(parallel)"| ALPR["ALPR Agent"]
    REC -->|"return enriched persons"| DET
    WPN -->|"return weapons"| DET
    REC -.->|"gRPC invoke"| PROF["Person Profiling<br/>(LLM)"]
    PROF -.->|"return profiles"| REC
    REC -.->|"gRPC invoke"| POSE["Pose Agent"]
    POSE -.->|"return poses"| REC
    WPN -.->|"gRPC invoke"| WINT["Weapon Intent<br/>(LLM)"]
    WINT -.->|"return classifications"| WPN
    ALPR -->|"return plates"| DET
    ALPR -.->|"gRPC invoke"| VPROF["Vehicle Profiling<br/>(LLM)"]
    VPROF -.->|"return profiles"| ALPR
    DET -->|"js.publish<br/>(single aggregated message)"| FINAL["NATS: frame.enriched.annotated_frame"]
    FINAL --> PERSIST["Persistence Service"]
    style NATS_IN fill:#2d6a4f,color:#fff
    style DET fill:#e76f51,color:#fff
    style FINAL fill:#264653,color:#fff
    style PERSIST fill:#6c757d,color:#fff
```
Key difference: Detection-agent fans out 3 parallel gRPC calls, each chain completes synchronously, and detection-agent aggregates all results before publishing one NATS message to persistence.
1. Detection-agent receives `Image` from NATS (unchanged)
2. Runs YOLO + ByteTrack, categorizes into persons/vehicles/objects (unchanged)
3. Fan-out via `asyncio.gather()`: three parallel Dapr service invocations:

| Branch | Chain | Returns |
|---|---|---|
| Person | detection-agent -> recognition-agent -> person-profiling -> pose-agent | `List[Person]` with face, profile, pose populated |
| Weapon | detection-agent -> weapon-detection -> weapon-intent | `List[Weapon]` with alert + description |
| Vehicle | detection-agent -> alpr-agent -> vehicle-profiling | `List[Vehicle]` with plates + `VehicleTrack` |

4. Detection-agent merges all responses into a single `AnnotatedImage`:
   - `persons` (with face, profile, pose)
   - `vehicles` (with plates)
   - `weapons` (with intent)
   - `objects` (unchanged, no downstream enrichment)
5. Publishes one message to `frame.enriched.annotated_frame`
6. Persistence subscribes to one topic instead of 10+
The vehicle-profiling agent currently collects multiple frames per track before making an LLM call (5s timeout, up to 30 frames). This stateful, multi-frame behavior doesn't fit a pure request-response model.
Options:
- Option A: Keep vehicle profiling as async NATS (hybrid). ALPR returns synchronously; vehicle profiling stays a background NATS consumer that publishes `VehicleTrack` separately.
- Option B: Move frame collection into the detection-agent or a dedicated buffer. Invoke vehicle-profiling synchronously once enough frames are collected.
- Option C (selected): ALPR returns synchronously within the gRPC chain. Vehicle profiling remains async via NATS since it needs temporal data across frames. Persistence gets the per-frame `AnnotatedImage` immediately (with plate data) and the `VehicleTrack` later, when the track finalizes. Vehicle profiling runs as a single instance using Python asyncio for concurrent frame reception and LLM calls. See Appendix A for the full analysis of scaling approaches (including why Dapr Actors are not suitable) and the single-instance design.
Add gRPC service definitions to `proto/api/api.proto`. The message types already exist.
```proto
// New service definitions for synchronous agent invocation
service PersonEnrichmentService {
  rpc EnrichPersons (AnnotatedImage) returns (AnnotatedImage) {}
}

service WeaponEnrichmentService {
  rpc DetectWeapons (AnnotatedImage) returns (AnnotatedImage) {}
}

service VehicleEnrichmentService {
  rpc EnrichVehicles (AnnotatedImage) returns (AnnotatedImage) {}
}

// Individual agent services (called within chains)
service RecognitionService {
  rpc Recognize (AnnotatedImage) returns (AnnotatedImage) {}
}

service PoseService {
  rpc DetectPose (AnnotatedImage) returns (AnnotatedImage) {}
}

service PersonProfilingService {
  rpc Profile (AnnotatedImage) returns (AnnotatedImage) {}
}

service WeaponDetectionService {
  rpc Detect (AnnotatedImage) returns (AnnotatedImage) {}
}

service WeaponIntentService {
  rpc Classify (AnnotatedImage) returns (AnnotatedImage) {}
}

service ALPRService {
  rpc ReadPlates (AnnotatedImage) returns (AnnotatedImage) {}
}
```

Note: using `AnnotatedImage` as both request and response keeps it simple: each agent enriches the same message and passes it along. Alternatively, define specific request/response types per agent for tighter contracts.
Each agent needs to change from a NATS subscriber loop to a gRPC server (or HTTP server invoked by Dapr).
| Agent | Current | New |
|---|---|---|
| detection-agent | NATS sub -> process -> `js.publish()` x3 | NATS sub -> process -> `dapr.invoke()` x3 parallel -> merge -> `js.publish()` x1 |
| recognition-agent | NATS sub loop, `js.publish()` | gRPC server: `Recognize(AnnotatedImage) -> AnnotatedImage`. Calls person-profiling and pose via Dapr internally, or detection-agent chains them. |
| pose-agent | NATS sub loop, `js.publish()` | gRPC server: `DetectPose(AnnotatedImage) -> AnnotatedImage` |
| person-profiling | NATS sub loop, `js.publish()` | gRPC server: `Profile(AnnotatedImage) -> AnnotatedImage` |
| weapon-detection | NATS sub loop, `js.publish()` | gRPC server: `Detect(AnnotatedImage) -> AnnotatedImage`. Calls weapon-intent via Dapr internally. |
| weapon-intent | NATS sub loop, `js.publish()` | gRPC server: `Classify(AnnotatedImage) -> AnnotatedImage` |
| alpr-agent | NATS sub loop, `js.publish()` | gRPC server: `ReadPlates(AnnotatedImage) -> AnnotatedImage` |
| vehicle-profiling | NATS sub loop (stateful, multi-frame) | Keep as NATS (see Vehicle Profiling Exception above) |
Per-agent implementation pattern (Python):

```python
# Before (NATS subscriber)
async def process_message(js, msg):
    annotated_msg = parse(msg.data)
    result = do_inference(annotated_msg)
    await js.publish("next.topic", result.SerializeToString())

# After (gRPC server via grpcio or Dapr HTTP)
class RecognitionServicer(api_pb2_grpc.RecognitionServiceServicer):
    def Recognize(self, request, context):
        # request is an AnnotatedImage
        enriched = do_inference(request)
        return enriched  # AnnotatedImage with face data populated
```

Replace the three `js.publish()` calls with parallel gRPC invocations:
```python
# Pseudocode for detection-agent after step 4
import asyncio

from dapr.aio.clients import DaprClient  # async client, so calls can be gathered

import api_pb2  # generated from proto/api/api.proto

async def orchestrate(js, person_data, vehicle_data, object_data):
    async with DaprClient() as dapr:
        # Fan out the three chains in parallel
        responses = await asyncio.gather(
            dapr.invoke_method(
                app_id="recognition-agent",
                method_name="Recognize",
                data=person_data.SerializeToString(),
                content_type="application/protobuf",
            ),
            dapr.invoke_method(
                app_id="weapon-detection-agent",
                method_name="Detect",
                data=person_data.SerializeToString(),  # weapons need person context
                content_type="application/protobuf",
            ),
            dapr.invoke_method(
                app_id="alpr-agent",
                method_name="ReadPlates",
                data=vehicle_data.SerializeToString(),
                content_type="application/protobuf",
            ),
        )
        person_result, weapon_result, vehicle_result = (
            api_pb2.AnnotatedImage.FromString(r.data) for r in responses
        )

    # Merge into a single AnnotatedImage
    final = api_pb2.AnnotatedImage()
    final.persons.extend(person_result.persons)
    final.weapons.extend(weapon_result.weapons)
    final.vehicles.extend(vehicle_result.vehicles)
    final.objects.extend(object_data.objects)
    # ... copy frame_id, device_id, data, timestamps

    # Single publish to persistence
    await js.publish("frame.enriched.annotated_frame", final.SerializeToString())
```

Each agent needs a Dapr sidecar with an app-id. The existing `docker-compose-dapr.yml` already has sidecars for detection, recognition, and pose agents. Extend it for the new agents.
New sidecars needed:

- `weapon-detection-agent-dapr` (app-id: `weapon-detection-agent`)
- `weapon-intent-agent-dapr` (app-id: `weapon-intent-agent`)
- `person-profiling-agent-dapr` (app-id: `person-profiling-agent`)
- `alpr-agent-dapr` (app-id: `alpr-agent`)
- `vehicle-profiling-agent-dapr` (app-id: `vehicle-profiling-agent`) - only if converting to gRPC
Each agent container must expose an app-port for the Dapr sidecar to forward requests to:
```yaml
weapon-detection-agent-dapr:
  image: daprio/daprd:latest
  command: [
    "./daprd",
    "-app-id", "weapon-detection-agent",
    "-app-port", "50051",       # gRPC port on the agent
    "-app-protocol", "grpc",    # tell Dapr the app speaks gRPC
    "-placement-host-address", "placement:50006",
    "-dapr-http-port", "3500",
    "-dapr-grpc-port", "50001",
    "-components-path", "/components",
    "-config", "/dapr/config.yaml"
  ]
  network_mode: "service:weapon-detection-agent"
```

Remove (no longer needed for inter-agent communication):
- `person.detected.annotated_frame`
- `person.recognized.annotated_frame`
- `person.profiled.annotated_frame`
- `person.posed.annotated_frame`
- `weapon.detected.annotated_frame`
- `weapon.classified.annotated_frame`
- `vehicle.detected.annotated_frame`
- `vehicle.recognized.annotated_frame`

Keep:

- `Image.diffed.frame` (ingestion input)
- `frame.enriched.annotated_frame` (single output to persistence)
- `objects.detected.annotated_frame` (if the objects branch has no enrichment, this can merge into the single output)
- `vehicle.profiled.vehicle_track` (async, multi-frame; see exception above)
Persistence drops from subscribing to 10+ topics to 2-3.
Add a Dapr resiliency configuration to handle slow agents (especially LLM calls):
```yaml
# .devcontainer/dapr/resiliency.yaml
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: agent-resiliency
spec:
  policies:
    timeouts:
      agentTimeout: 30s       # per-agent call timeout
      llmAgentTimeout: 60s    # LLM agents get more time
    retries:
      agentRetry:
        policy: constant
        maxRetries: 2
        duration: 1s
    circuitBreakers:
      agentCB:
        maxRequests: 1
        interval: 30s
        timeout: 60s
        trip: consecutiveFailures > 3
  targets:
    apps:
      recognition-agent:
        timeout: agentTimeout
        retry: agentRetry
        circuitBreaker: agentCB
      person-profiling-agent:
        timeout: llmAgentTimeout
        retry: agentRetry
        circuitBreaker: agentCB
      weapon-intent-agent:
        timeout: llmAgentTimeout
        retry: agentRetry
        circuitBreaker: agentCB
```

Add to each Python agent's `requirements.txt`:
```text
grpcio>=1.60.0
grpcio-tools>=1.60.0
dapr>=1.12.0
dapr-ext-grpc>=1.12.0
```
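Dapr's resiliency policies cover the sidecar-to-sidecar hop; the orchestrator may still want an in-process guard so that one overrunning chain degrades the frame gracefully instead of stalling it. A minimal sketch with `asyncio.wait_for` (names and the fallback value are illustrative assumptions, not part of the current codebase):

```python
import asyncio

async def call_with_timeout(coro, timeout_s: float, default):
    """Return `default` instead of raising if the enrichment call overruns."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        return default

async def slow_chain():
    await asyncio.sleep(10)  # stands in for an overrunning LLM chain
    return "enriched"

# The frame ships without that branch's enrichment rather than blocking.
result = asyncio.run(call_with_timeout(slow_chain(), timeout_s=0.01, default="partial"))
```

The aggregated message then carries whatever enrichment completed in time, which matches the "one complete message per frame" goal without letting a single slow agent hold the whole frame hostage.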
- Phase 1: Add gRPC server endpoints to each agent alongside existing NATS subscribers. Both paths work simultaneously.
- Phase 2: Update detection-agent to use Dapr service invocation. Disable NATS publishing for inter-agent topics.
- Phase 3: Remove NATS subscriber code from downstream agents. Clean up unused NATS streams.
- Phase 4: Update persistence to subscribe to the single `frame.enriched.annotated_frame` topic.
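Phase 1's dual-path agent can be sketched as two long-running tasks on one event loop; the coroutine bodies here are placeholders (the real ones would be the existing `js.subscribe()` loop and a `grpc.aio` server bound to the Dapr app-port):

```python
import asyncio

async def run_nats_subscriber(stop: asyncio.Event) -> str:
    # Placeholder for the existing NATS JetStream consume loop.
    await stop.wait()
    return "nats-path-stopped"

async def run_grpc_server(stop: asyncio.Event) -> str:
    # Placeholder for the new gRPC server serving until shutdown.
    await stop.wait()
    return "grpc-path-stopped"

async def main() -> list:
    stop = asyncio.Event()
    # Both invocation paths run side by side during the migration window.
    both = asyncio.gather(run_nats_subscriber(stop), run_grpc_server(stop))
    stop.set()  # immediate shutdown so this sketch terminates
    return await both

results = asyncio.run(main())
```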
| Aspect | NATS Pub/Sub (Current) | gRPC via Dapr (Proposed) |
|---|---|---|
| Coupling | Loose - agents are independent | Tighter - detection-agent depends on all downstream agents |
| Error handling | Silent drops, no backpressure | Immediate errors, timeouts, circuit breakers |
| Frame completeness | Persistence gets partial data at different times | Persistence gets one complete message per frame |
| Latency | Lower per-hop (fire-and-forget) | Higher per-frame (waits for all chains) |
| Scalability | Easy to scale consumers independently | Still scales independently, but orchestrator is bottleneck |
| Observability | Must correlate across 10+ topics | Single request trace through Zipkin via Dapr |
| Complexity | Simple per-agent, complex at system level | Complex orchestrator, simple persistence |
Vehicle profiling is stateful: it collects frames for the same track_id over a 5-second window (up to 25-30 frames at 5 FPS) before making a single LLM call to classify make/model/color/direction. When scaling horizontally with NATS queue groups, frames for the same track get round-robin distributed across instances, splitting the frame buffer and producing incomplete or duplicate LLM calls.
| Approach | Verdict | Why |
|---|---|---|
| Single instance | Recommended (Phase 1) | Simple, correct, sufficient throughput |
| NATS subject partitioning | Future scale-out | Consistent hashing on track_id pins all frames to the same consumer |
| Dapr Virtual Actors | Not suitable | Turn-based locking blocks during LLM calls |
| Redis shared state | Viable but complex | Shared frame buffer in Redis, any instance can finalize |
Dapr Virtual Actors use a turn-based, single-threaded execution model: only one message (method call, timer, or reminder) can execute at a time per actor instance. This is fundamentally incompatible with vehicle profiling:

- Turn-based locking blocks frame ingestion during LLM calls. When the actor triggers a Gemini/GPT call (30-60s), the actor is locked. All incoming `AddFrame()` calls queue up and time out. New vehicle detections for that track are silently dropped.
- The "fire-and-forget reminder" workaround doesn't help. Setting `dueTime=0s` on a reminder to offload the LLM call still executes the reminder callback as a turn, which locks the actor for the entire duration of the LLM inference.
- Split-brain under pod eviction. Dapr actor placement uses a hashing ring. During Kubernetes pod evictions or rolling updates, there is a window where two instances believe they own the same actor, causing duplicate LLM calls and corrupted state. (See dapr/dapr#4451)
- Dapr Workflows have the same limitation. Workflows are built on top of actors and inherit the turn-based model. Additionally, storing 50MB of JPEG buffers per track in workflow state is impractical.
- The Semantic Kernel team hit the same wall. The SK team found Dapr Actors unsuitable for orchestrating long-running LLM operations due to the reentrancy and locking constraints. (See microsoft/semantic-kernel#9953)
A single vehicle-profiling instance using Python asyncio can handle the expected load:
Throughput model:
- 10 cameras × 5 concurrent vehicle tracks = 50 active tracks
- Each track: 5 FPS × 5s = 25 frames → 1 LLM call
- LLM call latency: ~2-5s (Gemini Flash)
- At steady state: ~10 LLM calls/sec (50 tracks finalizing over 5s)
- asyncio handles this trivially — frame reception and LLM calls are all async I/O
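The steady-state numbers above follow directly from the model's inputs:

```python
cameras = 10
tracks_per_camera = 5
fps = 5
window_s = 5  # collection window per track, seconds

active_tracks = cameras * tracks_per_camera   # 50 concurrent tracks
frames_per_track = fps * window_s             # 25 frames buffered per track
# Tracks finalize spread over the 5 s window:
llm_calls_per_sec = active_tracks / window_s  # 10.0
```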
The key insight: the LLM API is the throughput bottleneck, not the instance count. Adding more instances doesn't increase LLM throughput; it just spreads the same API rate limit across more processes. A single asyncio event loop can:

- Receive frames from NATS concurrently (non-blocking)
- Maintain per-track frame buffers in a `dict[bytes, list[bytes]]`
- Fire off LLM calls as `asyncio.Task`s when a track's 5s window expires
- Continue receiving frames for other tracks while LLM calls are in flight
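The `TrackBuffer` the profiler relies on can be sketched as follows. The 5 s window and 30-frame cap come from the current agent's behavior; the injectable clock is an illustrative addition that makes the time-based trigger testable.

```python
import time

class TrackBuffer:
    """Per-track frame buffer: ready after 5 s or 30 frames, whichever first."""
    WINDOW_S = 5.0
    MAX_FRAMES = 30

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._started = clock()
        self.frames: list = []

    def add_frame(self, msg) -> None:
        self.frames.append(msg)

    def is_ready(self) -> bool:
        elapsed = self._clock() - self._started
        return len(self.frames) >= self.MAX_FRAMES or elapsed >= self.WINDOW_S
```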
```python
# Simplified single-instance vehicle profiler
class VehicleProfiler:
    def __init__(self):
        self.tracks: dict[bytes, TrackBuffer] = {}

    async def on_frame(self, msg):
        """NATS callback: non-blocking, runs concurrently."""
        track_id = msg.vehicle.track_id
        buf = self.tracks.setdefault(track_id, TrackBuffer())
        buf.add_frame(msg)
        if buf.is_ready():  # 5s elapsed or 30 frames
            asyncio.create_task(self.finalize_track(track_id))

    async def finalize_track(self, track_id):
        """Runs as a separate task: doesn't block frame reception."""
        buf = self.tracks.pop(track_id)
        profile = await self.llm_client.classify(buf.frames)  # async HTTP call
        vehicle_track = build_vehicle_track(buf, profile)
        await self.js.publish("vehicle.profiled.vehicle_track",
                              vehicle_track.SerializeToString())
```

| Scenario | Tracks | Frames/Track | Frame Size | Total |
|---|---|---|---|---|
| 10 cameras, 5 vehicles each | 50 | 25 (5fps × 5s) | ~2MB (4K JPEG) | 2.5 GB |
| 10 cameras, 2 vehicles each | 20 | 25 | ~2MB | 1.0 GB |
| 5 cameras, 3 vehicles each | 15 | 25 | ~500KB (1080p JPEG) | 187 MB |
At 4K resolution, budget 4-8 GB for the vehicle-profiling container. At 1080p, 1-2 GB is sufficient.
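The table's totals are simply tracks × frames × frame size:

```python
def buffer_mb(tracks: int, frames_per_track: int, frame_mb: float) -> float:
    """Worst-case resident JPEG buffer for the profiler, in MB."""
    return tracks * frames_per_track * frame_mb

worst_case = buffer_mb(50, 25, 2.0)   # 2500.0 MB, i.e. 2.5 GB at 4K
small_site = buffer_mb(15, 25, 0.5)   # 187.5 MB at 1080p
```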
When a single instance is no longer sufficient (e.g., 50+ cameras), use NATS subject-based partitioning:
```python
# Instead of one subject:
#   vehicle.detected.annotated_frame
#
# partition by track_id hash:
#   vehicle.detected.partition.0
#   vehicle.detected.partition.1
#   vehicle.detected.partition.2
#   ...

# Detection-agent publishes to (use a stable hash, not Python's built-in
# hash(), which is salted per process and would make publishers disagree):
partition = stable_hash(track_id) % NUM_PARTITIONS
await js.publish(f"vehicle.detected.partition.{partition}", data)
```
Each vehicle-profiling instance subscribes to a specific partition. All frames for a given track_id always hash to the same partition, ensuring correct frame collection without shared state. This is the same key-based partitioning pattern used by Kafka consumer groups.
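A deterministic digest gives a partition assignment that is stable across processes and restarts, which Python's salted built-in `hash()` is not. A minimal sketch (the partition count is an illustrative assumption and must be configured identically on publisher and consumers):

```python
import hashlib

NUM_PARTITIONS = 4  # assumption: same value everywhere

def partition_for(track_id: bytes) -> int:
    """Stable across processes, unlike built-in hash() on bytes/str."""
    digest = hashlib.sha256(track_id).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

subject = f"vehicle.detected.partition.{partition_for(b'track-42')}"
```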