This architecture replaces the standard Dapr Configuration Store component with a custom, high-performance Go SDK. It enables dynamic, partitioned processing of large configuration lists across a cluster of Go workers.
CockroachDB (CRDB) serves as the strict System of Record (SoR). To ensure security and decoupling, Go worker nodes possess zero database access. All configuration state, cluster topology, and event signaling are routed exclusively through NATS (JetStream, KV, and Core Request/Reply). The client SDK exposes an API identical to github.com/dapr/go-sdk/client to minimize developer friction.
- Complete DB Isolation: Worker nodes are stateless NATS clients. The database is accessed exclusively by a central gRPC Configuration Service.
- Key-Level Partitioning: Hash rings are formed dynamically per configuration key. A node only participates in the routing topology for keys it explicitly subscribes to.
- Immutable Ring Size: The partition count for a store is locked in NATS KV upon initial cluster bootstrap to prevent split-brain routing from misconfigured nodes.
- API Parity: The Go SDK exposes the exact method signatures of the Dapr Go client (a minimal interface sketch follows this list).
- Dual Consumption Modes: Supports mutually exclusive distributed processing (`PartitionedMode`) and global replicated caching (`FullMode`).
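As a reference for the parity claim, here is a minimal sketch of the consumer-facing surface. It is derived from the usage example later in this document; the handler type name and the error return are illustrative, and the option parameters accepted by the Dapr client are omitted.

```go
import (
	"context"

	dapr "github.com/dapr/go-sdk/client"
)

// ConfigurationHandler matches the callback shape used in the usage example below.
type ConfigurationHandler func(ctx context.Context, e *dapr.ConfigurationUpdateEvent)

// ConfigClient is the value returned by NewConsumer. Method names mirror the Dapr Go client.
type ConfigClient interface {
	SubscribeConfigurationItems(ctx context.Context, store string, keys []string, handler ConfigurationHandler) error
}
```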
A logical Store (e.g., gateway_rules) acts as a namespace boundary, isolating KV buckets and JetStream subjects.
- Metadata (Immutable Config): `config_meta_<store>` (NATS KV)
  - Key: `partition_count` (e.g., `256`)
- Topology (Cluster Membership): `config_nodes_<store>` (NATS KV)
  - Keys: `<config_key>.<worker_id>` (e.g., `allowlist.node-1`, `routing.node-1`)
  - TTL: 10 seconds (workers renew every 5 seconds)
- Notifications (JetStream): `config.notify.<store>.<config_key>.<partition>`
  - Used by the gRPC service to push targeted change events.
- Data Fetching (Core Request/Reply): `config.fetch.<store>.<config_key>.<partition>`
  - Used by workers to request initial state from the gRPC service.
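The names above are simple string templates. A minimal sketch of helpers that derive them (the function names are illustrative and not part of the SDK surface):

```go
import "fmt"

// Illustrative helpers that build the NATS resource names listed above.
func metaBucket(store string) string  { return fmt.Sprintf("config_meta_%s", store) }
func nodesBucket(store string) string { return fmt.Sprintf("config_nodes_%s", store) }

func notifySubject(store, configKey string, partition int) string {
	return fmt.Sprintf("config.notify.%s.%s.%d", store, configKey, partition)
}

func fetchSubject(store, configKey string, partition int) string {
	return fmt.Sprintf("config.fetch.%s.%s.%d", store, configKey, partition)
}

func heartbeatKey(configKey, workerID string) string {
	return fmt.Sprintf("%s.%s", configKey, workerID) // e.g. "allowlist.node-1"
}
```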
This central service manages the CRDB connection. It writes updates to the database, emits JetStream notifications, and runs NATS Responders to serve data to bootstrapping workers.
// 1. Write Path: update CRDB, hash the row ID, and publish a change notification to JetStream
func (m *ConfigManager) NotifyChange(store, configKey, rowID string, totalParts int) {
	partID := computeHash(rowID) % totalParts
	subject := fmt.Sprintf("config.notify.%s.%s.%d", store, configKey, partID)
	m.nc.Request(subject, []byte(rowID), time.Second) // payload is just the row ID; the JetStream PubAck is the reply
}
// 2. Read Path: NATS Responders serving state to workers
func (m *ConfigManager) StartDataResponders(store string) {
// Listen for partition bootstrap requests: config.fetch.mystore.allowlist.42
m.nc.Subscribe(fmt.Sprintf("config.fetch.%s.*.*", store), func(msg *nats.Msg) {
configKey, partID := extractSubjectParams(msg.Subject)
data := m.queryDBForPartition(configKey, partID) // SELECT * FROM ... WHERE ...
msg.Respond(serialize(data))
})
}

The consumer client strictly enforces the NATS-only boundary. It branches its internal architecture based on a typed consumption mode.
type ConsumptionMode int
const (
PartitionedMode ConsumptionMode = iota // Mutually exclusive cluster distribution
FullMode // Global broadcast to all nodes
)
// NewConsumer accepts NO database connection.
func NewConsumer(nc *nats.Conn, workerID, store string, requestedParts int, mode ConsumptionMode) (ConfigClient, error) {
// ... initialize NATS JetStream and KV clients ...
// 1. Enforce Immutable Partition Count
metaKV, _ := js.KeyValue(ctx, fmt.Sprintf("config_meta_%s", store))
totalParts, err := enforcePartitionCount(ctx, metaKV, requestedParts)
if err != nil {
return nil, err // refuse to join the cluster on a partition-count mismatch
}
client := &ShardedClient{ /* ... */ }
// 2. Branch topology logic (FullMode skips KV watch and JumpHash)
if mode == FullMode {
go client.startFullConsumption()
}
return client, nil
}

If a worker is in PartitionedMode, calling SubscribeConfigurationItems triggers the dynamic formation of a micro-cluster.
- Subscription: The app calls `client.SubscribeConfigurationItems(ctx, "mystore", []string{"allowlist"}, handler)`.
- Heartbeat: The SDK spawns a goroutine writing to `config_nodes_mystore` with key `allowlist.<worker_id>` (10s TTL, 5s refresh).
- Topology Watch: The SDK watches `config_nodes_mystore` for `allowlist.*`.
- Rebalance & JumpHash: When the watcher fires (node joins/leaves), the SDK extracts all active worker IDs for `allowlist`. It runs JumpHash (partitions $0$ to $N-1$) against the active IDs to determine which partitions of the `allowlist` this specific worker owns (e.g., partitions `[7, 42]`); a minimal ownership sketch follows this list.
- Dynamic Filtering: The SDK updates a JetStream durable consumer with `FilterSubjects: ["config.notify.mystore.allowlist.7", "config.notify.mystore.allowlist.42"]`.
A TOCTOU gap exists between topology observation and FilterSubjects update: events emitted for newly acquired partitions during this window may be missed.
Fix: JetStream consumers are updated with a DeliverLastPerSubjectPolicy anchor. After updating FilterSubjects, the worker performs a bootstrap fetch (Section 6.1) for each newly acquired partition using the sequence number from the last message seen on that subject. Any events with a higher sequence number than what the bootstrap response was built from are replayed from JetStream before the application callback is enabled.
func (c *ShardedClient) rebalance(configKey string, newPartitions []int) {
	prevSeq := c.consumer.LastSeqPerSubject() // snapshot before filter update
	c.updateFilterSubjects(configKey, newPartitions)
	for _, p := range newPartitions {
		subject := fmt.Sprintf("config.notify.%s.%s.%d", c.store, configKey, p)
		lastKnownSeq := prevSeq[subject] // 0 if never seen
		// Bootstrap fetches state as-of now; returns the DB sequence it was built from
		items, dbSeq := c.bootstrapPartition(configKey, p)
		c.applyItems(items)
		// Replay any JetStream events that arrived after the DB snapshot
		c.replayFrom(subject, lastKnownSeq, dbSeq)
	}
}

Workers never execute SQL queries. They fetch state directly from the gRPC Producer over NATS.
When a node acquires new partitions via the JumpHash ring, it requests the full payload for those partitions:
for _, p := range newlyAcquiredPartitions {
subject := fmt.Sprintf("config.fetch.%s.%s.%d", c.store, configKey, p)
msg, _ := c.nc.Request(subject, nil, 5*time.Second)
items := deserializeToDaprFormat(msg.Data)
c.pushToSubscribers(items) // Push to application code
}When the JetStream consumer catches an event on config.notify.mystore.allowlist.42, notifications are micro-batched before fetching to avoid N sequential round-trips under bulk update load.
// One notification loop runs per subscribed config key.
func (c *ShardedClient) runNotificationLoop(configKey string) {
pending := map[string]struct{}{} // rowID → dedup set
timer := time.NewTimer(100 * time.Millisecond)
defer timer.Stop()
for {
select {
case msg := <-c.jetStreamCh:
rowID := string(msg.Data)
pending[rowID] = struct{}{} // deduplicate within window
msg.Ack()
if !timer.Stop() { <-timer.C }
timer.Reset(100 * time.Millisecond)
case <-timer.C:
if len(pending) == 0 {
timer.Reset(100 * time.Millisecond)
continue
}
ids := keys(pending)
pending = map[string]struct{}{}
// Single batched NATS request for all pending IDs
subject := fmt.Sprintf("config.fetch.%s.%s.batch", c.store, configKey)
msg, _ := c.nc.Request(subject, serializeIDs(ids), 5*time.Second)
items := deserializeToDaprFormat(msg.Data)
c.pushToSubscribers(items)
timer.Reset(100 * time.Millisecond)
}
}
}

The gRPC service exposes a config.fetch.<store>.<key>.batch responder that accepts a list of row IDs and returns them in a single response.
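A sketch of that service-side batch responder, assuming illustrative helpers `extractConfigKey`, `deserializeIDs` (mirroring the worker's `serializeIDs`), and `queryDBForIDs`, plus the same `serialize` used by the per-partition responder:

```go
// Batch read path: one request carries many row IDs, one response carries all matching rows.
func (m *ConfigManager) StartBatchResponders(store string) {
	// e.g. config.fetch.mystore.allowlist.batch
	m.nc.QueueSubscribe(fmt.Sprintf("config.fetch.%s.*.batch", store), "config-service", func(msg *nats.Msg) {
		configKey := extractConfigKey(msg.Subject) // second-to-last subject token
		ids := deserializeIDs(msg.Data)            // row IDs accumulated in the worker's 100ms window
		rows := m.queryDBForIDs(configKey, ids)    // SELECT ... WHERE id = ANY($1)
		msg.Respond(serialize(rows))
	})
}
```

Note that the per-partition responder's `config.fetch.<store>.*.*` wildcard also matches `.batch` subjects, so a real implementation would either disambiguate on the last token or publish batch requests under a distinct prefix.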
In FullMode, every node receives every update for the store — there is no partitioning. This mode is mutually exclusive with PartitionedMode at the per-store level: a store is declared as one or the other at bootstrap time and stored in config_meta_<store> alongside partition_count.
Behavior differences from PartitionedMode:
| Property | PartitionedMode | FullMode |
|---|---|---|
| Heartbeat / topology KV | Yes | No |
| JumpHash ring | Yes | No |
| JetStream filter subjects | Per-owned partition | config.notify.<store>.<key>.* (wildcard) |
| Bootstrap fetch | Per acquired partition | Full key fetch on startup |
| Rebalance | On membership change | Never (no cluster) |
FullMode nodes are read-only caches. All nodes receive the same events and maintain identical in-memory state. This is suitable for small, frequently-read configuration keys (feature flags, routing tables) where every service instance needs the full dataset locally.
func (c *ShardedClient) startFullConsumption(configKey string) {
// Fetch full dataset once at startup
subject := fmt.Sprintf("config.fetch.%s.%s.full", c.store, configKey)
msg, _ := c.nc.Request(subject, nil, 5*time.Second)
c.applyItems(deserializeToDaprFormat(msg.Data))
// Subscribe to all partitions via wildcard — no filtering needed
sub, _ := c.js.Subscribe(
fmt.Sprintf("config.notify.%s.%s.*", c.store, configKey),
c.handleNotification,
nats.Durable(fmt.Sprintf("full-%s-%s", c.store, c.workerID)),
nats.DeliverLastPerSubject(),
)
c.subs = append(c.subs, sub)
}

PartitionedMode and FullMode nodes cannot coexist on the same store — the bootstrap would fail with a mode mismatch error if config_meta_<store> records a different mode than the one requested.
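A sketch of that check, mirroring enforcePartitionCount; storing the mode under a `mode` key in `config_meta_<store>` is an assumption (the document only states that the mode lives alongside `partition_count`):

```go
// enforceMode: the first node to bootstrap a store locks its consumption mode;
// later nodes must request the same mode or are refused, just like the partition count.
func enforceMode(ctx context.Context, kv jetstream.KeyValue, requested ConsumptionMode) error {
	entry, err := kv.Get(ctx, "mode")
	if errors.Is(err, jetstream.ErrKeyNotFound) {
		_, err = kv.Create(ctx, "mode", []byte(strconv.Itoa(int(requested))))
		return err
	}
	if err != nil {
		return err
	}
	if stored := ConsumptionMode(mustParseInt(entry.Value())); stored != requested {
		return fmt.Errorf("mode mismatch: store is declared %d, node requested %d", stored, requested)
	}
	return nil
}
```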
A missed heartbeat (GC pause, network blip) causes the TTL to expire and the node to appear departed. When the node recovers and re-registers, the topology watch fires a membership change that may cause the node to re-acquire partitions it never actually lost from its own perspective.
Guard: Before applying a new partition set from rebalance, the worker compares its current in-memory partition set. Partitions that are re-acquired (present in new set AND already owned) skip the bootstrap fetch and JetStream replay — they are considered already consistent. Only genuinely new partitions (not previously owned) trigger the Section 5.1 sealing flow.
func (c *ShardedClient) applyNewPartitionSet(configKey string, newSet []int) {
	current := c.ownedPartitions // current in-memory set (tracked per config key)
	genuinelyNew := subtract(newSet, current)
	retained := intersect(newSet, current)
	dropped := subtract(current, newSet)
	c.unsubscribePartitions(dropped)
	c.ownedPartitions = append(retained, genuinelyNew...)
	// Only bootstrap genuinely new partitions — retained ones are already consistent
	c.rebalance(configKey, genuinelyNew)
}

Additionally, the heartbeat goroutine uses an exponential backoff on NATS publish failures (max 3 retries over 2s) before logging a warning. If the heartbeat fails for a full TTL period (10s), the node logs a critical event and begins a self-heal bootstrap of all its partitions on the next successful heartbeat; a sketch follows.
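A sketch of that heartbeat loop, assuming the `config_nodes_<store>` bucket was created with a 10-second TTL (MaxAge) so entries expire when renewal stops; `putWithRetry` and `selfHealBootstrap` are illustrative helpers:

```go
// Heartbeat loop: re-put the topology key every 5s; the bucket's 10s TTL expires the
// entry if the worker disappears. On a failure lasting a full TTL, self-heal on recovery.
func (c *ShardedClient) runHeartbeat(ctx context.Context, nodesKV jetstream.KeyValue, configKey string) {
	key := fmt.Sprintf("%s.%s", configKey, c.workerID) // e.g. "allowlist.node-1"
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	var failingSince time.Time // zero while heartbeats are healthy
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// putWithRetry: up to 3 attempts with exponential backoff, then logs a WARN.
			if err := putWithRetry(ctx, nodesKV, key, []byte("alive"), 3); err != nil {
				if failingSince.IsZero() {
					failingSince = time.Now()
				}
				continue
			}
			// Heartbeat restored: if it was down for a full TTL (10s), our entry expired and
			// peers rebalanced around us, so re-bootstrap every owned partition to be safe.
			if !failingSince.IsZero() && time.Since(failingSince) >= 10*time.Second {
				c.selfHealBootstrap(configKey)
			}
			failingSince = time.Time{}
		}
	}
}
```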
JetStream delivers at-least-once. The 100ms micro-batch window in Section 6.2 deduplicates row IDs within a batch. However, the same row ID can appear across multiple batch windows (e.g., rapid successive updates).
Application contract: The ConfigurationUpdateEvent callback must be idempotent. The SDK does not provide cross-batch deduplication — the application is responsible for version-checking or last-write-wins logic on received values. The gRPC service includes a monotonic version field on each row payload to allow receivers to discard stale updates:
type ConfigItem struct {
Key string
Value string
Version int64 // monotonically increasing; discard if version <= local
Metadata map[string]string
}The application developer is completely shielded from JumpHash, NATS subjects, and partitioning logic.
package main

import (
	"context"

	dapr "github.com/dapr/go-sdk/client"
	"github.com/nats-io/nats.go"

	"yourcompany/pkg/shardedconfig"
)

func main() {
	ctx := context.Background()
	natsConn, _ := nats.Connect(nats.DefaultURL)

	client, _ := shardedconfig.NewConsumer(natsConn, "worker-1", "gateway", 256, shardedconfig.PartitionedMode)
	client.SubscribeConfigurationItems(ctx, "gateway", []string{"allowlist", "routing"}, func(ctx context.Context, e *dapr.ConfigurationUpdateEvent) {
		// App processes only the partitions of 'allowlist' and 'routing' it currently owns.
		// It remains entirely unaware of cluster rebalances or NATS Request/Reply mechanics.
		for key, item := range e.Items {
			processWorkload(key, item.Value)
		}
	})
	<-make(chan struct{}) // block forever
}

Multiple gRPC service instances run concurrently — each with its own CRDB connection and independent NATS Responder subscriptions. There is no primary/replica distinction; all instances serve config.fetch.* requests. NATS load-balances across queue-group subscribers automatically:
// All instances join the same queue group — NATS picks one per request
nc.QueueSubscribe(fmt.Sprintf("config.fetch.%s.*.*", store), "config-service", handler)

If all instances are transiently unavailable, worker bootstrap requests time out (5s). Workers retry with exponential backoff (1s, 2s, 4s, max 30s); a retry sketch follows. In-memory state from before the outage remains valid and continues to be served to the application. No data loss occurs because CRDB and JetStream are both durable.
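A minimal sketch of that retry wrapper around the bootstrap request, following the backoff schedule above; the method name is illustrative:

```go
// requestWithBackoff retries a NATS request until it succeeds or the context is cancelled.
// Delays grow 1s, 2s, 4s, ... and are capped at 30s, matching the schedule described above.
func (c *ShardedClient) requestWithBackoff(ctx context.Context, subject string, payload []byte) (*nats.Msg, error) {
	delay := time.Second
	for {
		msg, err := c.nc.Request(subject, payload, 5*time.Second)
		if err == nil {
			return msg, nil
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(delay):
		}
		if delay *= 2; delay > 30*time.Second {
			delay = 30 * time.Second
		}
	}
}
```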
JetStream consumers are durable. If NATS is unreachable:
- Workers continue serving their last-known in-memory state to the application.
- Heartbeat failures will eventually cause TTL expiry — on reconnect, a full rebalance + bootstrap is triggered automatically.
- JetStream replay catches up any missed notifications from the durable consumer's last acknowledged sequence.
If a worker crashes after updating FilterSubjects but before completing the bootstrap fetch:
- On restart, the worker re-registers its heartbeat. The topology watch fires and triggers a fresh rebalance.
- The JetStream durable consumer resumes from its last acked sequence — unacked messages are redelivered.
- The bootstrap fetch re-runs for all acquired partitions, sealing the window as per Section 5.1.
The immutable partition count (Section 2) prevents routing inconsistencies from misconfigured nodes. If a node requests a different partition_count than what is stored in config_meta_<store>, enforcePartitionCount returns an error and the node refuses to join the cluster:
func enforcePartitionCount(ctx context.Context, kv jetstream.KeyValue, requested int) (int, error) {
	entry, err := kv.Get(ctx, "partition_count")
	if errors.Is(err, jetstream.ErrKeyNotFound) {
		// First node — write the value and lock it. Create is atomic: if another node
		// raced us and won, it fails and we fall through to re-read and compare.
		if _, createErr := kv.Create(ctx, "partition_count", []byte(strconv.Itoa(requested))); createErr == nil {
			return requested, nil
		}
		entry, err = kv.Get(ctx, "partition_count")
	}
	if err != nil {
		return 0, err
	}
	stored := mustParseInt(entry.Value())
	if stored != requested {
		return 0, fmt.Errorf("partition count mismatch: cluster=%d, requested=%d", stored, requested)
	}
	return stored, nil
}

All SDK operations emit structured metrics and traces. The following are mandatory instrumentation points:
| Metric | Type | Labels | Description |
|---|---|---|---|
| `config_partitions_owned` | Gauge | store, key, worker_id | Current partition count owned by this worker |
| `config_rebalances_total` | Counter | store, key, trigger | Rebalance events (join/leave/heartbeat-miss) |
| `config_bootstrap_duration_seconds` | Histogram | store, key, partition | Bootstrap fetch latency per partition |
| `config_notification_batch_size` | Histogram | store, key | Row IDs per 100ms batch window |
| `config_notification_lag_seconds` | Histogram | store, key | Time from JetStream publish to app callback |
| `config_heartbeat_failures_total` | Counter | store, key, worker_id | Consecutive heartbeat write failures |
| `config_stale_updates_discarded_total` | Counter | store, key | Updates dropped due to version check |
Spans are created for:
- `config.bootstrap.partition` — full partition fetch (includes DB query time via W3C Trace Context propagated through NATS headers)
- `config.rebalance` — full rebalance cycle, with child spans per acquired/dropped partition
- `config.notification.batch` — 100ms batch processing, tagged with batch size and partition IDs
All log lines include store, key, worker_id, and partition fields. The SDK uses slog with a caller-provided handler. Key events:
- INFO: partition acquired/released, rebalance triggered, heartbeat restored
- WARN: heartbeat failure (with retry count), bootstrap timeout (with retry attempt)
- ERROR: mode mismatch on join, partition count mismatch, bootstrap exhausted retries
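A sketch of how the mandatory fields might be attached once per subscription, assuming the caller supplies an `slog.Handler` at construction time (the constructor name is illustrative):

```go
import "log/slog"

// newScopedLogger binds the mandatory fields so every SDK log line carries them.
func newScopedLogger(h slog.Handler, store, key, workerID string, partition int) *slog.Logger {
	return slog.New(h).With(
		slog.String("store", store),
		slog.String("key", key),
		slog.String("worker_id", workerID),
		slog.Int("partition", partition),
	)
}
```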
This design is validated against the following operating envelope:
| Parameter | Target | Hard Limit |
|---|---|---|
| Stores per cluster | 10 | 50 |
| Config keys per store | 20 | 100 |
| Partition count per store | 256 | 4096 |
| Workers per key | 50 | 200 |
| Notifications per second (per partition) | 500 | 5,000 |
| Row payload size | ≤ 4 KB | 1 MB (NATS max) |
| Bootstrap payload size (full partition) | ≤ 1 MB | 8 MB |
Bottleneck: At >5,000 notifications/sec per partition, the 100ms batch window saturates the single batch NATS request. Mitigation: increase partition_count to spread load across more partitions and workers (this requires re-bootstrapping the store, since the count is immutable), or increase the number of gRPC service instances (NATS queue-group scales horizontally).
Not designed for: sub-millisecond notification latency (JetStream + 100ms batching adds ~100–150ms end-to-end), or payloads requiring strong read-after-write consistency within a single client (use direct CRDB query for those cases).