Skip to content

Instantly share code, notes, and snippets.

@nileshtrivedi
Created November 12, 2025 03:13
Show Gist options
  • Select an option

  • Save nileshtrivedi/8b2899c9e4b09c86a9efbb5d3a804135 to your computer and use it in GitHub Desktop.

Select an option

Save nileshtrivedi/8b2899c9e4b09c86a9efbb5d3a804135 to your computer and use it in GitHub Desktop.
Simulator for Reactivity in Distributed System with CRDT-based state
// A CRDT is a wrapper data-structure for a value that knows
// how to synchronize with other replicas to achieve eventual consistency
// A single special method "apply" is expected to incorporate messages received from replicas.
// Local mutations are applied immediately ("optimistically")
// Eventual consistency is achieved with the apply method being commutative, associative and sometimes idempotent
// Mutators must produce a new local CRDT instance (that includes the changes requested) and a Message/Effect to broadcast
// For state-based CRDT, the whole internal state gets broadcasted
// For op-based CRDT, the operation itself gets broadcasted
// message = null is treated as a no-op (for eg: adding an existing item to a set) and skips broadcasting when not needed
/**
 * Result of a local mutation on a CRDT.
 * `updatedCrdt` is the instance that includes the requested change.
 * `message` is what should be broadcast to other replicas; `null` means there is nothing to broadcast.
 */
type MutationResult<C, MessageType> = {updatedCrdt: C, message: MessageType | null};
/**
 * Minimal contract shared by every CRDT in this file.
 * `MessageType` is the shape of the messages exchanged between replicas.
 * `ValueType` is not referenced by any member declared here.
 */
interface CRDT<ValueType, MessageType> {
// Incorporates a message received from another replica and returns the resulting CRDT.
apply(remoteMessage: MessageType): this
}
// A state-based CRDT receives full states from other replicas and "merges" them into its own
// It demands a weaker communication channel: messages may be lost, arrive out of order, or arrive multiple times
// Updates are propagated reliably even if the network partitions, as long as eventually connectivity is restored
// Might be inefficient if state is large
/**
 * CRDT whose replication message is its own full state.
 * Adds a `state` member on top of the inherited `apply`.
 */
interface StateBasedCRDT<ValueType, StateMessageType> extends CRDT<ValueType, StateMessageType> {
readonly state: StateMessageType; // Full serializable state that gets propagated to other replicas.
// `apply` for a StateBasedCRDT merges a remote state into the local state.
// This merge must be commutative, associative and idempotent.
}
// A delta-based CRDT propagates deltas (changes) instead of full states.
// The "apply" method merges a remote delta.
// The merge function must be commutative, associative, and idempotent (a "join").
/**
 * CRDT whose replication message is a delta rather than the full state.
 * Declares no members of its own; the inherited `apply` is the method that merges a remote delta.
 */
interface DeltaBasedCRDT<ValueType, DeltaMessageType> extends CRDT<ValueType, DeltaMessageType> {
// Kept for reference only: a narrower `apply` signature that is intentionally left commented out.
// apply(remoteDelta: DeltaMessageType): DeltaBasedCRDT<ValueType, DeltaMessageType>
}
// Op-based CRDT receives operations from other replicas and applies them to its local state
// Applies a remote operation to the local state. Must be commutative.
// Needs a more reliable network: All updates must be delivered at every replica either in the right order or be commutative
/**
 * CRDT whose replication message is an operation to be replayed on other replicas.
 * Declares no members of its own; it exists to name the op-based flavour of the `CRDT` contract.
 */
interface OperationBasedCRDT<ValueType, OpMessageType> extends CRDT<ValueType, OpMessageType> {
// No extensions are necessary, but the expectations on `apply` are weaker and those on the channel are stronger.
// Open question: must apply(remoteMessage) be commutative? associative? idempotent?
}
// A basic CRDT primitive and example
// A Last-Writer-Wins CRDT's "merge" of a remote state simply keeps whichever state has the latest timestamp
/**
 * Serializable state of a last-writer-wins register: the value together with the
 * (timestamp, nodeId) stamp of the write that produced it.
 */
type LWWRegisterState<VT> = {timestamp: number, nodeId: string; value: VT};

/**
 * Last-writer-wins register, a state-based CRDT holding a single value.
 *
 * Instances are treated as immutable: `setValue` and `apply` return a register
 * (possibly `this`) instead of modifying the receiver.
 *
 * `nodeId` identifies the replica that hosts this instance and is used to stamp
 * local writes. `state.nodeId` identifies the replica that produced the value
 * currently held; the two differ whenever the held value came from a remote write.
 */
class LWWRegister<VT> implements StateBasedCRDT<VT, LWWRegisterState<VT>> {
  readonly nodeId: string;
  state: LWWRegisterState<VT>;

  /**
   * @param nodeId id of the replica hosting this register
   * @param state  state to start from (may originate from any replica)
   */
  constructor(nodeId: string, state: LWWRegisterState<VT>) {
    this.nodeId = nodeId;
    this.state = state;
  }

  /** @returns the value currently held by the register */
  getValue(): VT {
    return this.state.value;
  }

  /**
   * Performs a local write.
   *
   * @param newValue value to store
   * @returns a new register holding `newValue`, plus the new state to broadcast
   */
  setValue(newValue: VT): MutationResult<LWWRegister<VT>, LWWRegisterState<VT>> {
    const newState: LWWRegisterState<VT> = {
      // Wall-clock time, but never at or below the stamp being replaced: otherwise a
      // second write in the same millisecond, or a write on top of a state stamped by
      // a replica whose clock runs ahead, would be applied locally yet discarded by
      // every other replica.
      timestamp: Math.max(Date.now(), this.state.timestamp + 1),
      nodeId: this.nodeId,
      value: newValue
    };
    return {updatedCrdt: new LWWRegister(this.nodeId, newState), message: newState};
  }

  /**
   * Merges a state received from another replica.
   *
   * The state with the greater timestamp wins. Equal timestamps are ordered by the
   * nodeId of the *writer* of each state, so every replica picks the same winner no
   * matter which replica performs the merge or in which order the states arrive.
   *
   * @param remoteState state received from another replica
   * @returns `this` when the local state wins, otherwise a new register holding `remoteState`
   */
  apply(remoteState: LWWRegisterState<VT>): this {
    const local = this.state;
    if (local.timestamp > remoteState.timestamp) return this; // discard the incoming value
    // Tie-break on the writer ids. `>=` also covers re-delivery of the state already held.
    if (local.timestamp === remoteState.timestamp && local.nodeId >= remoteState.nodeId) return this;
    return new LWWRegister<VT>(this.nodeId, remoteState) as this;
  }
}
/* LWWMap - a container CRDT for named values which are themselves of type LWWRegister */
// As a map, it will be best implemented as a delta-based CRDT
/**
 * State / delta message of an LWWMap: one register state per key.
 * A `null` or `undefined` value marks the key as absent.
 */
type LWWMapState<VT> = Map<string, LWWRegisterState<VT | null | undefined>>;

/**
 * Map from string keys to last-writer-wins registers, replicated by exchanging deltas.
 *
 * Instances are treated as immutable: every mutator and `apply` return a new map.
 * Removing a key stores `null` in its register rather than deleting the entry, so the
 * removal can be propagated to and merged by other replicas.
 */
class LWWMap<VT> implements DeltaBasedCRDT<Map<string, VT>, LWWMapState<VT>> {
  readonly nodeId: string;
  readonly #data: Map<string, LWWRegister<VT | null | undefined>>;

  private constructor(nodeId: string, data: Map<string, LWWRegister<VT | null | undefined>>) {
    this.nodeId = nodeId;
    this.#data = data;
  }

  /** Public factory for creating an empty map hosted on `nodeId`. */
  static startEmpty<VT>(nodeId: string): LWWMap<VT> {
    return new LWWMap<VT>(nodeId, new Map());
  }

  /** Public factory for creating a map from a full state (one register per entry of `state`). */
  static fromState<VT>(nodeId: string, state: LWWMapState<VT>): LWWMap<VT> {
    const data = new Map<string, LWWRegister<VT | null | undefined>>();
    for (const [key, registerState] of state) {
      data.set(key, new LWWRegister(nodeId, registerState));
    }
    return new LWWMap<VT>(nodeId, data);
  }

  /** @returns the value stored under `key`, or `undefined` when the key is unknown or removed */
  getValueForKey(key: string): VT | null | undefined {
    return this.#data.get(key)?.getValue() ?? undefined;
  }

  /** @returns `true` only when `key` currently holds a value other than `null` / `undefined` */
  hasKey(key: string): boolean {
    return this.#data.get(key)?.getValue() != null;
  }

  /**
   * @returns the number of keys that currently hold a value. Entries left behind by
   * `removeKey` are not counted, so the result agrees with `hasKey` and `getValue().size`.
   */
  countKeys(): number {
    let liveKeys = 0;
    for (const register of this.#data.values()) {
      if (register.getValue() != null) liveKeys++;
    }
    return liveKeys;
  }

  /** @returns a plain `Map` snapshot containing only the keys that currently hold a value */
  getValue(): Map<string, VT> {
    const value = new Map<string, VT>();
    for (const [key, register] of this.#data) {
      const registerValue = register.getValue();
      if (registerValue != null) {
        value.set(key, registerValue as VT);
      }
    }
    return value;
  }

  // Helper to get or create a register *instance* (not state).
  // A created register is NOT stored in #data; callers store the register they derive from it.
  private getOrCreateRegister(key: string): LWWRegister<VT | null | undefined> {
    return this.#data.get(key) ?? new LWWRegister<VT | null | undefined>(
      this.nodeId,
      {timestamp: 0, nodeId: this.nodeId, value: undefined} // use timestamp=0 for a new register
    );
  }

  // Shared implementation of the two mutators: writes `value` into the register for `key`
  // and returns the new map plus a single-entry delta describing that write.
  private writeKey(key: string, value: VT | null): MutationResult<LWWMap<VT>, LWWMapState<VT>> {
    const { updatedCrdt: newRegister } = this.getOrCreateRegister(key).setValue(value);
    const newData = new Map(this.#data);
    newData.set(key, newRegister);
    // The register's new state is exactly what has to be broadcast for this key.
    const deltaMessage: LWWMapState<VT> = new Map([[key, newRegister.state]]);
    return {updatedCrdt: new LWWMap<VT>(this.nodeId, newData), message: deltaMessage};
  }

  /**
   * Marks `key` as removed by writing `null` into its register.
   * @returns the new map and the single-entry delta to broadcast
   */
  removeKey(key: string) : MutationResult<LWWMap<VT>, LWWMapState<VT>> {
    return this.writeKey(key, null);
  }

  /**
   * Writes `value` under `key`. Passing `null` has the same effect as `removeKey`.
   * @returns the new map and the single-entry delta to broadcast
   */
  setValueForKey(key: string, value: VT | null) : MutationResult<LWWMap<VT>, LWWMapState<VT>> {
    return this.writeKey(key, value);
  }

  /** Full state of the map: the state of every register, including removed keys. */
  get state(): LWWMapState<VT> {
    const state: LWWMapState<VT> = new Map();
    for (const [key, register] of this.#data) {
      state.set(key, register.state);
    }
    return state;
  }

  /**
   * Merges a remote delta, which may be as small as one entry or as large as a full state.
   * Keys already known locally are merged through `LWWRegister.apply`; unknown keys are adopted as-is.
   * @returns a new map; the receiver is left untouched
   */
  apply(remoteDelta: LWWMapState<VT>): this {
    // We need a new data map for the new immutable instance
    const newData = new Map(this.#data);
    for (const [key, remoteRegisterState] of remoteDelta) {
      const localRegister = this.#data.get(key);
      newData.set(
        key,
        localRegister
          ? localRegister.apply(remoteRegisterState)
          : new LWWRegister(this.nodeId, remoteRegisterState)
      );
    }
    return new LWWMap<VT>(this.nodeId, newData) as this;
  }
}
// Grow-only Set as an op-based CRDT
// The only operation on GSet is 'add', which carries the value.
/** The single operation a grow-only set understands: add one value. */
type GSetOp<VT> = {type: 'add'; value: VT;};

/**
 * Grow-only set, replicated by broadcasting `add` operations.
 * Instances are never modified after construction; `add` and `apply` hand back
 * either the receiver itself or a freshly built set.
 */
class GSet<VT> implements OperationBasedCRDT<Set<VT>, GSetOp<VT>> {
  // Backing store: a private copy, never shared with callers.
  readonly #items: Set<VT>;

  /** @param initialSet optional seed; its elements are copied, the set itself is not retained */
  constructor(initialSet?: Set<VT>) {
    this.#items = new Set(initialSet);
  }

  /** @returns a copy of the current contents */
  getValue(): Set<VT> {
    return new Set(this.#items);
  }

  /** @returns whether `item` is a member */
  has(item: VT): boolean {
    return this.#items.has(item);
  }

  /**
   * Local mutation: adds `newItem`.
   * @returns the receiver and a `null` message when the item is already present (nothing to
   *          broadcast); otherwise a new set containing the item and the `add` op to broadcast
   */
  add(newItem: VT): MutationResult<this, GSetOp<VT>> {
    const alreadyPresent = this.#items.has(newItem);
    if (!alreadyPresent) {
      // Adding to a set is order-independent, and re-adding a member changes nothing.
      const grown = new GSet<VT>(new Set([...this.#items, newItem]));
      const op: GSetOp<VT> = {type: 'add', value: newItem};
      return {updatedCrdt: grown as this, message: op};
    }
    return {updatedCrdt: this, message: null};
  }

  /** Replays an op received from another replica; the op is not re-broadcast. */
  apply(remoteOp: GSetOp<VT>): this {
    const { updatedCrdt } = this.add(remoteOp.value);
    return updatedCrdt;
  }
}
// Now we define types to specify a Distributed System
// Each network zone gets multiple nodes. Each node can have local state and computations.
// Some states are the source of truth. Others are derived values from other states via computations.
// The AppSpec gets used to instantiate an actual runtime App (which is a network of nodes)
// all computations can only depend on the local state
// all local state must be declared with the proper crdt type which determines how updates are generated and applied
// all derived state explicitly specifies the function that compute updates, and its dependencies
// all computations receive an additional event parameter which includes session info that makes access-control possible
type Path = string; // A zone-prefixed path to a state variable or a computation, e.g., "/client/my_todos"
// Names of the CRDT classes a state variable can be backed by.
type CrdtType = "LWWRegister" | "LWWMap" | "GSet";
// Coarse tag for the kind of value a state variable holds.
type ValueType = "object" | "number" | "string" | "boolean" | "array";
// A state variable that is a source of truth.
type SourceState = {
crdtType: CrdtType;
valueType: ValueType;
// Typed `any`; initState invokes it with no arguments to obtain the initial value.
init: any;
};
// A state variable whose value is produced by a computation.
type DerivedState = {
crdtType: CrdtType;
valueType: ValueType;
// Path of the computation that produces this variable's updates.
computer: Path;
// Paths of the state variables the computation depends on.
depends_on: Path[]
};
// A message addressed to the state variable at `path`.
// `msg` is one of the message shapes accepted by the CRDT classes' `apply` methods.
// NOTE(review): whether `nodeId` names the sender or the recipient is not established in this file — confirm.
type Effect<VT> = {
nodeId: string;
path: Path;
msg: LWWRegisterState<VT> | LWWMapState<VT> | GSetOp<VT>
};
// Session info carried by every event; `claims` is untyped.
type Session = { claims: any; signature: string; }
// Event handed to every computation.
type ZEvent = {nodeId: string; session: Session, payload: any};
/*
Computation, even reactive, always gets the event as an argument.
This is mostly useful for getting session info so that access-control can work.
Explicit events also make it easy to replay and debug things.
For reactive computations, the framework will provide the event.
*/
// A computation receives the node it runs on and the triggering event, and returns one Effect.
type Computation = (node: RunningNode, event: ZEvent) => Effect<any>;
type ZoneName = string;
// Everything declared for the nodes of one zone: state variables and computations, keyed by name.
type NodeSpec = { [varName: string]: SourceState | DerivedState | Computation; }
// Every zone must have at least one channel with one other zone so that the entire system is connected
type ChannelType = "websocket";
// A declared link between two zones.
type Channel = { type: ChannelType; fromZone: ZoneName; toZone: ZoneName; }
// Static description of an application: the spec of each zone plus the channels between zones.
type AppSpec = {
zones: { [zoneName: ZoneName]: NodeSpec; };
channels: Channel[]
}
// A link between two concrete running nodes.
type Connection = {
fromNodeId: string;
fromZone: ZoneName;
toNodeId: string;
toZone: ZoneName;
channelType: ChannelType;
isActive: boolean;
// NOTE(review): unit is not stated anywhere in this file — confirm.
latency: number;
}
type MsgQueue<VT> = Effect<VT>[];
// One running node of a zone.
type RunningNode = {
id: string;
spec: NodeSpec;
// Instantiated state variables, keyed by their name in `spec`.
state: {[name: string]: any};
zone: ZoneName;
signingSecret: string;
incomingQueue: MsgQueue<any>;
outgoingQueue: MsgQueue<any>;
getTimestamp: () => number;
}
/* This is the type for an actual distributed system instantiated from an AppSpec */
type RunningApp = {
zones: { [zoneName: ZoneName]: RunningNode[]; };
connections: Connection[];
}
/**
 * Builds the initial CRDT instance for one declared state variable.
 *
 * - "LWWRegister": a register hosted on `nodeId`, stamped with the current time and holding
 *   whatever the initializer returns.
 * - "LWWMap": an empty map hosted on `nodeId`; the initializer is not invoked for maps.
 * - "GSet": a set seeded with the elements of whatever the initializer returns.
 *
 * @param nodeId    id of the node that will own the state
 * @param crdtType  which CRDT class backs the variable
 * @param valueType declared kind of value (checked for registers and sets)
 * @param init      zero-argument function producing the initial value; any falsy value
 *                  selects a placeholder that logs and yields `undefined`
 * @throws Error when `crdtType`, or `valueType` for a register or set, is not one of the
 *         supported names (the spec is untyped data at runtime, so a typo would otherwise
 *         surface later as an `undefined` state variable)
 */
const initState = (nodeId: string, crdtType: CrdtType, valueType: ValueType, init?: Function) => {
  // Placeholder for derived state. It is always invoked without arguments, so `name` is undefined.
  const initForDerivedState = (name?: string) => { console.log("init_for_derived_state", name); };
  const initializer: Function = init || initForDerivedState;

  const supportedValueTypes: readonly string[] = ["boolean", "string", "number", "object", "array"];
  const assertSupportedValueType = (): void => {
    if (!supportedValueTypes.includes(valueType)) {
      throw new Error(`Unsupported valueType "${String(valueType)}" for crdtType "${crdtType}"`);
    }
  };

  // Generic type arguments are erased at runtime, so one construction per CRDT type
  // covers every value type.
  switch (crdtType) {
    case "LWWRegister":
      assertSupportedValueType();
      return new LWWRegister<unknown>(nodeId, {nodeId: nodeId, timestamp: Date.now(), value: initializer()});
    case "LWWMap":
      return LWWMap.fromState<any>(nodeId, new Map());
    case "GSet":
      assertSupportedValueType();
      return new GSet<unknown>(new Set(initializer()));
    default:
      throw new Error(`Unsupported crdtType "${String(crdtType)}"`);
  }
}
/**
 * Instantiates the state of one node from its spec.
 * Computations (function-valued entries) are skipped; every other entry is turned into a
 * CRDT instance by `initState` and stored under the same name.
 */
const nodeSpecToState = (nodeId: string, nodeSpec: NodeSpec) => {
  const stateObject: {[name: string]: any} = {};
  for (const [name, declaration] of Object.entries(nodeSpec)) {
    // Functions are computations, not state variables.
    if (typeof declaration == "function") continue;
    // Derived state carries no `init`, in which case `false` is handed to initState.
    const initializer = 'init' in declaration && declaration.init;
    stateObject[name] = initState(nodeId, declaration.crdtType, declaration.valueType, initializer);
  }
  return stateObject;
}
/**
 * Creates one running node for `zoneName` from `nodeSpec`.
 * The node gets a random UUID as id, a separate random UUID as signing secret,
 * freshly instantiated state, empty message queues and a wall-clock timestamp source.
 */
const initNode = (zoneName: ZoneName, nodeSpec: NodeSpec) : RunningNode => {
  const freshUuid = () => globalThis.crypto.randomUUID();
  const id = freshUuid();
  // State is built before the signing secret is drawn.
  const state = nodeSpecToState(id, nodeSpec);
  const node: RunningNode = {
    spec: nodeSpec,
    state,
    zone: zoneName,
    id,
    signingSecret: freshUuid(),
    incomingQueue: [],
    outgoingQueue: [],
    getTimestamp: () => Date.now()
  };
  return node;
}
/**
 * Describes a connection from `fromNode` to `toNode` over `channelType`.
 * The connection starts out active with a latency of 0.1.
 */
const establishConnection = (fromNode: RunningNode, toNode: RunningNode, channelType: ChannelType) : Connection => {
  const { id: fromNodeId, zone: fromZone } = fromNode;
  const { id: toNodeId, zone: toZone } = toNode;
  return { fromNodeId, fromZone, toNodeId, toZone, channelType, isActive: true, latency: 0.1 };
};
/**
 * Instantiates a running app from `app`.
 * The topology is still hard-coded for the todo example: one "db" node, two "server" nodes and
 * two "client" nodes, with the db node and both clients connected to the first server node.
 */
const startApp = (app: AppSpec) : RunningApp => {
  // TODO: Remove hard-coding for todoApp
  const { db, server, client } = app.zones;

  // start at least one node in each zone
  const dbNode = initNode("db", db);
  const serverNodes = [initNode("server", server), initNode("server", server)];
  const clientNodes = [initNode("client", client), initNode("client", client)];

  // establish channels to setup sessions; only the first server node gets connections
  const primaryServer = serverNodes[0];
  const connections = [
    establishConnection(dbNode, primaryServer, "websocket"),
    ...clientNodes.map((clientNode) => establishConnection(primaryServer, clientNode, "websocket"))
  ];

  // TODO: Build dependency graph for auto-pushing reactive updates
  const zones = { db: [dbNode], server: serverNodes, client: clientNodes };
  return { zones, connections };
}
// Example: Spec for a 3-tier todo app
// db is responsible for persistence
// server is needed for access-control and expensive computations
// client is where user-interaction happens which generates effects
// The framework will implement compatibility of effects with the destination CRDT types
// challenge exercise:
// (0) Show that computational work is done only where necessary, eg: for connected clients only
// (1) model multi-layer client storage (RAM -> localStorage/SQLite)
// (2) model database read replicas that take the burden off the master instance which still handles all write queries
// (3) Show computations on server using a different programming language than the client
// (4) Show example of long-running operations which send progress updates to the client
// (5) Show that incompatibility of effects and target data type is detected and raises errors
// (6) Show that access-control works as expected and no data leakage occurs
// Spec of the example todo app: three zones (db, server, client) and the channels between them.
const todo: AppSpec = {
zones: {
db: {
// Source of truth for the todos.
all_todos: {
crdtType: "LWWMap",
valueType: "object",
// NOTE(review): initState builds an empty LWWMap without calling `init`, so this function is currently unused.
init: () => (new Map())
},
// Produces an LWW register state for /server/user_count from the number of keys in all_todos.
calculate_user_count: (dbNode: RunningNode, event: ZEvent) => ({
nodeId: event.nodeId,
path: '/server/user_count',
msg: {
nodeId: dbNode.id,
timestamp: dbNode.getTimestamp(),
value: dbNode.state.all_todos.countKeys()
}
})
},
server: {
// Derived from the db zone via calculate_user_count.
user_count: {
crdtType: "LWWRegister",
valueType: "number",
computer: '/db/calculate_user_count',
depends_on: ['/db/all_todos']
},
// Produces an LWW register state for /client/my_todos, looked up by the session's `user_id` claim.
// NOTE(review): this reads `serverNode.state.all_todos`, but the server zone declares no `all_todos`
// (only the db zone does), so the lookup would fail on a server node as specified — confirm intent.
filter_todos_for_user: (serverNode: RunningNode, event: ZEvent) => ({
nodeId: event.nodeId,
path: '/client/my_todos',
msg: {
nodeId: serverNode.id,
timestamp: serverNode.getTimestamp(),
value: serverNode.state.all_todos.getValueForKey(event.session.claims.user_id) || []
}
}),
},
client: {
// Derived from the server zone via filter_todos_for_user.
my_todos: {
crdtType: "LWWRegister",
valueType: "array",
computer: '/server/filter_todos_for_user',
depends_on: ['/db/all_todos']
},
// Client-local source state, initially the empty string.
task_input: {
crdtType: "LWWRegister",
valueType: "string",
init: () => ""
},
// Derived locally via render.
ui: {
crdtType: "LWWMap",
valueType: "object",
computer: '/client/render',
depends_on: ['/client/my_todos']
},
// Placeholder: always emits an empty map delta for /client/ui; `clientNode` is not used.
render: (clientNode: RunningNode, event: ZEvent) => ({
nodeId: event.nodeId,
path: '/client/ui',
msg: new Map<string, LWWRegisterState<any>>()
})
}
},
channels: [
{type: "websocket", fromZone: "server", toZone: "client"},
{type: "websocket", fromZone: "db", toZone: "server"},
]
}
// Runs at module load: instantiate the todo spec and print the resulting RunningApp.
let app = startApp(todo); // starts all the nodes and sets up the network to start ticking
console.log({app});
/**
 * Locates the node on which a local mutation should be applied.
 * Still incomplete: the node is resolved, but `fn` is not invoked yet.
 *
 * @param app    the running app to search
 * @param nodeId id of the target node
 * @param path   zone-prefixed path of the state variable, e.g. "/client/my_todos"
 * @param fn     mutation to run against the CRDT (currently unused)
 */
const injectLocalMutation = (app: RunningApp, nodeId: string, path: string, fn: (crdt: CRDT<any, any>) => void) => {
  // Paths start with "/", so a plain split yields "" as its first element.
  // Drop empty segments so the zone is the first real path component.
  const [zone] = path.split("/").filter((segment) => segment.length > 0);
  // An unknown or missing zone resolves to no node instead of throwing a TypeError.
  const node = zone === undefined ? undefined : app.zones[zone]?.find((n) => n.id === nodeId);
  // TODO: Complete this
}
// Stub for advancing the simulation by one step; currently does nothing and ignores `app`.
const tick = (app: RunningApp) => {
// TODO: Process msgs in the queues
}
// Stub for driving a whole simulation run; currently does nothing and ignores `app`.
const simulate = (app: RunningApp) => {
// TODO: Simulate user-interaction and system ticks here
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment