@enachb
Created March 7, 2026 18:49

Design Document: Distributed Configuration Orchestration Layer

1. Overview

This architecture replaces the standard Dapr Configuration Store component with a custom, high-performance Go SDK. It enables dynamic, partitioned processing of large configuration lists across a cluster of Go workers.

CockroachDB (CRDB) serves as the strict System of Record (SoR). To ensure security and decoupling, Go worker nodes possess zero database access. All configuration state, cluster topology, and event signaling are routed exclusively through NATS (JetStream, KV, and Core Request/Reply). The client SDK exposes an API identical to github.com/dapr/go-sdk/client to minimize developer friction.

2. Design Goals & Constraints

  • Complete DB Isolation: Worker nodes are stateless NATS clients. The database is accessed exclusively by a central gRPC Configuration Service.
  • Key-Level Partitioning: Hash rings are formed dynamically per configuration key. A node only participates in the routing topology for keys it explicitly subscribes to.
  • Immutable Ring Size: The partition count for a store is locked in NATS KV upon initial cluster bootstrap to prevent split-brain routing from misconfigured nodes.
  • API Parity: The Go SDK exposes the exact method signatures of the Dapr Go client.
  • Dual Consumption Modes: Supports mutually exclusive distributed processing (PartitionedMode) and global replicated caching (FullMode).

3. Infrastructure Taxonomy & NATS Topology

A logical Store (e.g., gateway_rules) acts as a namespace boundary, isolating KV buckets and JetStream subjects.

NATS Namespaces

  1. Metadata (Immutable Config): config_meta_<store> (NATS KV)
     • Key: partition_count (e.g., 256)
  2. Topology (Cluster Membership): config_nodes_<store> (NATS KV)
     • Keys: <config_key>.<worker_id> (e.g., allowlist.node-1, routing.node-1)
     • TTL: 10 seconds (workers renew every 5 seconds).
  3. Notifications (JetStream): config.notify.<store>.<config_key>.<partition>
     • Used by the gRPC service to push targeted change events.
  4. Data Fetching (Core Request/Reply): config.fetch.<store>.<config_key>.<partition>
     • Used by workers to request initial state from the gRPC service.
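The subject and key formats above can be centralized in small helpers so producers and consumers never drift apart. A minimal sketch; the helper names are illustrative, not part of the documented API:

```go
package main

import "fmt"

// notifySubject builds the JetStream subject the gRPC service publishes change events to.
func notifySubject(store, configKey string, partition int) string {
	return fmt.Sprintf("config.notify.%s.%s.%d", store, configKey, partition)
}

// fetchSubject builds the Core Request/Reply subject workers use for bootstrap fetches.
func fetchSubject(store, configKey string, partition int) string {
	return fmt.Sprintf("config.fetch.%s.%s.%d", store, configKey, partition)
}

// topologyKey builds the heartbeat key inside the config_nodes_<store> KV bucket.
func topologyKey(configKey, workerID string) string {
	return fmt.Sprintf("%s.%s", configKey, workerID)
}

func main() {
	fmt.Println(notifySubject("mystore", "allowlist", 42)) // config.notify.mystore.allowlist.42
	fmt.Println(topologyKey("allowlist", "node-1"))        // allowlist.node-1
}
```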

4. Component Architecture

4.1. The Producer: gRPC Configuration Service

This central service manages the CRDB connection. It writes updates to the database, emits JetStream notifications, and runs NATS Responders to serve data to bootstrapping workers.

// 1. Write Path: update CRDB, hash the row ID, and publish a JetStream notification
func (m *ConfigManager) NotifyChange(ctx context.Context, store, configKey, rowID string, totalParts int) error {
    partID := computeHash(rowID) % uint64(totalParts)
    subject := fmt.Sprintf("config.notify.%s.%s.%d", store, configKey, partID)
    _, err := m.js.Publish(ctx, subject, []byte(rowID)) // payload is just the row ID
    return err
}

// 2. Read Path: NATS Responders serving state to workers
func (m *ConfigManager) StartDataResponders(store string) {
    // Listen for partition bootstrap requests: config.fetch.mystore.allowlist.42
    m.nc.Subscribe(fmt.Sprintf("config.fetch.%s.*.*", store), func(msg *nats.Msg) {
        configKey, partID := extractSubjectParams(msg.Subject)
        data := m.queryDBForPartition(configKey, partID) // SELECT * FROM ... WHERE ...
        msg.Respond(serialize(data))
    })
}
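The write path leaves computeHash undefined. One deterministic choice is FNV-1a; this is an assumption, since the document does not fix a hash function, and any stable hash works as long as every producer uses the same one:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// computeHash maps a row ID to a stable 64-bit value.
// FNV-1a is an assumption: the design only requires determinism.
func computeHash(rowID string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(rowID))
	return h.Sum64()
}

// computePartition is a convenience wrapper matching the write-path usage.
func computePartition(rowID string, totalParts int) int {
	return int(computeHash(rowID) % uint64(totalParts))
}

func main() {
	// Same row ID always lands on the same partition.
	fmt.Println(computePartition("row-123", 256))
}
```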

4.2. The Consumer: Go SDK Client

The consumer client strictly enforces the NATS-only boundary. It branches its internal architecture based on a typed consumption mode.

type ConsumptionMode int

const (
    PartitionedMode ConsumptionMode = iota // Mutually exclusive cluster distribution
    FullMode                               // Global broadcast to all nodes
)

// NewConsumer accepts NO database connection.
func NewConsumer(nc *nats.Conn, workerID, store string, requestedParts int, mode ConsumptionMode) (ConfigClient, error) {
    // ... initialize NATS JetStream and KV clients ...

    // 1. Enforce Immutable Partition Count
    metaKV, _ := js.KeyValue(ctx, fmt.Sprintf("config_meta_%s", store))
    totalParts := enforcePartitionCount(metaKV, requestedParts) 

    client := &ShardedClient{ /* ... */ }

    // 2. Branch topology logic (FullMode skips KV watch and JumpHash)
    if mode == FullMode {
        go client.startFullConsumption() 
    }

    return client, nil
}

5. Key-Level Partitioning & Orchestration Flow

If a worker is in PartitionedMode, calling SubscribeConfigurationItems triggers the dynamic formation of a micro-cluster.

  1. Subscription: The app calls client.SubscribeConfigurationItems(ctx, "mystore", []string{"allowlist"}, handler).
  2. Heartbeat: The SDK spawns a goroutine writing to config_nodes_mystore with key allowlist.<worker_id> (10s TTL, 5s refresh).
  3. Topology Watch: The SDK watches config_nodes_mystore for allowlist.*.
  4. Rebalance & JumpHash: When the watcher fires (node joins/leaves), the SDK extracts all active worker IDs for allowlist. It runs JumpHash over partitions 0 to N−1 against the sorted active IDs to determine which partitions of allowlist this specific worker owns (e.g., partitions [7, 42]).
  5. Dynamic Filtering: The SDK updates a JetStream durable consumer with FilterSubjects: ["config.notify.mystore.allowlist.7", "config.notify.mystore.allowlist.42"].
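The rebalance step above can be sketched as follows. jumpHash is the published Jump Consistent Hash algorithm (Lamping & Veach); the ownedPartitions helper and its FNV key derivation are illustrative assumptions, not the documented SDK internals:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// jumpHash maps a 64-bit key to one of numBuckets buckets with minimal
// reshuffling when numBuckets changes (Jump Consistent Hash).
func jumpHash(key uint64, numBuckets int) int {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(1<<31) / float64((key>>33)+1)))
	}
	return int(b)
}

// ownedPartitions returns the partitions this worker owns, given the set of
// active workers from the topology KV. Sorting the worker list makes every
// node compute the identical assignment from the same KV snapshot.
func ownedPartitions(workerID string, activeWorkers []string, totalParts int) []int {
	workers := append([]string(nil), activeWorkers...)
	sort.Strings(workers)
	var owned []int
	for p := 0; p < totalParts; p++ {
		h := fnv.New64a()
		fmt.Fprintf(h, "%d", p)
		if workers[jumpHash(h.Sum64(), len(workers))] == workerID {
			owned = append(owned, p)
		}
	}
	return owned
}

func main() {
	// Two workers split 8 partitions; each node computes the same split.
	fmt.Println(ownedPartitions("node-1", []string{"node-1", "node-2"}, 8))
	fmt.Println(ownedPartitions("node-2", []string{"node-1", "node-2"}, 8))
}
```

Because every worker derives ownership independently from the same sorted membership snapshot, no coordinator is needed; correctness depends only on all nodes seeing the same KV state.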

5.1. Rebalance Race Condition & Sealing

A TOCTOU gap exists between topology observation and FilterSubjects update: events emitted for newly acquired partitions during this window may be missed.

Fix: JetStream consumers are updated with a DeliverLastPerSubjectPolicy anchor. After updating FilterSubjects, the worker performs a bootstrap fetch (Section 6.1) for each newly acquired partition using the sequence number from the last message seen on that subject. Any events with a higher sequence number than what the bootstrap response was built from are replayed from JetStream before the application callback is enabled.

func (c *ShardedClient) rebalance(newPartitions []int) {
    prevSeq := c.consumer.LastSeqPerSubject() // snapshot before filter update
    c.updateFilterSubjects(newPartitions)

    for _, p := range newPartitions {
        subject := fmt.Sprintf("config.notify.%s.%s.%d", c.store, configKey, p)
        lastKnownSeq := prevSeq[subject] // 0 if never seen

        // Bootstrap fetches state as-of now; returns the DB sequence it was built from
        items, dbSeq := c.bootstrapPartition(configKey, p)
        c.applyItems(items)

        // Replay any JetStream events that arrived after the DB snapshot
        c.replayFrom(subject, lastKnownSeq, dbSeq)
    }
}

6. Data Fetching (The NATS Bootstrap)

Workers never execute SQL queries. They fetch state directly from the gRPC Producer over NATS.

6.1. Partition Bootstrap (Rebalancing)

When a node acquires new partitions via the JumpHash ring, it requests the full payload for those partitions:

for _, p := range newlyAcquiredPartitions {
    subject := fmt.Sprintf("config.fetch.%s.%s.%d", c.store, configKey, p)
    msg, err := c.nc.Request(subject, nil, 5*time.Second)
    if err != nil {
        continue // retry with backoff (Section 8.1); callback stays disabled for this partition
    }

    items := deserializeToDaprFormat(msg.Data)
    c.pushToSubscribers(items) // push to application code
}

6.2. Targeted Notification Handling

When the JetStream consumer receives an event on config.notify.mystore.allowlist.42, notifications are micro-batched before fetching to avoid N sequential round trips under bulk-update load.

func (c *ShardedClient) runNotificationLoop() {
    pending := map[string]struct{}{} // rowID → dedup set
    flush := time.NewTicker(100 * time.Millisecond) // fixed window: a reset-on-arrival timer would starve under sustained load
    defer flush.Stop()

    for {
        select {
        case msg := <-c.jetStreamCh:
            pending[string(msg.Data)] = struct{}{} // deduplicate within the window
            msg.Ack()

        case <-flush.C:
            if len(pending) == 0 {
                continue
            }
            ids := keys(pending)
            pending = map[string]struct{}{}

            // Single batched NATS request for all pending IDs
            subject := fmt.Sprintf("config.fetch.%s.%s.batch", c.store, configKey)
            msg, err := c.nc.Request(subject, serializeIDs(ids), 5*time.Second)
            if err != nil {
                for _, id := range ids {
                    pending[id] = struct{}{} // requeue; retried on the next tick
                }
                continue
            }
            c.pushToSubscribers(deserializeToDaprFormat(msg.Data))
        }
    }
}

The gRPC service exposes a config.fetch.<store>.<key>.batch responder that accepts a list of row IDs and returns them in a single response.
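The wire format of the batch payload is not specified; one minimal choice (an assumption) is a JSON array of row IDs, giving both sides a trivially symmetric codec:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serializeIDs encodes pending row IDs as the batch request payload.
func serializeIDs(ids []string) []byte {
	b, _ := json.Marshal(ids) // marshalling a []string cannot fail
	return b
}

// deserializeIDs is the service-side counterpart: it decodes the batch
// request payload back into the list of row IDs to query.
func deserializeIDs(payload []byte) ([]string, error) {
	var ids []string
	err := json.Unmarshal(payload, &ids)
	return ids, err
}

func main() {
	payload := serializeIDs([]string{"row-1", "row-2"})
	fmt.Println(string(payload)) // ["row-1","row-2"]
}
```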


6.3. FullMode Consumption

In FullMode, every node receives every update for the store — there is no partitioning. This mode is mutually exclusive with PartitionedMode at the per-store level: a store is declared as one or the other at bootstrap time and stored in config_meta_<store> alongside partition_count.

Behavior differences from PartitionedMode:

| Property                  | PartitionedMode        | FullMode                                 |
| ------------------------- | ---------------------- | ---------------------------------------- |
| Heartbeat / topology KV   | Yes                    | No                                       |
| JumpHash ring             | Yes                    | No                                       |
| JetStream filter subjects | Per-owned partition    | config.notify.<store>.<key>.* (wildcard) |
| Bootstrap fetch           | Per acquired partition | Full key fetch on startup                |
| Rebalance                 | On membership change   | Never (no cluster)                       |

FullMode nodes are read-only caches. All nodes receive the same events and maintain identical in-memory state. This is suitable for small, frequently-read configuration keys (feature flags, routing tables) where every service instance needs the full dataset locally.

func (c *ShardedClient) startFullConsumption(configKey string) {
    // Fetch the full dataset once at startup
    subject := fmt.Sprintf("config.fetch.%s.%s.full", c.store, configKey)
    msg, err := c.nc.Request(subject, nil, 5*time.Second)
    if err != nil {
        return // retry with backoff (Section 8.1)
    }
    c.applyItems(deserializeToDaprFormat(msg.Data))

    // Subscribe to all partitions via wildcard; no filtering needed
    sub, _ := c.js.Subscribe(
        fmt.Sprintf("config.notify.%s.%s.*", c.store, configKey),
        c.handleNotification,
        nats.Durable(fmt.Sprintf("full-%s-%s", c.store, c.workerID)),
        nats.DeliverLastPerSubject(),
    )
    c.subs = append(c.subs, sub)
}

PartitionedMode and FullMode nodes cannot coexist on the same store — the bootstrap would fail with a mode mismatch error if config_meta_<store> records a different mode than the one requested.


6.4. Heartbeat Failure & Idempotent Re-Registration

A missed heartbeat (GC pause, network blip) causes the TTL to expire and the node to appear departed. When the node recovers and re-registers, the topology watch fires a membership change that may cause the node to re-acquire partitions it never actually lost from its own perspective.

Guard: Before applying a new partition set from a rebalance, the worker compares it against its current in-memory set. Re-acquired partitions (present in the new set and already owned) skip the bootstrap fetch and JetStream replay, since they are already consistent. Only genuinely new partitions (not previously owned) trigger the Section 5.1 sealing flow.

func (c *ShardedClient) applyNewPartitionSet(newSet []int) {
    current := c.ownedPartitions // current in-memory set
    genuinelyNew := subtract(newSet, current)
    retained := intersect(newSet, current)
    dropped := subtract(current, newSet)

    c.unsubscribePartitions(dropped)
    c.ownedPartitions = append(retained, genuinelyNew...)

    // Only bootstrap genuinely new partitions — retained ones are already consistent
    c.rebalance(genuinelyNew)
}
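The subtract and intersect helpers used above are plain set operations over partition IDs; a minimal sketch:

```go
package main

import "fmt"

// subtract returns the elements of a that are not in b.
func subtract(a, b []int) []int {
	inB := make(map[int]bool, len(b))
	for _, x := range b {
		inB[x] = true
	}
	var out []int
	for _, x := range a {
		if !inB[x] {
			out = append(out, x)
		}
	}
	return out
}

// intersect returns the elements of a that are also in b.
func intersect(a, b []int) []int {
	inB := make(map[int]bool, len(b))
	for _, x := range b {
		inB[x] = true
	}
	var out []int
	for _, x := range a {
		if inB[x] {
			out = append(out, x)
		}
	}
	return out
}

func main() {
	current := []int{7, 42}
	newSet := []int{42, 99}
	fmt.Println(subtract(newSet, current))  // genuinely new: [99]
	fmt.Println(intersect(newSet, current)) // retained: [42]
	fmt.Println(subtract(current, newSet))  // dropped: [7]
}
```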

Additionally, the heartbeat goroutine uses an exponential backoff on NATS publish failures (max 3 retries over 2s) before logging a warning. If heartbeat fails for a full TTL period (10s), the node logs a critical event and begins a self-heal bootstrap of all its partitions on next successful heartbeat.


6.5. Deduplication & Idempotency

JetStream delivers at-least-once. The 100ms micro-batch window in Section 6.2 deduplicates row IDs within a batch. However, the same row ID can appear across multiple batch windows (e.g., rapid successive updates).

Application contract: The ConfigurationUpdateEvent callback must be idempotent. The SDK does not provide cross-batch deduplication — the application is responsible for version-checking or last-write-wins logic on received values. The gRPC service includes a monotonic version field on each row payload to allow receivers to discard stale updates:

type ConfigItem struct {
    Key     string
    Value   string
    Version int64 // monotonically increasing; discard if version <= local
    Metadata map[string]string
}
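A receiver-side version check under this contract might look like the following; applyIfNewer is an illustrative helper, not part of the documented SDK surface:

```go
package main

import "fmt"

type ConfigItem struct {
	Key      string
	Value    string
	Version  int64 // monotonically increasing; discard if version <= local
	Metadata map[string]string
}

// applyIfNewer merges an incoming item into the local cache only when its
// Version is strictly greater than the cached one (last-write-wins).
// It returns true when the item was applied.
func applyIfNewer(cache map[string]ConfigItem, item ConfigItem) bool {
	if cur, ok := cache[item.Key]; ok && item.Version <= cur.Version {
		return false // stale update or at-least-once redelivery: drop
	}
	cache[item.Key] = item
	return true
}

func main() {
	cache := map[string]ConfigItem{}
	fmt.Println(applyIfNewer(cache, ConfigItem{Key: "a", Value: "v1", Version: 1})) // true
	fmt.Println(applyIfNewer(cache, ConfigItem{Key: "a", Value: "v1", Version: 1})) // false (duplicate)
}
```

This makes redeliveries and cross-batch duplicates harmless, which is exactly the idempotency the application contract demands.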

7. Developer Experience

The application developer is completely shielded from JumpHash, NATS subjects, and partitioning logic.

package main

import (
    "context"

    dapr "github.com/dapr/go-sdk/client"
    "yourcompany/pkg/shardedconfig"
)

func main() {
    ctx := context.Background()
    client, _ := shardedconfig.NewConsumer(natsConn, "worker-1", "gateway", 256, shardedconfig.PartitionedMode)

    client.SubscribeConfigurationItems(ctx, "gateway", []string{"allowlist", "routing"}, func(ctx context.Context, e *dapr.ConfigurationUpdateEvent) {
        // The app processes only the partitions of 'allowlist' and 'routing' it currently owns.
        // It remains entirely unaware of cluster rebalances or NATS Request/Reply mechanics.
        for key, item := range e.Items {
            processWorkload(key, item.Value)
        }
    })

    select {} // block forever
}

8. Failure Recovery

8.1. gRPC Configuration Service Restart

Multiple gRPC service instances run concurrently — each with its own CRDB connection and independent NATS Responder subscriptions. There is no primary/replica distinction; all instances serve config.fetch.* requests. NATS load-balances across queue-group subscribers automatically:

// All instances join the same queue group — NATS picks one per request
nc.QueueSubscribe(fmt.Sprintf("config.fetch.%s.*.*", store), "config-service", handler)

If all instances are transiently unavailable, worker bootstrap requests will time out (5s). Workers retry with exponential backoff (1s, 2s, 4s, max 30s). In-memory state from before the outage remains valid and continues to be served to the application. No data loss occurs because CRDB and JetStream are both durable.

8.2. NATS Partition / Unavailability

JetStream consumers are durable. If NATS is unreachable:

  • Workers continue serving their last-known in-memory state to the application.
  • Heartbeat failures will eventually cause TTL expiry — on reconnect, a full rebalance + bootstrap is triggered automatically.
  • JetStream replay catches up any missed notifications from the durable consumer's last acknowledged sequence.

8.3. Worker Crash Mid-Rebalance

If a worker crashes after updating FilterSubjects but before completing the bootstrap fetch:

  • On restart, the worker re-registers its heartbeat. The topology watch fires and triggers a fresh rebalance.
  • The JetStream durable consumer resumes from its last acked sequence — unacked messages are redelivered.
  • The bootstrap fetch re-runs for all acquired partitions, sealing the window as per Section 5.1.

8.4. Stale Topology (Split-Brain Prevention)

The immutable partition count (Section 2) prevents routing inconsistencies from misconfigured nodes. If a node requests a different partition_count than what is stored in config_meta_<store>, enforcePartitionCount returns an error and the node refuses to join the cluster:

func enforcePartitionCount(kv jetstream.KeyValue, requested int) (int, error) {
    entry, err := kv.Get(ctx, "partition_count")
    if errors.Is(err, jetstream.ErrKeyNotFound) {
        // First node: write the value and lock it. Create is atomic, so if two
        // nodes race, exactly one wins and the loser falls through to re-read.
        if _, cerr := kv.Create(ctx, "partition_count", []byte(strconv.Itoa(requested))); cerr == nil {
            return requested, nil
        }
        entry, err = kv.Get(ctx, "partition_count")
    }
    if err != nil {
        return 0, err
    }
    stored := mustParseInt(entry.Value())
    if stored != requested {
        return 0, fmt.Errorf("partition count mismatch: cluster=%d, requested=%d", stored, requested)
    }
    return stored, nil
}

9. Observability

All SDK operations emit structured metrics and traces. The following are mandatory instrumentation points:

9.1. Metrics (Prometheus)

| Metric                               | Type      | Labels                | Description                                  |
| ------------------------------------ | --------- | --------------------- | -------------------------------------------- |
| config_partitions_owned              | Gauge     | store, key, worker_id | Current partition count owned by this worker |
| config_rebalances_total              | Counter   | store, key, trigger   | Rebalance events (join/leave/heartbeat-miss) |
| config_bootstrap_duration_seconds    | Histogram | store, key, partition | Bootstrap fetch latency per partition        |
| config_notification_batch_size       | Histogram | store, key            | Row IDs per 100ms batch window               |
| config_notification_lag_seconds      | Histogram | store, key            | Time from JetStream publish to app callback  |
| config_heartbeat_failures_total      | Counter   | store, key, worker_id | Consecutive heartbeat write failures         |
| config_stale_updates_discarded_total | Counter   | store, key            | Updates dropped due to version check         |

9.2. Distributed Tracing (OpenTelemetry)

Spans are created for:

  • config.bootstrap.partition — full partition fetch (includes DB query time via W3C Trace Context propagated through NATS headers)
  • config.rebalance — full rebalance cycle, with child spans per acquired/dropped partition
  • config.notification.batch — 100ms batch processing, tagged with batch size and partition IDs

9.3. Structured Logging

All log lines include store, key, worker_id, and partition fields. The SDK uses slog with a caller-provided handler. Key events:

  • INFO: partition acquired/released, rebalance triggered, heartbeat restored
  • WARN: heartbeat failure (with retry count), bootstrap timeout (with retry attempt)
  • ERROR: mode mismatch on join, partition count mismatch, bootstrap exhausted retries

10. Scale Assumptions

This design is validated against the following operating envelope:

| Parameter                                | Target | Hard Limit      |
| ---------------------------------------- | ------ | --------------- |
| Stores per cluster                       | 10     | 50              |
| Config keys per store                    | 20     | 100             |
| Partition count per store                | 256    | 4096            |
| Workers per key                          | 50     | 200             |
| Notifications per second (per partition) | 500    | 5,000           |
| Row payload size                         | ≤ 4 KB | 1 MB (NATS max) |
| Bootstrap payload size (full partition)  | ≤ 1 MB | 8 MB            |

Bottleneck: At >5,000 notifications/sec per partition, the 100ms batch window saturates the single batch NATS request. Mitigation: reduce partition_count to spread load, or increase the number of gRPC service instances (NATS queue-group scales horizontally).

Not designed for: sub-millisecond notification latency (JetStream + 100ms batching adds ~100–150ms end-to-end), or payloads requiring strong read-after-write consistency within a single client (use direct CRDB query for those cases).
