Created
November 10, 2025 13:43
-
-
Save colelawrence/8365716792819657123f4d5c8fe6847e to your computer and use it in GitHub Desktop.
Bridge between LiveStore & Jotai
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { makeInMemoryAdapter } from "@livestore/adapter-web"; | |
| import { type Store, createStorePromise } from "@livestore/livestore"; | |
| import { LogLevel } from "effect"; | |
| import { JotaiBatchCoordinator } from "./jotai-batch-coordinator.ts"; | |
| import { schema } from "./schema.ts"; | |
/**
 * Creates a LiveStore instance backed by the in-memory adapter for use in tests.
 *
 * Batch wiring:
 * - `batchUpdates` (optional) is the base batch function. When omitted, a
 *   pass-through function that simply invokes the callback synchronously is used.
 * - `batchCoordinator` (optional) wraps the base batch function via
 *   `createBatchUpdatesWrapper`, and the resulting store is marked as wired to
 *   that coordinator.
 *
 * @param options - Optional batch configuration; may be omitted entirely.
 * @returns The store produced by `createStorePromise`.
 */
export async function createLiveStoreForTesting({
  batchUpdates,
  batchCoordinator,
}: {
  batchUpdates?: (run: () => void) => void;
  batchCoordinator?: JotaiBatchCoordinator;
} = {}): Promise<Store<typeof schema>> {
  const adapter = makeInMemoryAdapter({
    // Attempting to make the startup, and execution faster in testing
    sync: { livePull: false, onSyncError: "shutdown" },
  });

  // Pass-through batch function for tests: it does not batch anything, it just
  // runs the callback synchronously to avoid timing issues in tests.
  const testBatchFunction = (cb: () => void): void => {
    cb();
  };

  // Honor a caller-supplied batch function in both branches. Previously it was
  // silently ignored whenever a coordinator was also provided.
  const baseBatchUpdates = batchUpdates ?? testBatchFunction;
  const wrappedBatchUpdates = batchCoordinator
    ? batchCoordinator.createBatchUpdatesWrapper(baseBatchUpdates)
    : baseBatchUpdates;

  const store = await createStorePromise({
    schema,
    adapter,
    storeId: `test-livestore-${Date.now()}-${Math.floor(Math.random() * 1000000)}`,
    logLevel: LogLevel.Warning,
    batchUpdates: wrappedBatchUpdates,
    confirmUnsavedChanges: false,
    // Attempting to reduce startup time by disabling devtools
    disableDevtools: true,
  });

  if (batchCoordinator) {
    // Lets JotaiBatchCoordinator.assertWired succeed for this store/coordinator pair.
    JotaiBatchCoordinator.markStoreWired(store, batchCoordinator);
  }

  return store;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { DevError } from "@phosphor/utils"; | |
| import type { JotaiStore, PrimitiveAtom } from "jotai"; | |
/**
 * Registry symbol used to tag LiveStore instances and batch wrappers with the
 * id of the coordinator they were wired to.
 */
const WIRED = Symbol.for("phosphor.livestore.jotaiBatchWired");

export interface JotaiBatchCoordinatorOptions {
  /**
   * Probe to check if a LiveStore dispatch is currently in progress.
   * When true, Jotai atom flushes are deferred until dispatch completes.
   */
  isDispatching?: () => boolean;
}

/**
 * Coordinates batched updates from LiveStore to Jotai atoms with dispatch-aware timing.
 *
 * Updates queued while a batch wrapper is running are collected (last value per
 * atom wins) and applied together once the outermost batch exits. If the
 * `isDispatching` probe reports an active dispatch at that moment, the flush is
 * retried on subsequent microtasks until the probe clears.
 */
export class JotaiBatchCoordinator {
  private static idCounter = 0;

  /** Process-unique id stamped onto wired stores and wrappers. */
  private readonly id = ++JotaiBatchCoordinator.idCounter;
  /** Pending atom values, keyed by atom; a later value for the same atom replaces the earlier one. */
  private updateQueue = new Map<PrimitiveAtom<any>, any>();
  /** Nesting depth of currently running batch wrappers. */
  private batchDepth = 0;
  /**
   * Consulted by `waitForFlush`. Nothing in this class currently assigns a
   * non-null value, so `waitForFlush` effectively always polls.
   */
  private flushPromise: Promise<void> | null = null;
  /** True from the moment a flush is scheduled until it has run (or was abandoned). */
  private flushScheduled = false;
  private cancelFlush = false; // Used to cancel pending flushes (mainly for tests)
  /** Batch function captured by the most recent `createBatchUpdatesWrapper` call. */
  private reactBatch?: (callback: () => void) => void;

  constructor(
    private readonly jotaiStore: JotaiStore,
    private readonly opts?: JotaiBatchCoordinatorOptions,
  ) {}

  /**
   * Mark a LiveStore instance as properly wired to this coordinator.
   * Call this after createStore() in your setup code.
   */
  static markStoreWired(store: object, coordinator: JotaiBatchCoordinator): void {
    (store as any)[WIRED] = coordinator.id;
  }

  /**
   * Assert that a LiveStore instance is properly wired to this coordinator.
   * Call this at the integration boundary (e.g., workspace scope creation).
   *
   * @throws DevError (code `LSWIRE1`) when the store was not marked with this coordinator's id.
   */
  static assertWired(store: object, coordinator: JotaiBatchCoordinator): void {
    if ((store as any)[WIRED] !== coordinator.id) {
      throw new DevError(
        "Missing LiveStore batchUpdates wiring: JotaiBatchCoordinator must wrap and be passed to createStore({ batchUpdates }).",
        {
          hint: "Pass batchCoordinator.createBatchUpdatesWrapper(reactBatch) to createStore({ batchUpdates }) and call JotaiBatchCoordinator.markStoreWired(liveStore, batchCoordinator).",
          code: "LSWIRE1",
        },
      );
    }
  }

  /**
   * Queue an atom update. If we're inside a batch, the update is queued.
   * Otherwise, it's applied immediately.
   */
  queueUpdate<T>(atom: PrimitiveAtom<T>, value: T): void {
    if (this.batchDepth > 0) {
      // Inside batch - queue the update
      this.updateQueue.set(atom, value);
      return;
    }
    // Outside batch - apply immediately. Any value still waiting in the queue
    // for this atom is older than `value`; drop it so a later flush cannot
    // overwrite the fresh value with a stale one.
    this.updateQueue.delete(atom);
    this.jotaiStore.set(atom as any, value);
  }

  /**
   * Create a batchUpdates function that can be passed to LiveStore.
   * The returned wrapper tracks batch nesting and, when the outermost batch
   * exits with queued updates, schedules a flush that runs inside
   * `reactBatchUpdates`.
   *
   * @param reactBatchUpdates - Batch function used to apply the queued atom updates.
   * @returns A batch function tagged with this coordinator's id.
   */
  createBatchUpdatesWrapper(reactBatchUpdates: (callback: () => void) => void): (callback: () => void) => void {
    // Capture reactBatch for deferred flushes
    this.reactBatch = reactBatchUpdates;

    const wrapper = (callback: () => void): void => {
      // Enter batch (supports nesting)
      this.batchDepth++;
      try {
        // Run the LiveStore callback
        callback();
      } finally {
        // Exit batch
        this.batchDepth--;
        if (this.batchDepth === 0 && this.updateQueue.size > 0) {
          // Schedule flush (deferred if dispatching)
          this.scheduleFlush();
        }
      }
    };

    // Tag the wrapper with this coordinator's id
    Object.defineProperty(wrapper, WIRED, { value: this.id, configurable: false });
    return wrapper;
  }

  /**
   * Apply every queued update inside the captured batch function.
   * Only call this when isDispatching is false.
   *
   * The queue is drained one entry at a time (each entry is removed before it
   * is applied) rather than iterated and cleared afterwards. That way a value
   * queued for an already-applied atom while the flush is running is applied
   * too instead of being discarded by a trailing `clear()`.
   */
  private flushNow(): void {
    if (this.updateQueue.size === 0) return;

    const runInBatch = this.reactBatch ?? ((cb: () => void) => cb());
    runInBatch(() => {
      while (this.updateQueue.size > 0) {
        const [atom, value] = this.updateQueue.entries().next().value as [PrimitiveAtom<any>, any];
        this.updateQueue.delete(atom);
        try {
          this.jotaiStore.set(atom as any, value);
        } catch (err) {
          const strings = String(err) + (err instanceof AggregateError ? err.errors.map(String).join("\n") : "");
          // With proper timing, these errors should be rare
          if (!strings.includes("Dispatching happened while dispatching")) {
            console.error("Error setting Jotai atom during batch flush:", err);
          } else {
            // This shouldn't happen with deferred flush - log it
            console.warn("Re-entrancy error detected despite deferred flush:", err);
          }
        }
      }
    });
  }

  /**
   * Run one flush and reset the scheduling state afterwards.
   * Anything left in the queue is rescheduled only when the flush itself did
   * not throw; rescheduling after a throwing batch function would re-enter the
   * same failure without bound.
   */
  private runFlush(): void {
    let completed = false;
    try {
      this.flushNow();
      completed = true;
    } finally {
      this.flushScheduled = false;
      this.flushPromise = null;
      // In case something queued during flush, schedule again
      if (completed && this.updateQueue.size > 0 && !this.cancelFlush) {
        this.scheduleFlush();
      }
    }
  }

  /**
   * Schedule a flush, executing immediately if no dispatch is active,
   * or deferring to microtask if dispatch is in progress.
   * This ensures Jotai updates happen AFTER LiveStore dispatch completes.
   *
   * NOTE(review): a `cancelPendingFlush()` issued while a deferred attempt is
   * still pending also discards updates queued between the cancel and that
   * attempt; confirm this is acceptable outside of test cleanup.
   */
  private scheduleFlush(): void {
    if (this.flushScheduled || this.updateQueue.size === 0) return;
    this.flushScheduled = true;
    this.cancelFlush = false; // Reset cancel flag

    // Check if dispatch is active RIGHT NOW
    if (!this.opts?.isDispatching?.()) {
      // No dispatch active - flush synchronously
      this.runFlush();
      return;
    }

    // Dispatch is active - defer to microtask and retry
    let retryCount = 0;
    const MAX_RETRIES = 1000; // Guardrail against infinite loops (generous for slow tests)

    const attempt = (): void => {
      // Check if flush was cancelled
      if (this.cancelFlush) {
        this.flushScheduled = false;
        this.updateQueue.clear();
        return;
      }

      // Check if dispatch is still in progress
      if (this.opts?.isDispatching?.()) {
        retryCount++;
        if (retryCount > MAX_RETRIES) {
          this.flushScheduled = false;
          const error = new DevError("Flush retry limit exceeded - isDispatching() never cleared", {
            retryCount,
            queueSize: this.updateQueue.size,
            code: "FLUSH_STARVATION",
          });
          // Don't throw in microtask - just mark as failed and stop retrying
          console.error(error);
          return;
        }
        // Defer to next microtask and retry
        queueMicrotask(attempt);
        return;
      }

      // Ready to flush - dispatch has completed
      this.runFlush();
    };

    // Start the retry loop
    queueMicrotask(attempt);
  }

  /**
   * Wait for any pending flush to complete.
   * Useful in tests or for ensuring updates have been applied.
   *
   * Polls every 20ms while a flush is scheduled or the queue is non-empty.
   *
   * @throws DevError (code `WAIT_FLUSH_TIMEOUT`) after more than 50 iterations.
   */
  async waitForFlush(): Promise<void> {
    let iterations = 0;
    const MAX_ITERATIONS = 50; // Increase limit for slow tests or dispatch delays

    // Wait until no pending flush and queue is empty
    while (this.flushPromise || this.updateQueue.size > 0 || this.flushScheduled) {
      if (this.flushPromise) {
        await this.flushPromise;
      } else {
        // Queue has items or flush is scheduled but hasn't started yet - wait a bit
        await new Promise((resolve) => setTimeout(resolve, 20));
      }

      iterations++;
      if (iterations > MAX_ITERATIONS) {
        throw new DevError("waitForFlush exceeded iteration limit", {
          queueSize: this.updateQueue.size,
          flushScheduled: this.flushScheduled,
          hasFlushPromise: Boolean(this.flushPromise),
          isDispatching: this.opts?.isDispatching?.() ?? false,
          code: "WAIT_FLUSH_TIMEOUT",
        });
      }
    }
  }

  /**
   * Get current queue size (mainly for debugging/testing)
   */
  get queueSize(): number {
    return this.updateQueue.size;
  }

  /**
   * Cancel any pending flush and clear the queue.
   * Mainly for test cleanup to prevent background retries from affecting other tests.
   */
  cancelPendingFlush(): void {
    this.cancelFlush = true;
    this.updateQueue.clear();
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import type { Queryable, Store } from "@livestore/livestore"; | |
| import { type IPool, shallowEquals } from "@phosphor/utils"; | |
| import { type Atom, atom } from "jotai"; | |
| import type { JotaiBatchCoordinator } from "./jotai-batch-coordinator.ts"; | |
export interface AsJotaiAtomFn {
  <T>(query: Queryable<T>, equals?: (a: T, b: T) => boolean): Atom<T>;
}

/**
 * Creates a factory for read-only Jotai atoms that stay in sync with LiveStore queries.
 *
 * For each distinct query the returned function:
 * - reads the current value with `store.query` and seeds a new atom with it,
 * - subscribes to the query and registers the value returned by
 *   `store.subscribe` with `pool`,
 * - forwards every emitted value that is not `equals` to the previously
 *   emitted one to `batchCoordinator.queueUpdate`.
 *
 * Atoms are cached per factory call (i.e. per pool/store/coordinator triple),
 * so asking the same factory for the same query twice returns the same atom
 * and creates only one subscription. The cache is deliberately not shared
 * across factories: an atom is bound to the store it subscribed to and to the
 * pool holding its subscription, so handing it to a different store or pool
 * would return an atom fed by the wrong (or an already disposed) subscription.
 * The equality function of the first call for a query is the one that stays in effect.
 *
 * @param pool - Receives the value returned by `store.subscribe` for each created atom
 * @param store - LiveStore instance
 * @param batchCoordinator - Coordinator for batching Jotai updates with LiveStore
 * @returns Function that creates atoms for LiveStore queries with optional equality check
 */
export function createLiveStoreQueryAsJotaiAtomFn(
  pool: IPool,
  store: Store<any, any>,
  batchCoordinator: JotaiBatchCoordinator,
): AsJotaiAtomFn {
  // Prevents duplicate subscriptions for the same query within this factory.
  const atomsByQuery = new WeakMap<Queryable<any>, Atom<any>>();

  return <T>(query: Queryable<T>, equals: (a: T, b: T) => boolean = shallowEquals) => {
    const cached = atomsByQuery.get(query);
    if (cached !== undefined) {
      return cached as Atom<T>;
    }

    // Last value received from LiveStore; used to suppress no-op updates.
    let lastValue: T = store.query(query);
    const queryAtom = atom<T>(lastValue);
    queryAtom.debugLabel = `livestore:${(query as any).label}`;

    pool.add(
      store.subscribe(
        query,
        (value: T) => {
          // Skip emissions that are equal to what we already forwarded.
          if (equals(lastValue, value)) return;
          lastValue = value;
          // Queue the update through the batch coordinator
          // This ensures it's applied together with other LiveStore updates
          batchCoordinator.queueUpdate(queryAtom, value);
        },
        { label: "as Jotai atom" },
      ),
    );

    atomsByQuery.set(query, queryAtom);
    return queryAtom;
  };
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment