Last active
October 18, 2024 18:23
-
-
Save omranjamal/5d46cb502e8b5aa8e8cf46207d753bfe to your computer and use it in GitHub Desktop.
Locks, Semaphonres, Events & Barriers using a powerful primitive to build synchronization primitives
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function atom<T>(initialState: T) { | |
let state = initialState; | |
const pendingUpdates = new Map<Function, Function>(); | |
const blocked = new Map<Function, Function>(); | |
function update(predicate: (state: T) => boolean, nextState: T | ((state: T) => T)): Promise<T> { | |
function attemptToApplyPendingUpdates() { | |
for (const [predicate, update] of pendingUpdates) { | |
if (predicate(state)) { | |
pendingUpdates.delete(predicate); | |
update(); | |
break; | |
} | |
} | |
} | |
function attempToUnblock() { | |
for (const [predicate, unblock] of blocked) { | |
if (predicate(state)) { | |
blocked.delete(predicate); | |
unblock(); | |
} | |
} | |
} | |
if (predicate(state)) { | |
if (nextState instanceof Function) { | |
state = nextState(state); | |
} else { | |
state = nextState; | |
} | |
return new Promise((resolve) => { | |
resolve(state); | |
attemptToApplyPendingUpdates(); | |
attempToUnblock(); | |
}); | |
} else { | |
return new Promise((resolve) => { | |
pendingUpdates.set(predicate, () => { | |
if (nextState instanceof Function) { | |
state = nextState(state); | |
} else { | |
state = nextState; | |
} | |
resolve(state); | |
attemptToApplyPendingUpdates(); | |
attempToUnblock(); | |
}); | |
}); | |
} | |
} | |
function when(predicate: (state: T) => boolean, reaction: (state: T) => void): void; | |
function when(predicate: (state: T) => boolean): Promise<T>; | |
function when(predicate: (state: T) => boolean, reaction?: ((state: T) => void)): any { | |
if (reaction) { | |
if (predicate(state)) { | |
reaction(state); | |
} else { | |
blocked.set(predicate, (state: T) => reaction(state)); | |
} | |
} else { | |
if (predicate(state)) { | |
return Promise.resolve(state); | |
} else { | |
return new Promise<T>((resolve) => { | |
blocked.set(predicate, (state: T) => resolve(state)); | |
}); | |
} | |
} | |
} | |
return { update, when }; | |
} | |
const rlockAtom = atom<[number, number]>([-1, 0]); | |
async function testRLock(n: number) { | |
await rlockAtom.update(([thread, locked]) => (thread === -1 || thread === n), ([thread, locked]) => [n, locked + 1]); | |
await rlockAtom.update(([thread, locked]) => (thread === -1 || thread === n), ([thread, locked]) => [n, locked + 1]); | |
console.log(`LCOKED ${n}`); | |
await rlockAtom.update(() => true, ([thread, locked]) => locked === 1 ? [-1, 0] : [n, locked - 1]); | |
await rlockAtom.update(() => true, ([thread, locked]) => locked === 1 ? [-1, 0] : [n, locked - 1]); | |
console.log(`RELEASING: ${n}`); | |
} | |
// Promise.all([testRLock(1), testRLock(2), testRLock(3), testRLock(4), testRLock(5)]); | |
const semaphoreAtom = atom(3); | |
async function testSemaphore(n: number) { | |
await semaphoreAtom.update( | |
(available) => available > 0, | |
(available) => available - 1 | |
); | |
console.log(`LOCKED: ${n}`); | |
await new Promise(accept => setTimeout(accept, 1000 + Math.round(Math.random() * 1000))); | |
console.log(`RELEASING: ${n}`); | |
semaphoreAtom.update(() => true, available => available + 1); | |
} | |
// Promise.all([testSemaphore(1), testSemaphore(2), testSemaphore(3), testSemaphore(4), testSemaphore(5)]); | |
const barrierAtom = atom(5); | |
async function testBarrier(n: number) { | |
await barrierAtom.update( | |
() => true, | |
(remainingSeats) => remainingSeats - 1 | |
); | |
console.log(`${n}: WAITING FOR SEATS TO FILL`); | |
await barrierAtom.when((remainingSeats) => remainingSeats === 0); | |
console.log(`${n}: RUNNING ${n}`); | |
} | |
// Promise.all([testBarrier(1), testBarrier(2), testBarrier(3), testBarrier(4), testBarrier(5)]); | |
const eventAtom = atom(false); | |
async function testEvent(n: number) { | |
console.log(`${n}: WAITING`); | |
await eventAtom.when((isSet) => isSet === true); | |
console.log(`${n}: RUNNING ${n}`); | |
} | |
// Promise.all([testEvent(1), testEvent(2), testEvent(3), testEvent(4), testEvent(5)]); | |
// console.log('TRIGGERING EVENT'); | |
// eventAtom.update(() => true, true); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment