Created
November 12, 2025 03:13
-
-
Save nileshtrivedi/8b2899c9e4b09c86a9efbb5d3a804135 to your computer and use it in GitHub Desktop.
Simulator for reactivity in a distributed system with CRDT-based state
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A CRDT wraps a value and knows how to synchronize with other replicas so that all of them
// reach eventual consistency.
// Local mutations are applied immediately ("optimistically"); the single method `apply`
// incorporates messages received from other replicas.
// Convergence relies on `apply` being commutative and associative, and also idempotent when
// the channel may deliver a message more than once.
// Mutators return a new local CRDT instance (containing the requested change) together with
// the message/effect to broadcast:
//   - a state-based CRDT broadcasts its whole internal state;
//   - an op-based CRDT broadcasts the operation itself.
// A null message marks a no-op (e.g. adding an item that is already in a set), so nothing
// needs to be broadcast.
interface MutationResult<C, MessageType> {
  updatedCrdt: C;
  message: MessageType | null;
}
interface CRDT<ValueType, MessageType> {
  /** Incorporates a message received from another replica and returns the resulting CRDT. */
  apply(remoteMessage: MessageType): this;
}
/**
 * State-based CRDT: replicas send each other their full state and "merge" what they receive
 * into their own.
 *
 * It needs only a weak channel: messages may be lost, arrive out of order, or arrive several
 * times. Updates still propagate when the network partitions, as long as connectivity is
 * eventually restored. It may be inefficient when the state is large.
 *
 * `apply` merges a remote state into the local one. The merge must be commutative,
 * associative and idempotent.
 */
interface StateBasedCRDT<V, S> extends CRDT<V, S> {
  /** Full serializable state that gets propagated to other replicas. */
  readonly state: S;
}
/**
 * Delta-based CRDT: propagates deltas (the changes) instead of full states.
 *
 * `apply` merges a remote delta. The merge must be commutative, associative and idempotent
 * (a "join").
 */
interface DeltaBasedCRDT<V, Delta> extends CRDT<V, Delta> {}
/**
 * Operation-based CRDT: replicas send each other operations and apply them to their local state.
 *
 * Compared with a state-based CRDT it expects less from `apply` and more from the channel.
 * Every update must be delivered at every replica, and the updates must either arrive in the
 * right order or commute.
 *
 * NOTE(review): in the usual formulation (Shapiro et al., 2011) concurrent operations must
 * commute, and each operation is delivered exactly once in causal order. Under that model
 * `apply` need not be idempotent. Confirm which delivery guarantees this simulator's channels
 * will provide.
 */
interface OperationBasedCRDT<V, Op> extends CRDT<V, Op> {}
// A basic CRDT primitive and example.
// Last-Writer-Wins register: merging a remote state keeps whichever of the two states has the
// larger (timestamp, writer nodeId) pair.
type LWWRegisterState<VT> = {timestamp: number, nodeId: string; value: VT};

/**
 * Last-Writer-Wins register: a state-based CRDT holding a single value.
 *
 * Every state records when it was written and by which node. States are totally ordered by
 * (timestamp, writer nodeId), and `apply` keeps the larger one. The merge is therefore
 * commutative, associative and idempotent, and all replicas converge on the same value.
 */
class LWWRegister<VT> implements StateBasedCRDT<VT, LWWRegisterState<VT>> {
  /** Id of the replica that holds this register; used as the writer id of local writes. */
  readonly nodeId: string;
  state: LWWRegisterState<VT>;

  constructor(nodeId: string, state: LWWRegisterState<VT>) {
    this.nodeId = nodeId;
    this.state = state;
  }

  getValue(): VT {
    return this.state.value;
  }

  /**
   * Returns a new register holding `newValue`, plus the state to broadcast.
   *
   * The timestamp is forced strictly past the current state's timestamp, so the new write
   * also wins the merge on every other replica. Without this, a local write could lose
   * remotely (while already showing locally) when this node's clock is behind the clock of
   * the node that wrote the current value, or when two writes land in the same millisecond.
   */
  setValue(newValue: VT): MutationResult<LWWRegister<VT>, LWWRegisterState<VT>> {
    const newState: LWWRegisterState<VT> = {
      // Wall clock with a monotonic bump: a simplified hybrid/Lamport-style clock.
      timestamp: Math.max(Date.now(), this.state.timestamp + 1),
      nodeId: this.nodeId,
      value: newValue,
    };
    return {updatedCrdt: new LWWRegister(this.nodeId, newState), message: newState};
  }

  /**
   * Merges a remote state and returns the winner.
   *
   * The later timestamp wins. On equal timestamps the larger *writer* id (`state.nodeId`)
   * wins. The id of the replica holding the register must not be used here: it differs
   * between replicas and would make them diverge. When the local state wins, `this` is
   * returned unchanged.
   */
  apply(remoteState: LWWRegisterState<VT>): this {
    const local = this.state;
    const localWins =
      local.timestamp > remoteState.timestamp ||
      (local.timestamp === remoteState.timestamp && local.nodeId > remoteState.nodeId);
    if (localWins) return this; // discard the incoming value
    return new LWWRegister<VT>(this.nodeId, remoteState) as this;
  }
}
/* LWWMap - a container CRDT for named values which are themselves of type LWWRegister */
// Implemented as a delta-based CRDT: a mutation broadcasts only the register state of the key
// it touched. `apply` accepts any such delta; a full state is just a delta covering every key.
// Removing a key writes a null tombstone into its register instead of deleting the entry, so
// that merging an older write for that key cannot bring it back.
type LWWMapState<VT> = Map<string, LWWRegisterState<VT | null | undefined>>;
class LWWMap<VT> implements DeltaBasedCRDT<Map<string, VT>, LWWMapState<VT>> {
  readonly nodeId: string;
  // key -> register. A register holding null/undefined means the key is absent
  // (removed, or created but never written).
  readonly #data: Map<string, LWWRegister<VT | null | undefined>>;

  private constructor(nodeId: string, data: Map<string, LWWRegister<VT | null | undefined>>) {
    this.nodeId = nodeId;
    this.#data = data;
  }

  /** Public factory: an empty map owned by `nodeId`. */
  static startEmpty<VT>(nodeId: string): LWWMap<VT> {
    return new LWWMap<VT>(nodeId, new Map());
  }

  /** Public factory: rebuilds a map from a serialized state. */
  static fromState<VT>(nodeId: string, state: LWWMapState<VT>): LWWMap<VT> {
    const data = new Map<string, LWWRegister<VT | null | undefined>>();
    for (const [key, registerState] of state) {
      data.set(key, new LWWRegister(nodeId, registerState));
    }
    return new LWWMap<VT>(nodeId, data);
  }

  /** Value stored under `key`, or undefined when the key is absent or removed. */
  getValueForKey(key: string): VT | null | undefined {
    return this.#data.get(key)?.getValue() ?? undefined;
  }

  /** True when `key` holds a live (non-null, non-undefined) value. */
  hasKey(key: string): boolean {
    return this.#data.get(key)?.getValue() != null;
  }

  /**
   * Number of live keys. Removed keys keep a tombstone register internally. They are skipped
   * here so the count agrees with hasKey() and getValue().
   */
  countKeys(): number {
    let live = 0;
    for (const register of this.#data.values()) {
      if (register.getValue() != null) live++;
    }
    return live;
  }

  /** Snapshot of the live entries as a plain Map; tombstones are omitted. */
  getValue(): Map<string, VT> {
    const value = new Map<string, VT>();
    for (const [key, register] of this.#data) {
      const registerValue = register.getValue();
      if (registerValue != null) value.set(key, registerValue as VT);
    }
    return value;
  }

  // Returns the existing register *instance* for `key`. Otherwise returns a fresh one
  // (timestamp 0, value undefined) that is NOT stored in #data.
  private getOrCreateRegister(key: string): LWWRegister<VT | null | undefined> {
    return (
      this.#data.get(key) ??
      new LWWRegister<VT | null | undefined>(this.nodeId, {timestamp: 0, nodeId: this.nodeId, value: undefined})
    );
  }

  /** Removes `key` by writing a null tombstone. Returns the new map and the delta to broadcast. */
  removeKey(key: string): MutationResult<LWWMap<VT>, LWWMapState<VT>> {
    return this.setValueForKey(key, null);
  }

  /** Writes `value` under `key`. Returns the new map and a one-entry delta to broadcast. */
  setValueForKey(key: string, value: VT | null): MutationResult<LWWMap<VT>, LWWMapState<VT>> {
    const {updatedCrdt: newRegister, message: newRegisterState} = this.getOrCreateRegister(key).setValue(value);
    const newData = new Map(this.#data);
    newData.set(key, newRegister);
    // LWWRegister.setValue always returns a message, hence the non-null assertion.
    const deltaMessage: LWWMapState<VT> = new Map([[key, newRegisterState!]]);
    return {updatedCrdt: new LWWMap<VT>(this.nodeId, newData), message: deltaMessage};
  }

  /** Full serializable state: every key's register state, tombstones included. */
  get state(): LWWMapState<VT> {
    const state: LWWMapState<VT> = new Map();
    for (const [key, register] of this.#data) {
      state.set(key, register.state);
    }
    return state;
  }

  /**
   * Merges a remote delta (which may be a full state) into a new map; `this` is left untouched.
   * Keys present locally are merged with LWWRegister.apply. Unknown keys adopt the remote
   * register state as-is.
   */
  apply(remoteDelta: LWWMapState<VT>): this {
    const newData = new Map(this.#data);
    for (const [key, remoteRegisterState] of remoteDelta) {
      const localRegister = this.#data.get(key);
      newData.set(
        key,
        localRegister
          ? localRegister.apply(remoteRegisterState)
          : new LWWRegister(this.nodeId, remoteRegisterState),
      );
    }
    return new LWWMap<VT>(this.nodeId, newData) as this;
  }
}
// Grow-only Set (G-Set) as an op-based CRDT.
// 'add' is the only operation, and it carries the added value.
type GSetOp<VT> = {type: 'add'; value: VT;};

/**
 * Grow-only set: items can be added but never removed.
 *
 * Adding is commutative: the resulting set does not depend on the order of the adds. It is
 * also idempotent: adding an item that is already present changes nothing. So `apply`
 * tolerates reordered and duplicated ops. Membership follows JavaScript `Set` semantics,
 * which means object items are compared by reference.
 */
class GSet<VT> implements OperationBasedCRDT<Set<VT>, GSetOp<VT>> {
  readonly #items: Set<VT>;

  constructor(initialSet?: Set<VT>) {
    // Copy, so later changes to the caller's Set cannot leak into this replica.
    this.#items = new Set(initialSet);
  }

  /** A copy of the current items. */
  getValue(): Set<VT> {
    return new Set(this.#items);
  }

  has(item: VT): boolean {
    return this.#items.has(item);
  }

  /**
   * Adds `newItem` and returns a new GSet together with the op to broadcast.
   * If the item is already present, this same GSet comes back with a null message, so
   * there is nothing to update or broadcast.
   */
  add(newItem: VT): MutationResult<this, GSetOp<VT>> {
    if (this.#items.has(newItem)) return {updatedCrdt: this, message: null};
    const grown = new GSet<VT>(new Set([...this.#items, newItem])) as this;
    return {updatedCrdt: grown, message: {type: 'add', value: newItem}};
  }

  /** Applies an op RECEIVED from another replica; the resulting message is not re-broadcast. */
  apply(remoteOp: GSetOp<VT>): this {
    const {updatedCrdt} = this.add(remoteOp.value);
    return updatedCrdt;
  }
}
// Types that specify a distributed system.
// Each network zone gets multiple nodes; each node can have local state and computations.
// Some states are the source of truth; others are values derived from other states via
// computations. An AppSpec is used to instantiate an actual running App (a network of nodes).
// Rules:
//   - computations can only depend on local state;
//   - all local state must be declared with the proper CRDT type, which determines how
//     updates are generated and applied;
//   - every derived state explicitly names the computation that produces its updates, and
//     its dependencies;
//   - every computation receives an additional event parameter carrying session info, which
//     makes access control possible.

/** Zone-prefixed path to a state variable or a computation, e.g. "/client/my_todos". */
type Path = string;
/** CRDT implementation backing a state variable. */
type CrdtType = "LWWRegister" | "LWWMap" | "GSet";
/** Declared kind of value held by a state variable. */
type ValueType = "object" | "number" | "string" | "boolean" | "array";

/** State that is a source of truth; `init` supplies its initial value. */
interface SourceState {
  crdtType: CrdtType;
  valueType: ValueType;
  init: any;
}

/** State derived from other states: produced by the `computer` computation, which depends on `depends_on`. */
interface DerivedState {
  crdtType: CrdtType;
  valueType: ValueType;
  computer: Path;
  depends_on: Path[];
}

/** A message for the state variable at `path` of node `nodeId`, in the format of one of the CRDTs. */
interface Effect<VT> {
  nodeId: string;
  path: Path;
  msg: LWWRegisterState<VT> | LWWMapState<VT> | GSetOp<VT>;
}

/** Session info carried by events. NOTE(review): presumably `signature` authenticates `claims` — confirm. */
interface Session {
  claims: any;
  signature: string;
}

/** Event passed to every computation: a node id, the session, and an arbitrary payload. */
interface ZEvent {
  nodeId: string;
  session: Session;
  payload: any;
}
/**
 * A computation, even a reactive one, always gets the triggering event as an argument.
 * This is mostly useful for its session info, so that access control can work. Explicit
 * events also make it easy to replay and debug things. For reactive computations, the
 * framework provides the event.
 */
type Computation = (node: RunningNode, event: ZEvent) => Effect<any>;
type ZoneName = string;
/** Everything declared for the nodes of one zone, by name: state variables and computations. */
type NodeSpec = {[varName: string]: SourceState | DerivedState | Computation};
// Every zone must have at least one channel with another zone so that the entire system is connected.
type ChannelType = "websocket";
/** Declares that nodes of `fromZone` talk to nodes of `toZone` over a channel of kind `type`. */
interface Channel {
  type: ChannelType;
  fromZone: ZoneName;
  toZone: ZoneName;
}
/** Declarative description of a whole application: its zones and the channels between them. */
interface AppSpec {
  zones: Record<ZoneName, NodeSpec>;
  channels: Channel[];
}
/** A link between two running nodes over a channel of type `channelType`. */
interface Connection {
  fromNodeId: string;
  fromZone: ZoneName;
  toNodeId: string;
  toZone: ZoneName;
  channelType: ChannelType;
  isActive: boolean;
  latency: number;
}

type MsgQueue<VT> = Array<Effect<VT>>;

/** One running node: its spec, the live CRDT instances of its state, and its message queues. */
interface RunningNode {
  id: string;
  spec: NodeSpec;
  /** Live CRDT instance for each state variable, keyed by variable name. */
  state: {[name: string]: any};
  zone: ZoneName;
  /** NOTE(review): presumably used to sign sessions (see Session.signature) — confirm. */
  signingSecret: string;
  incomingQueue: MsgQueue<any>;
  outgoingQueue: MsgQueue<any>;
  /** Clock used to timestamp the effects this node emits. */
  getTimestamp: () => number;
}

/** An actual distributed system instantiated from an AppSpec. */
interface RunningApp {
  zones: Record<ZoneName, RunningNode[]>;
  connections: Connection[];
}
/**
 * Creates the initial CRDT instance for one state variable of a node.
 *
 * @param nodeId    Id of the owning node; LWW registers use it as their writer id.
 * @param crdtType  Which CRDT to instantiate.
 * @param valueType Declared value type. Generic type parameters are erased at runtime, so it
 *                  does not change the instance that is built; it is only validated.
 * @param init      Initializer declared by a SourceState. Derived states have none: callers
 *                  pass `undefined` or `false`, and the state starts empty until its computer
 *                  produces a value. For an LWWRegister it returns the initial value. For a
 *                  GSet it returns an iterable of items. For an LWWMap it returns a Map of
 *                  initial entries.
 * @returns A fresh LWWRegister, LWWMap or GSet.
 * @throws Error for a crdtType/valueType outside the declared unions, or when an LWWMap
 *         initializer returns something other than a Map.
 */
const initState = (
  nodeId: string,
  crdtType: CrdtType,
  valueType: ValueType,
  init?: (() => unknown) | false,
) => {
  const knownValueTypes: readonly string[] = ["object", "number", "string", "boolean", "array"];
  if (!knownValueTypes.includes(valueType)) {
    throw new Error(`initState: unsupported valueType "${String(valueType)}"`);
  }
  // Derived state has no initializer; its value arrives later from its computer.
  const initialValue: unknown = init ? init() : undefined;

  switch (crdtType) {
    case "LWWRegister":
      return new LWWRegister<unknown>(nodeId, {nodeId, timestamp: Date.now(), value: initialValue});
    case "LWWMap": {
      let map = LWWMap.startEmpty<unknown>(nodeId);
      if (initialValue === undefined || initialValue === null) return map;
      if (!(initialValue instanceof Map)) {
        throw new Error("initState: an LWWMap initializer must return a Map");
      }
      // Seed each initial entry as a regular local write.
      for (const [key, value] of initialValue) {
        map = map.setValueForKey(String(key), value).updatedCrdt;
      }
      return map;
    }
    case "GSet":
      return new GSet<unknown>(new Set(initialValue as Iterable<unknown> | undefined));
    default: {
      const unsupported: never = crdtType;
      throw new Error(`initState: unsupported crdtType "${String(unsupported)}"`);
    }
  }
};
/**
 * Instantiates the CRDT for every state variable declared in a node spec.
 * Entries whose value is a function are computations, not state, and are skipped.
 * Returns a plain object mapping variable name -> CRDT instance.
 */
const nodeSpecToState = (nodeId: string, nodeSpec: NodeSpec) => {
  const stateObject: {[name: string]: any} = {};
  for (const [name, decl] of Object.entries(nodeSpec)) {
    if (typeof decl === "function") continue;
    // Only a SourceState has `init`; for a DerivedState this expression evaluates to false.
    // nodeId is passed so LWW registers know their writer id.
    stateObject[name] = initState(nodeId, decl.crdtType, decl.valueType, 'init' in decl && decl.init);
  }
  return stateObject;
};
/**
 * Boots one node of the given zone. It gets a random UUID as its id (also handed to
 * nodeSpecToState), its state instantiated from the spec, a separate random signing secret,
 * empty message queues, and a wall-clock timestamp source.
 */
const initNode = (zoneName: ZoneName, nodeSpec: NodeSpec): RunningNode => {
  const id = globalThis.crypto.randomUUID();
  const node: RunningNode = {
    spec: nodeSpec,
    state: nodeSpecToState(id, nodeSpec),
    zone: zoneName,
    id,
    signingSecret: globalThis.crypto.randomUUID(),
    incomingQueue: [],
    outgoingQueue: [],
    getTimestamp: () => Date.now(),
  };
  return node;
};
/**
 * Creates an active connection from `fromNode` to `toNode`.
 *
 * @param fromNode    Node at the start of the connection.
 * @param toNode      Node at the end of the connection.
 * @param channelType Kind of channel used.
 * @param latency     Simulated latency stored on the connection (default 0.1, as before).
 *                    NOTE(review): the unit is not fixed anywhere yet — presumably seconds.
 * @returns The new Connection.
 * @throws RangeError if `latency` is negative or not a number.
 */
const establishConnection = (
  fromNode: RunningNode,
  toNode: RunningNode,
  channelType: ChannelType,
  latency: number = 0.1,
): Connection => {
  // `!(x >= 0)` also rejects NaN.
  if (!(latency >= 0)) {
    throw new RangeError(`establishConnection: latency must be >= 0, got ${latency}`);
  }
  return {
    fromNodeId: fromNode.id,
    fromZone: fromNode.zone,
    toNodeId: toNode.id,
    toZone: toNode.zone,
    channelType,
    isActive: true,
    latency,
  };
};
/**
 * Instantiates a running system from an AppSpec. It boots the nodes of every declared zone,
 * then wires them together according to `app.channels`, which were previously ignored in
 * favour of hard-coded todo zones.
 *
 * Wiring: for each channel, every node of `toZone` is connected to one node of `fromZone`,
 * picked round-robin. No node is left without a connection (the hard-coded version never
 * connected its second server), and the load spreads across the source zone.
 *
 * @param app          The application spec.
 * @param nodesPerZone Nodes to start per zone. Zones not listed, or listed with an invalid
 *                     count or one below 1, get one node, so every zone has at least one.
 *                     The default matches the todo example: 1 db node, 2 servers, 2 clients.
 * @returns Nodes grouped by zone, plus the connections.
 * @throws Error if a channel refers to a zone that `app.zones` does not declare.
 */
const startApp = (
  app: AppSpec,
  nodesPerZone: {[zoneName: ZoneName]: number} = {db: 1, server: 2, client: 2},
): RunningApp => {
  const zones: RunningApp["zones"] = {};
  for (const [zoneName, nodeSpec] of Object.entries(app.zones)) {
    const requested = nodesPerZone[zoneName] ?? 1;
    const count = Number.isFinite(requested) && requested >= 1 ? Math.floor(requested) : 1;
    zones[zoneName] = Array.from({length: count}, () => initNode(zoneName, nodeSpec));
  }

  const connections: Connection[] = [];
  for (const channel of app.channels) {
    const fromNodes = zones[channel.fromZone];
    const toNodes = zones[channel.toZone];
    if (!fromNodes || !toNodes) {
      throw new Error(
        `startApp: channel ${channel.fromZone} -> ${channel.toZone} references an undeclared zone`,
      );
    }
    // fromNodes is non-empty: every declared zone gets at least one node above.
    toNodes.forEach((toNode, i) => {
      connections.push(establishConnection(fromNodes[i % fromNodes.length], toNode, channel.type));
    });
  }

  // TODO: Build dependency graph for auto-pushing reactive updates
  return {zones, connections};
};
// Example: spec for a 3-tier todo app
// db is responsible for persistence
// server is needed for access control and expensive computations
// client is where user interaction happens, which generates effects
// The framework will implement compatibility of effects with the destination CRDT types
// Challenge exercises:
// (0) Show that computational work is done only where necessary, e.g. for connected clients only
// (1) Model multi-layer client storage (RAM -> localStorage/SQLite)
// (2) Model database read replicas that take the burden off the master instance, which still handles all write queries
// (3) Show computations on the server using a different programming language than the client
// (4) Show an example of long-running operations which send progress updates to the client
// (5) Show that incompatibility between effects and the target data type is detected and raises errors
// (6) Show that access control works as expected and no data leakage occurs
const todo: AppSpec = {
  zones: {
    db: {
      all_todos: {
        crdtType: "LWWMap",
        valueType: "object",
        init: () => (new Map())
      },
      // Sends the number of keys in all_todos (presumably one per user — TODO confirm)
      // to /server/user_count as an LWW register state.
      calculate_user_count: (dbNode: RunningNode, event: ZEvent) => ({
        nodeId: event.nodeId,
        path: '/server/user_count',
        msg: {
          nodeId: dbNode.id,
          timestamp: dbNode.getTimestamp(),
          value: dbNode.state.all_todos.countKeys()
        }
      })
    },
    server: {
      // Server-side replica of /db/all_todos. filter_todos_for_user reads it, and computations
      // may only depend on local state; without this declaration serverNode.state.all_todos
      // is undefined and the computation throws.
      // NOTE(review): nothing forwards /db/all_todos deltas to this replica yet — the framework
      // still has to wire that up.
      all_todos: {
        crdtType: "LWWMap",
        valueType: "object",
        init: () => (new Map())
      },
      user_count: {
        crdtType: "LWWRegister",
        valueType: "number",
        computer: '/db/calculate_user_count',
        depends_on: ['/db/all_todos']
      },
      // Access control: a client only receives the entry keyed by its own session's user_id,
      // or an empty list when there is none.
      filter_todos_for_user: (serverNode: RunningNode, event: ZEvent) => ({
        nodeId: event.nodeId,
        path: '/client/my_todos',
        msg: {
          nodeId: serverNode.id,
          timestamp: serverNode.getTimestamp(),
          value: serverNode.state.all_todos.getValueForKey(event.session.claims.user_id) ?? []
        }
      }),
    },
    client: {
      my_todos: {
        crdtType: "LWWRegister",
        valueType: "array",
        computer: '/server/filter_todos_for_user',
        depends_on: ['/db/all_todos']
      },
      task_input: {
        crdtType: "LWWRegister",
        valueType: "string",
        init: () => ""
      },
      ui: {
        crdtType: "LWWMap",
        valueType: "object",
        computer: '/client/render',
        depends_on: ['/client/my_todos']
      },
      // Placeholder renderer: currently always sends an empty LWWMap delta to /client/ui.
      render: (clientNode: RunningNode, event: ZEvent) => ({
        nodeId: event.nodeId,
        path: '/client/ui',
        msg: new Map<string, LWWRegisterState<any>>()
      })
    }
  },
  channels: [
    {type: "websocket", fromZone: "server", toZone: "client"},
    {type: "websocket", fromZone: "db", toZone: "server"},
  ]
};
// Boot the example: start every node, set up the network so it can start ticking, and dump it.
const app = startApp(todo);
console.log({ app });
/**
 * Runs `fn` against the CRDT stored at `path` on one node, to simulate a local user action.
 *
 * @param app    Running system containing the node.
 * @param nodeId Id of the node to act on.
 * @param path   Zone-prefixed path such as "/client/task_input"; the leading "/" is optional.
 * @param fn     Receives the node's current CRDT instance for that path.
 * @throws Error if the path is not "/zone/variable", or the zone, node or variable is missing.
 */
const injectLocalMutation = (app: RunningApp, nodeId: string, path: string, fn: (crdt: CRDT<any, any>) => void) => {
  // "/client/my_todos".split("/") is ["", "client", "my_todos"]: drop empty segments so
  // the zone is the first real segment.
  const [zone, varName, ...rest] = path.split("/").filter((segment) => segment !== "");
  if (zone === undefined || varName === undefined || rest.length > 0) {
    throw new Error(`injectLocalMutation: expected a path like "/zone/variable", got "${path}"`);
  }
  const node = app.zones[zone]?.find((n) => n.id === nodeId);
  if (!node) {
    throw new Error(`injectLocalMutation: no node "${nodeId}" in zone "${zone}"`);
  }
  const crdt = node.state[varName];
  if (crdt === undefined) {
    throw new Error(`injectLocalMutation: node "${nodeId}" has no state "${varName}"`);
  }
  fn(crdt);
  // TODO: Complete this. CRDT mutators return a new instance plus a message (MutationResult)
  // instead of mutating in place, so `fn` has to hand that result back before it can be
  // stored in node.state and queued on node.outgoingQueue.
};
/**
 * Advances the simulation by one step.
 * Not implemented yet: it does nothing and returns undefined.
 */
const tick = (app: RunningApp): void => {
  // TODO: Process msgs in the queues
};
/**
 * Drives a whole simulation run.
 * Not implemented yet: it does nothing and returns undefined.
 */
const simulate = (app: RunningApp): void => {
  // TODO: Simulate user-interaction and system ticks here
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment