Skip to content

Instantly share code, notes, and snippets.

@colelawrence
Created November 10, 2025 13:43
Show Gist options
  • Select an option

  • Save colelawrence/8365716792819657123f4d5c8fe6847e to your computer and use it in GitHub Desktop.

Select an option

Save colelawrence/8365716792819657123f4d5c8fe6847e to your computer and use it in GitHub Desktop.
Bridge between LiveStore & Jotai
import { makeInMemoryAdapter } from "@livestore/adapter-web";
import { type Store, createStorePromise } from "@livestore/livestore";
import { LogLevel } from "effect";
import { JotaiBatchCoordinator } from "./jotai-batch-coordinator.ts";
import { schema } from "./schema.ts";
/**
 * Creates an in-memory LiveStore instance configured for tests.
 *
 * The store uses the in-memory web adapter with live pull disabled, a unique
 * `storeId` per call, devtools disabled and warning-level logging.
 *
 * @param batchUpdates - Optional batch function handed to LiveStore. When a
 *   `batchCoordinator` is also given, this function is what the coordinator
 *   wraps (and later uses for its own flushes). Defaults to a function that
 *   simply invokes the callback synchronously.
 * @param batchCoordinator - Optional coordinator. When given, LiveStore's
 *   `batchUpdates` is routed through
 *   `batchCoordinator.createBatchUpdatesWrapper(...)` and the resulting store
 *   is marked as wired to that coordinator.
 * @returns The initialized store.
 */
export async function createLiveStoreForTesting({
  batchUpdates,
  batchCoordinator,
}: {
  batchUpdates?: (run: () => void) => void;
  batchCoordinator?: JotaiBatchCoordinator;
} = {}): Promise<Store<typeof schema>> {
  const adapter = makeInMemoryAdapter({
    // Attempting to make the startup, and execution faster in testing
    sync: { livePull: false, onSyncError: "shutdown" },
  });

  // Fallback batch function for tests: it performs no batching of its own and
  // just runs the callback synchronously, which avoids timing issues in tests.
  const runSynchronously = (cb: () => void): void => {
    cb();
  };

  // Honor a caller-supplied batch function in BOTH modes. Previously it was
  // silently dropped whenever a coordinator was also supplied.
  const baseBatchUpdates = batchUpdates ?? runSynchronously;
  const wrappedBatchUpdates = batchCoordinator
    ? batchCoordinator.createBatchUpdatesWrapper(baseBatchUpdates)
    : baseBatchUpdates;

  const store = await createStorePromise({
    schema,
    adapter,
    // Timestamp + random suffix keeps store ids distinct across calls
    storeId: `test-livestore-${Date.now()}-${Math.floor(Math.random() * 1000000)}`,
    logLevel: LogLevel.Warning,
    batchUpdates: wrappedBatchUpdates,
    confirmUnsavedChanges: false,
    // Attempting to reduce startup time by disabling devtools
    disableDevtools: true,
  });

  if (batchCoordinator) {
    // Lets JotaiBatchCoordinator.assertWired(store, batchCoordinator) succeed later
    JotaiBatchCoordinator.markStoreWired(store, batchCoordinator);
  }
  return store;
}
import { DevError } from "@phosphor/utils";
import type { JotaiStore, PrimitiveAtom } from "jotai";
// Property key under which a coordinator's numeric id is stamped onto LiveStore
// instances (markStoreWired) and onto batchUpdates wrappers
// (createBatchUpdatesWrapper). Symbol.for() resolves through the global symbol
// registry, so every lookup of this description yields the same symbol.
const WIRED = Symbol.for("phosphor.livestore.jotaiBatchWired");
/**
* Optional configuration accepted by the JotaiBatchCoordinator constructor.
*/
export interface JotaiBatchCoordinatorOptions {
/**
* Probe to check if a LiveStore dispatch is currently in progress.
* It is consulted when a flush is scheduled: if it returns true, the flush is
* deferred to a microtask and retried until it returns false (up to a retry
* limit). When the probe is omitted, flushes run synchronously.
*/
isDispatching?: () => boolean;
}
/**
* Coordinates batched updates from LiveStore to Jotai atoms with dispatch-aware timing.
*
* When LiveStore processes multiple changes in a batch, this coordinator
* queues all Jotai atom updates and flushes them together when the batch
* completes. This ensures that:
* 1. All related state changes update together
* 2. React components only re-render once per batch
* 3. No intermediate inconsistent states are visible
* 4. Jotai updates occur AFTER LiveStore dispatch completes (not during)
*/
export class JotaiBatchCoordinator {
  /** Source of per-instance ids. */
  private static idCounter = 0;
  /** Id stamped (under WIRED) on stores and wrappers that belong to this coordinator. */
  private readonly id = ++JotaiBatchCoordinator.idCounter;
  /** Latest pending value per atom; re-queuing an atom replaces its pending value. */
  private updateQueue = new Map<PrimitiveAtom<any>, any>();
  /** Number of wrapper invocations currently on the stack (supports nesting). */
  private batchDepth = 0;
  /**
   * Settles once the currently deferred (microtask) flush has run, been
   * cancelled, or given up. `null` when no deferred flush is pending.
   */
  private flushPromise: Promise<void> | null = null;
  /** True from the moment a flush is scheduled until it has run / been abandoned. */
  private flushScheduled = false;
  /** Used to cancel pending flushes (mainly for tests). */
  private cancelFlush = false;
  /** Batch function captured by createBatchUpdatesWrapper; flushes run inside it. */
  private reactBatch?: (callback: () => void) => void;

  /**
   * @param jotaiStore - Jotai store that receives the atom writes.
   * @param opts - Optional settings, see JotaiBatchCoordinatorOptions.
   */
  constructor(
    private readonly jotaiStore: JotaiStore,
    private readonly opts?: JotaiBatchCoordinatorOptions,
  ) {}

  /**
   * Mark a LiveStore instance as properly wired to this coordinator.
   * Call this after createStore() in your setup code.
   */
  static markStoreWired(store: object, coordinator: JotaiBatchCoordinator): void {
    (store as any)[WIRED] = coordinator.id;
  }

  /**
   * Assert that a LiveStore instance is properly wired to this coordinator.
   * Call this at the integration boundary (e.g., workspace scope creation).
   *
   * @throws DevError (code "LSWIRE1") when the store does not carry this
   *   coordinator's id.
   */
  static assertWired(store: object, coordinator: JotaiBatchCoordinator): void {
    if ((store as any)[WIRED] !== coordinator.id) {
      throw new DevError(
        "Missing LiveStore batchUpdates wiring: JotaiBatchCoordinator must wrap and be passed to createStore({ batchUpdates }).",
        {
          hint: "Pass batchCoordinator.createBatchUpdatesWrapper(reactBatch) to createStore({ batchUpdates }) and call JotaiBatchCoordinator.markStoreWired(liveStore, batchCoordinator).",
          code: "LSWIRE1",
        },
      );
    }
  }

  /**
   * Queue an atom update. If we're inside a batch, the update is queued
   * (replacing any value already pending for the same atom).
   * Otherwise, it's applied immediately.
   */
  queueUpdate<T>(atom: PrimitiveAtom<T>, value: T): void {
    if (this.batchDepth > 0) {
      // Inside batch - queue the update
      this.updateQueue.set(atom, value);
    } else {
      // Outside batch - apply immediately
      this.jotaiStore.set(atom as any, value);
    }
  }

  /**
   * Create a batchUpdates function that can be passed to LiveStore.
   * The wrapper tracks batch nesting while the LiveStore callback runs and,
   * when the outermost batch exits with pending updates, schedules a flush.
   *
   * @param reactBatchUpdates - Batch function (e.g. React's) inside which the
   *   queued atom writes are later applied. It is remembered for all
   *   subsequent flushes of this coordinator.
   * @returns The wrapper, tagged with this coordinator's id under WIRED.
   */
  createBatchUpdatesWrapper(reactBatchUpdates: (callback: () => void) => void): (callback: () => void) => void {
    // Capture reactBatch for deferred flushes
    this.reactBatch = reactBatchUpdates;

    const wrapper = (callback: () => void) => {
      // Enter batch (supports nesting)
      this.batchDepth++;
      try {
        // Run the LiveStore callback
        callback();
      } finally {
        // Exit batch
        this.batchDepth--;
        if (this.batchDepth === 0 && this.updateQueue.size > 0) {
          // Schedule flush (deferred if dispatching)
          this.scheduleFlush();
        }
      }
    };

    // Tag the wrapper with this coordinator's id
    Object.defineProperty(wrapper, WIRED, { value: this.id, configurable: false });
    return wrapper;
  }

  /**
   * Apply every currently queued update, synchronously, inside the captured
   * batch function (or directly when none was captured).
   * Only call this when isDispatching is false.
   *
   * The queue is snapshotted and cleared BEFORE any atom is written. A write
   * can re-enter the wrapper and queue fresh values; with the queue already
   * emptied those values survive this flush and are picked up by the
   * follow-up flush that scheduleFlush() arranges. Clearing after the loop
   * (as was done previously) discarded a re-queued value for any atom the
   * loop had already visited.
   */
  private flushNow(): void {
    const pending = Array.from(this.updateQueue);
    this.updateQueue.clear();

    const runInBatch = this.reactBatch ?? ((cb: () => void) => cb());
    runInBatch(() => {
      for (const [atom, value] of pending) {
        try {
          this.jotaiStore.set(atom as any, value);
        } catch (err) {
          // One failing atom must not prevent the remaining atoms from updating
          const strings = String(err) + (err instanceof AggregateError ? err.errors.map(String).join("\n") : "");
          // With proper timing, these errors should be rare
          if (!strings.includes("Dispatching happened while dispatching")) {
            console.error("Error setting Jotai atom during batch flush:", err);
          } else {
            // This shouldn't happen with deferred flush - log it
            console.warn("Re-entrancy error detected despite deferred flush:", err);
          }
        }
      }
    });
  }

  /**
   * Schedule a flush, executing immediately if no dispatch is active,
   * or deferring to microtask if dispatch is in progress.
   * This ensures Jotai updates happen AFTER LiveStore dispatch completes.
   *
   * While a deferred flush is pending, `flushPromise` holds a promise that
   * settles when that flush has run, was cancelled, or hit the retry limit.
   */
  private scheduleFlush(): void {
    if (this.flushScheduled || this.updateQueue.size === 0) return;
    this.flushScheduled = true;
    this.cancelFlush = false; // Reset cancel flag

    // Check if dispatch is active RIGHT NOW
    if (!this.opts?.isDispatching?.()) {
      // No dispatch active - flush synchronously
      try {
        this.flushNow();
      } finally {
        this.flushScheduled = false;
        // In case something queued during flush, schedule again
        if (this.updateQueue.size > 0 && !this.cancelFlush) {
          this.scheduleFlush();
        }
      }
      return;
    }

    // Dispatch is active - defer to microtask and retry
    let retryCount = 0;
    const MAX_RETRIES = 1000; // Guardrail against infinite loops (generous for slow tests)
    // NOTE(review): retries are chained microtasks, so a probe that can only
    // clear after a macrotask will exhaust the limit without ever flushing.

    // Tracking promise so waitForFlush() can await this deferred flush
    let settle: () => void = () => {};
    const tracked = new Promise<void>((resolve) => {
      settle = resolve;
    });
    this.flushPromise = tracked;

    // Common bookkeeping for every way the deferred flush can end
    const finish = () => {
      this.flushScheduled = false;
      if (this.flushPromise === tracked) {
        this.flushPromise = null;
      }
      settle();
    };

    const attempt = () => {
      // Check if flush was cancelled
      if (this.cancelFlush) {
        this.updateQueue.clear();
        finish();
        return;
      }

      // Check if dispatch is still in progress
      if (this.opts?.isDispatching?.()) {
        retryCount++;
        if (retryCount > MAX_RETRIES) {
          finish();
          const error = new DevError("Flush retry limit exceeded - isDispatching() never cleared", {
            retryCount,
            queueSize: this.updateQueue.size,
            code: "FLUSH_STARVATION",
          });
          // Don't throw in microtask - just mark as failed and stop retrying
          console.error(error);
          return;
        }
        // Defer to next microtask and retry
        queueMicrotask(attempt);
        return;
      }

      // Ready to flush - dispatch has completed
      try {
        this.flushNow();
      } finally {
        finish();
        // In case something queued during flush, schedule again
        if (this.updateQueue.size > 0 && !this.cancelFlush) {
          this.scheduleFlush();
        }
      }
    };

    // Start the retry loop
    queueMicrotask(attempt);
  }

  /**
   * Wait for any pending flush to complete.
   * Useful in tests or for ensuring updates have been applied.
   *
   * Awaits the deferred flush's tracking promise when there is one; otherwise
   * polls on a 20 ms timer while updates remain queued or a flush is scheduled.
   *
   * @throws DevError (code "WAIT_FLUSH_TIMEOUT") after more than 50 wait
   *   iterations without the coordinator becoming idle.
   */
  async waitForFlush(): Promise<void> {
    let iterations = 0;
    const MAX_ITERATIONS = 50; // Increase limit for slow tests or dispatch delays

    // Wait until no pending flush and queue is empty
    while (this.flushPromise || this.updateQueue.size > 0 || this.flushScheduled) {
      if (this.flushPromise) {
        await this.flushPromise;
      } else {
        // Queue has items or flush is scheduled but hasn't started yet - wait a bit
        await new Promise((resolve) => setTimeout(resolve, 20));
      }

      iterations++;
      if (iterations > MAX_ITERATIONS) {
        throw new DevError("waitForFlush exceeded iteration limit", {
          queueSize: this.updateQueue.size,
          flushScheduled: this.flushScheduled,
          hasFlushPromise: Boolean(this.flushPromise),
          isDispatching: this.opts?.isDispatching?.() ?? false,
          code: "WAIT_FLUSH_TIMEOUT",
        });
      }
    }
  }

  /**
   * Get current queue size (mainly for debugging/testing)
   */
  get queueSize(): number {
    return this.updateQueue.size;
  }

  /**
   * Cancel any pending flush and clear the queue.
   * Mainly for test cleanup to prevent background retries from affecting other tests.
   * A deferred flush that is still retrying notices the flag on its next
   * attempt and stops.
   */
  cancelPendingFlush(): void {
    this.cancelFlush = true;
    this.updateQueue.clear();
  }
}
import type { Queryable, Store } from "@livestore/livestore";
import { type IPool, shallowEquals } from "@phosphor/utils";
import { type Atom, atom } from "jotai";
import type { JotaiBatchCoordinator } from "./jotai-batch-coordinator.ts";
/**
 * Function that turns a LiveStore query into a read-only Jotai atom.
 */
export interface AsJotaiAtomFn {
  /**
   * @param query - LiveStore query to mirror.
   * @param equals - Optional equality check; an incoming result that is
   *   `equals` to the previously seen one is not forwarded to the atom.
   * @returns The atom mirroring `query`. Repeated calls with the same query
   *   object return the same atom.
   */
  <T>(query: Queryable<T>, equals?: (a: T, b: T) => boolean): Atom<T>;
}

/**
 * Creates a read-only Jotai atom that stays in sync with a LiveStore query.
 * The LiveStore subscription is established immediately and cleaned up
 * automatically using the provided DisposePool for resource management.
 *
 * Updates are coordinated through the batch coordinator to ensure that multiple
 * LiveStore changes result in a single batched update to Jotai atoms.
 *
 * Atoms are cached per returned function (i.e. per pool/store/coordinator
 * triple), keyed by query object. A cache shared across stores would hand a
 * second store an atom that is subscribed to the first store.
 *
 * @param pool - DisposePool for managing subscription cleanup
 * @param store - LiveStore instance
 * @param batchCoordinator - Coordinator for batching Jotai updates with LiveStore
 * @returns Function that creates atoms for LiveStore queries with optional equality check
 */
export function createLiveStoreQueryAsJotaiAtomFn(
  pool: IPool,
  store: Store<any, any>,
  batchCoordinator: JotaiBatchCoordinator,
): AsJotaiAtomFn {
  // Cache to prevent duplicate subscriptions for the same query within this
  // pool/store. Deliberately NOT module-scoped: the subscription behind each
  // atom lives in `pool` and reads from `store`, so the atom is only valid here.
  const atomsByQuery = new WeakMap<Queryable<any>, Atom<any>>();

  return <T>(query: Queryable<T>, equals: (a: T, b: T) => boolean = shallowEquals) => {
    // Check cache first to prevent duplicate subscriptions.
    // Note: the `equals` of the first call for a given query stays in effect.
    const cached = atomsByQuery.get(query);
    if (cached) {
      return cached as Atom<T>;
    }

    // Seed the atom with the query's current result
    let lastValue: T = store.query(query);
    const queryAtom = atom<T>(lastValue);
    queryAtom.debugLabel = `livestore:${(query as any).label}`;

    pool.add(
      store.subscribe(
        query,
        (value) => {
          // Skip results that are equal to the last one we forwarded, so the
          // atom (and anything reading it) is not updated needlessly.
          if (equals(lastValue, value)) return;
          lastValue = value;
          // Queue the update through the batch coordinator
          // This ensures it's applied together with other LiveStore updates
          batchCoordinator.queueUpdate(queryAtom, value);
        },
        { label: "as Jotai atom" },
      ),
    );

    // Cache the atom to prevent duplicates within this store
    atomsByQuery.set(query, queryAtom);
    return queryAtom;
  };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment