Created
September 29, 2025 03:47
-
-
Save amit08255/5f40c81db674b36008d6a71296baa5a4 to your computer and use it in GitHub Desktop.
This Little-Known JavaScript API Changes How We Handle Async States Across Threads
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // App.jsx | |
| import { AtomicState, useAtomicValue } from "./shared-atomic-state"; | |
| import './App.css'; | |
// One AtomicState instance shared by this module (could be moved to a context or module).
const atomicState = new AtomicState();

// The two demo counters: A starts at 0 and B at 100, both created with the "int" type tag.
const [counterA, counterB] = [0, 100].map((initialValue) =>
  atomicState.createAtomicValue(initialValue, "int")
);
| function CounterA() { | |
| const value = useAtomicValue(counterA); | |
| return ( | |
| <div> | |
| <h3>Counter A: {value}</h3> | |
| <button onClick={() => counterA.setValue(value + 1)}>Increment A</button> | |
| </div> | |
| ); | |
| } | |
| function CounterB() { | |
| const value = useAtomicValue(counterB); | |
| return ( | |
| <div> | |
| <h3>Counter B: {value}</h3> | |
| <button onClick={() => counterB.setValue(value + 10)}>Increment B</button> | |
| </div> | |
| ); | |
| } | |
| function AtomicDemo() { | |
| return ( | |
| <div> | |
| <h2>Atomic State Demo</h2> | |
| <CounterA /> | |
| <CounterB /> | |
| </div> | |
| ); | |
| } | |
| function App() { | |
| return ( | |
| <AtomicDemo /> | |
| ) | |
| } | |
| export default App; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // App.jsx | |
| import React, { useRef } from "react"; | |
| import { CoordinationHub, useChannel } from "./coordination-hub"; | |
| // A broadcaster component that updates the channel state | |
| function Broadcaster({ hub, channelName }) { | |
| const inputRef = useRef(); | |
| const handleBroadcast = () => { | |
| const value = parseInt(inputRef.current.value, 10) || 0; | |
| const channel = hub.getChannel(channelName) || hub.createChannel(channelName); | |
| channel.broadcast(value); | |
| }; | |
| return ( | |
| <div style={{ marginBottom: 20 }}> | |
| <h2>Broadcaster</h2> | |
| <input ref={inputRef} type="number" defaultValue={0} /> | |
| <button onClick={handleBroadcast}>Broadcast State</button> | |
| </div> | |
| ); | |
| } | |
| // A listener component that subscribes to channel updates | |
| function Listener({ hub, channelName }) { | |
| const state = useChannel(hub, channelName); | |
| return ( | |
| <div> | |
| <h2>Listener</h2> | |
| <div>Current Channel State: {state}</div> | |
| </div> | |
| ); | |
| } | |
| // Main App component | |
| export default function App() { | |
| // Only create one hub instance for the app | |
| const hubRef = useRef(); | |
| if (!hubRef.current) hubRef.current = new CoordinationHub(); | |
| return ( | |
| <div style={{ padding: 40 }}> | |
| <h1>Coordination Hub Demo</h1> | |
| <Broadcaster hub={hubRef.current} channelName="demo" /> | |
| <Listener hub={hubRef.current} channelName="demo" /> | |
| </div> | |
| ); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { useEffect, useState } from "react"; | |
| // coordination-hub.js | |
/**
 * Named broadcast channels backed by one SharedArrayBuffer.
 *
 * Every channel owns two consecutive Int32 slots in `view`:
 *   [channelId * 2]     the current state value
 *   [channelId * 2 + 1] a signal counter, incremented on every broadcast
 */
export class CoordinationHub {
  /**
   * @param {number} [maxChannels=32] Number of channels the buffer has room for.
   *   The default of 32 yields the same 256-byte buffer as before.
   * @throws {RangeError} If `maxChannels` is not a positive integer.
   */
  constructor(maxChannels = 32) {
    if (!Number.isInteger(maxChannels) || maxChannels < 1) {
      throw new RangeError('maxChannels must be a positive integer');
    }
    this.maxChannels = maxChannels;
    // Two Int32 slots (state + signal counter) per channel.
    this.buffer = new SharedArrayBuffer(maxChannels * 2 * Int32Array.BYTES_PER_ELEMENT);
    this.view = new Int32Array(this.buffer);
    this.channels = new Map();
    this.nextChannelId = 0;
  }

  /**
   * Returns the channel registered under `name`, creating it if necessary.
   *
   * Asking for a name that already exists returns the existing channel rather
   * than allocating a second pair of slots, so listeners attached to the first
   * channel object keep receiving broadcasts.
   *
   * @param {string} name Channel name.
   * @returns {object} Channel with `broadcast`, `waitForBroadcast` and `getCurrentState`.
   * @throws {RangeError} If every slot pair in the buffer is already in use.
   */
  createChannel(name) {
    const existing = this.channels.get(name);
    if (existing) {
      return existing;
    }
    // Check capacity before consuming an id so a failed call leaves the hub unchanged.
    if (this.nextChannelId >= this.maxChannels) {
      throw new RangeError(
        `CoordinationHub is full: cannot create channel "${String(name)}" (limit ${this.maxChannels})`
      );
    }

    const channelId = this.nextChannelId++;
    const stateIndex = channelId * 2; // Current state
    const signalIndex = channelId * 2 + 1; // Signal counter
    Atomics.store(this.view, stateIndex, 0);
    Atomics.store(this.view, signalIndex, 0);

    const readSnapshot = () => ({
      state: Atomics.load(this.view, stateIndex),
      signalCount: Atomics.load(this.view, signalIndex)
    });

    const channel = {
      name,
      channelId,
      stateIndex,
      signalIndex,

      /**
       * Stores `state`, bumps the signal counter and wakes every waiter.
       * @returns {number} The signal count after this broadcast.
       */
      broadcast: (state) => {
        Atomics.store(this.view, stateIndex, state);
        const signalCount = Atomics.add(this.view, signalIndex, 1) + 1;
        Atomics.notify(this.view, signalIndex);
        return signalCount;
      },

      /**
       * Resolves with `{ state, signalCount }` once the signal counter differs
       * from `lastSignalCount`, or with `null` when `timeout` ms pass first.
       *
       * Atomics.waitAsync answers synchronously with "not-equal" when the
       * counter has already moved on. That means a broadcast was missed, so the
       * current snapshot is returned right away instead of dropping it.
       */
      waitForBroadcast: async (lastSignalCount = 0, timeout = 10000) => {
        const result = Atomics.waitAsync(
          this.view,
          signalIndex,
          lastSignalCount,
          timeout
        );
        const status = result.async ? await result.value : result.value;
        return status === 'timed-out' ? null : readSnapshot();
      },

      /** Reads the current `{ state, signalCount }` without waiting. */
      getCurrentState: readSnapshot
    };

    this.channels.set(name, channel);
    return channel;
  }

  /**
   * @param {string} name Channel name.
   * @returns {object|undefined} The channel, or `undefined` if none was created.
   */
  getChannel(name) {
    return this.channels.get(name);
  }
}
/**
 * React hook for channel subscription.
 *
 * Looks up (or creates) the channel `channelName` on `hub`, exposes its
 * current state and re-renders whenever a broadcast changes it.
 *
 * @param {object} hub Object offering `getChannel(name)` and `createChannel(name)`.
 * @param {string} channelName Name of the channel to follow.
 * @returns {*} The latest channel state; `null` until the effect has run once.
 */
export function useChannel(hub, channelName) {
  const [channelState, setChannelState] = useState(null);

  useEffect(() => {
    const channel = hub.getChannel(channelName) || hub.createChannel(channelName);
    // Flipped by the cleanup below so the listener loop ends on unmount or
    // when hub/channelName change, instead of running (and setting state) forever.
    let isActive = true;

    // Set initial state
    const initial = channel.getCurrentState();
    let lastSignalCount = initial.signalCount;
    setChannelState(initial.state);

    async function listenForBroadcasts() {
      while (isActive) {
        let update = await channel.waitForBroadcast(lastSignalCount);
        if (!isActive) {
          return;
        }
        if (!update) {
          // A null result is either a timeout or a wait that returned at once
          // because the counter had already moved. Re-sync in the latter case
          // so the loop cannot spin on a stale lastSignalCount.
          const current = channel.getCurrentState();
          if (current.signalCount !== lastSignalCount) {
            update = current;
          }
        }
        if (update) {
          setChannelState(update.state);
          lastSignalCount = update.signalCount;
        }
      }
    }

    listenForBroadcasts().catch((err) => {
      console.error('useChannel listener stopped:', err);
    });

    return () => {
      isActive = false;
    };
  }, [hub, channelName]);

  return channelState;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const ResilientAtomicState = require('./resilient-state'); | |
// State manager instance shared by every example below
const state = new ResilientAtomicState();

/**
 * Example update: asks the state manager to add 5 to the current value and
 * logs the outcome. A rejected update is logged and not rethrown.
 */
async function demoUpdate() {
  const addFive = (current) => current + 5;

  try {
    console.log('Update result:', await state.updateState(addFive));
  } catch (err) {
    console.error('Update failed:', err);
  }
}
/**
 * Example background listener: logs every checkpoint produced by the state
 * manager's async iterator, for as long as that iterator keeps yielding.
 */
async function listenCheckpoints() {
  const checkpoints = state.listenForCheckpoints();

  for await (const checkpoint of checkpoints) {
    console.log('Checkpoint reached:', checkpoint);
  }
}
/**
 * Example driver: starts the checkpoint listener in the background, runs 15
 * updates one after another, then logs the final state.
 */
async function runDemo() {
  // Start checkpoint listener. It is deliberately not awaited (it runs in the
  // background), so attach a handler to report a failure instead of leaving
  // an unhandled rejection.
  listenCheckpoints().catch((err) => {
    console.error('Checkpoint listener failed:', err);
  });

  // Perform several updates
  for (let i = 0; i < 15; i++) {
    await demoUpdate();
  }

  // Get current state
  const current = state.getState();
  console.log('Current state:', current);
}

runDemo().catch((err) => {
  console.error('Demo failed:', err);
  process.exitCode = 1;
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // resilient-state.js | |
/**
 * A single Int32 state value kept in a SharedArrayBuffer, together with a
 * backup copy, a version counter, error bookkeeping and a checkpoint signal.
 */
class ResilientAtomicState {
  constructor() {
    // State management buffer (eight Int32 slots):
    // [0]: primary state value
    // [1]: backup state value
    // [2]: state version/generation
    // [3]: error count
    // [4]: last error code
    // [5]: recovery attempts
    // [6]: health check flag
    // [7]: checkpoint trigger
    this.buffer = new SharedArrayBuffer(32);
    this.coords = new Int32Array(this.buffer);

    // Initialize state
    Atomics.store(this.coords, 0, 0); // primary state
    Atomics.store(this.coords, 1, 0); // backup state
    Atomics.store(this.coords, 2, 1); // version
    Atomics.store(this.coords, 3, 0); // error count
    Atomics.store(this.coords, 4, 0); // last error
    Atomics.store(this.coords, 5, 0); // recovery attempts
    Atomics.store(this.coords, 6, 1); // health (1 = healthy)
    Atomics.store(this.coords, 7, 0); // checkpoint trigger
  }

  /**
   * Applies `updateFn` to the primary state with a compare-and-swap, retrying
   * with exponential backoff when the swap loses a race or `updateFn` throws.
   *
   * @param {(current: number) => number|Promise<number>} updateFn Computes the new state.
   * @param {number} [maxRetries=3] Maximum number of attempts.
   * @param {number} [backoffMs=100] Base delay; attempt k waits backoffMs * 2^k.
   * @returns {Promise<object>} `{ success: true, value, version }` on success, or
   *   `{ success: false, error }` when every attempt lost the compare-and-swap.
   * @throws {Error} When the final attempt throws; the original error is the `cause`.
   */
  async updateState(updateFn, maxRetries = 3, backoffMs = 100) {
    for (let attempt = 0; attempt < maxRetries; attempt++) {
      try {
        // Give an in-progress recovery a chance to finish. The update is
        // attempted afterwards even if the wait timed out.
        if (Atomics.load(this.coords, 6) === 0) {
          await this.waitForRecovery(5000);
        }

        // Read current state
        const currentState = Atomics.load(this.coords, 0);

        // Calculate new state. `| 0` applies the same ToInt32 conversion the
        // Int32Array applies on write, so the reported value is the stored one.
        const newState = (await updateFn(currentState)) | 0;

        // Swap only if the primary value is still the one we read. This
        // compares the value alone; the version slot is not part of the check.
        const previous = Atomics.compareExchange(this.coords, 0, currentState, newState);

        if (previous === currentState) {
          // Update successful, increment version and backup state
          const newVersion = Atomics.add(this.coords, 2, 1) + 1;
          Atomics.store(this.coords, 1, newState); // Update backup

          // Trigger a checkpoint on every tenth version
          if (newVersion % 10 === 0) {
            Atomics.store(this.coords, 7, newVersion);
            Atomics.notify(this.coords, 7); // Notify checkpoint handlers
          }

          // Reset error count on success
          Atomics.store(this.coords, 3, 0);
          return { success: true, value: newState, version: newVersion };
        }

        // CAS failed: the primary value changed while updateFn was running.
        if (attempt < maxRetries - 1) {
          await this.sleep(backoffMs * Math.pow(2, attempt));
        }
      } catch (error) {
        // Handle errors and update error metrics
        const errorCount = Atomics.add(this.coords, 3, 1) + 1;
        // Only a non-zero integer code fits the Int32 slot. Anything else
        // (string codes such as "ENOENT", missing code) is recorded as -1.
        const rawCode = error?.code;
        const errorCode = Number.isInteger(rawCode) && rawCode !== 0 ? rawCode : -1;
        Atomics.store(this.coords, 4, errorCode);

        // Trigger recovery if error threshold exceeded
        if (errorCount >= 5) {
          await this.triggerRecovery();
        }

        if (attempt === maxRetries - 1) {
          const reason = error?.message ?? String(error);
          throw new Error(
            `State update failed after ${maxRetries} attempts: ${reason}`,
            { cause: error }
          );
        }
        await this.sleep(backoffMs * Math.pow(2, attempt));
      }
    }
    return { success: false, error: 'Max retries exceeded' };
  }

  /**
   * Marks the state unhealthy, restores the primary value from the backup when
   * `validateState` rejects it, clears the error count, then marks the state
   * healthy again and wakes anything waiting on the health flag.
   */
  async triggerRecovery() {
    // Mark system as unhealthy
    Atomics.store(this.coords, 6, 0);
    // Increment recovery attempt counter
    Atomics.add(this.coords, 5, 1);

    // Attempt to restore from backup
    const backupState = Atomics.load(this.coords, 1);
    const primaryState = Atomics.load(this.coords, 0);
    if (this.validateState(primaryState) === false) {
      Atomics.store(this.coords, 0, backupState);
      console.log('Restored primary state from backup');
    }

    // Reset error count
    Atomics.store(this.coords, 3, 0);
    // Mark system as healthy
    Atomics.store(this.coords, 6, 1);
    // Notify waiting operations
    Atomics.notify(this.coords, 6);
  }

  /**
   * Waits until the health flag is no longer 0.
   *
   * @param {number} [timeout=10000] Maximum wait in milliseconds.
   * @returns {Promise<boolean>} `true` when the flag is (or becomes) non-zero,
   *   `false` when the timeout elapses first.
   */
  async waitForRecovery(timeout = 10000) {
    const result = Atomics.waitAsync(this.coords, 6, 0, timeout);
    if (!result.async) {
      // "not-equal" means the flag was already non-zero: nothing to wait for.
      return result.value === 'not-equal';
    }
    return (await result.value) === 'ok';
  }

  /**
   * Async generator yielding `{ version, state, backup, timestamp }` each time
   * the checkpoint slot changes. It never finishes on its own.
   */
  async *listenForCheckpoints() {
    let lastCheckpoint = 0;
    while (true) {
      const result = Atomics.waitAsync(this.coords, 7, lastCheckpoint, 30000);
      // A synchronous result here is "not-equal": the slot already holds a
      // checkpoint we have not reported, so fall through and yield it. Without
      // this the loop would spin forever without ever awaiting.
      if (result.async && (await result.value) !== 'ok') {
        continue; // timed out, wait again
      }
      lastCheckpoint = Atomics.load(this.coords, 7);
      yield {
        version: lastCheckpoint,
        state: Atomics.load(this.coords, 0),
        backup: Atomics.load(this.coords, 1),
        timestamp: Date.now()
      };
    }
  }

  /**
   * @returns {object} Snapshot with `primary`, `backup`, `version`, `healthy`,
   *   `errorCount` and `recoveryAttempts`, each read atomically on its own.
   */
  getState() {
    return {
      primary: Atomics.load(this.coords, 0),
      backup: Atomics.load(this.coords, 1),
      version: Atomics.load(this.coords, 2),
      healthy: Atomics.load(this.coords, 6) === 1,
      errorCount: Atomics.load(this.coords, 3),
      recoveryAttempts: Atomics.load(this.coords, 5)
    };
  }

  /**
   * Validate state integrity (implement based on your needs).
   * @returns {boolean} `false` for null, undefined or values that convert to NaN.
   */
  validateState(state) {
    // Add your validation logic here
    return state !== null && state !== undefined && !isNaN(state);
  }

  /** @returns {Promise<void>} Resolves after `ms` milliseconds. */
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
| module.exports = ResilientAtomicState; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const { Worker, isMainThread, workerData, parentPort } = require('worker_threads'); | |
// Shared Int32Array layout used by the coordinator and every worker:
//   [0] task queue head (id of the next task to claim)
//   [1] task queue tail (number of tasks published so far)
//   [2] active worker count

/**
 * Publishes `taskCount` new tasks by advancing the tail, then wakes every
 * worker waiting on the tail slot.
 */
function distributeWork(coords, taskCount) {
  // Atomically add tasks to queue
  Atomics.add(coords, 1, taskCount);
  // Notify waiting workers
  const awakened = Atomics.notify(coords, 1);
  console.log(`Added ${taskCount} tasks, awakened ${awakened} workers`);
}

/** Simulates a task by sleeping for up to one second. */
async function processTask(taskId, workerId) {
  // Simulate work
  await new Promise(resolve => setTimeout(resolve, Math.random() * 1000));
  return `Task-${taskId}-by-Worker-${workerId}`;
}

/**
 * Worker loop: claims task ids from the shared queue one at a time, processes
 * them and reports each result to the parent thread.
 */
async function runWorker(coords, workerId, parentPort) {
  Atomics.add(coords, 2, 1); // Increment active worker count
  try {
    while (true) {
      const head = Atomics.load(coords, 0);
      const tail = Atomics.load(coords, 1);

      if (tail <= head) {
        // No work available: wait until the tail moves (or 5 s pass), then
        // look again. If the tail already changed this returns immediately.
        const result = Atomics.waitAsync(coords, 1, tail, 5000);
        if (result.async) {
          await result.value;
        }
        continue;
      }

      // Claim task `head` only if no other worker took it first. A blind
      // Atomics.add would let losing workers push the head past the tail,
      // and tasks published later would then never be picked up.
      if (Atomics.compareExchange(coords, 0, head, head + 1) !== head) {
        continue;
      }

      const taskResult = await processTask(head, workerId);
      parentPort.postMessage(`Completed task ${head}: ${taskResult}`);
    }
  } finally {
    // Only reached if the loop throws; keep the active count accurate.
    Atomics.sub(coords, 2, 1);
  }
}

if (isMainThread) {
  // --- Main Thread: Coordinator ---
  const coordinationBuffer = new SharedArrayBuffer(1024);
  const coords = new Int32Array(coordinationBuffer);
  Atomics.store(coords, 0, 0); // Task queue head
  Atomics.store(coords, 1, 0); // Task queue tail
  Atomics.store(coords, 2, 0); // Active worker count

  // One worker per CPU reported by the os module, each running this same file.
  const cpuCount = require('os').cpus().length;
  const workers = [];
  for (let i = 0; i < cpuCount; i++) {
    const worker = new Worker(__filename, {
      workerData: { coordinationBuffer, workerId: i }
    });
    worker.on('message', (msg) => {
      console.log(`Worker ${i}: ${msg}`);
    });
    worker.on('error', (err) => {
      console.error(`Worker ${i} error:`, err);
    });
    worker.on('exit', (code) => {
      if (code !== 0) console.error(`Worker ${i} stopped with exit code ${code}`);
    });
    workers.push(worker);
  }

  // Periodically distribute random work (1 to 10 tasks every second)
  setInterval(() => {
    const taskCount = Math.floor(Math.random() * 10) + 1;
    distributeWork(coords, taskCount);
  }, 1000);
} else {
  // --- Worker Thread ---
  const coords = new Int32Array(workerData.coordinationBuffer);
  // Intentionally not awaited: the loop runs for the lifetime of the worker.
  void runWorker(coords, workerData.workerId, parentPort);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment