Last active
October 13, 2024 13:16
-
-
Save ATGardner/07dd1d61add451940dbd8b23df944453 to your computer and use it in GitHub Desktop.
This is a simple Node.js TypeScript implementation of a read-write lock using a queue, without the need for any mutex or other synchronization mechanism.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { setTimeout } from 'node:timers/promises' | |
import { ReadWriteLock, ReadWriteQueueLock } from './read-write-lock' | |
describe('ReadWriteQueueLock', () => {
  // Test scenario copied from https://eli.thegreenplace.net/2019/implementing-reader-writer-locks/
  // The console.log calls are kept on purpose: they show what happens when,
  // which helps confirm the lock behaves as expected.

  /**
   * Simulated reader: waits a random delay, takes the read lock, then checks
   * (with random pauses between checks) that `data` is a run of consecutive
   * integers. Any interleaved write would break that property.
   *
   * @param lock lock under test
   * @param id   label used in the log output
   * @param data shared array mutated by the writers
   */
  async function reader(lock: ReadWriteLock, id: number, data: number[]): Promise<void> {
    await setTimeout(Math.random() * 100)
    console.log(`reader ${id} locking`)
    await lock.rLock()
    try {
      console.log(`reader ${id} reading, data: ${data.join(',')}`)
      for (let i = 1; i < data.length; i++) {
        await setTimeout(Math.random() * 100)
        expect(data[i]).toEqual(data[i - 1] + 1)
      }
      console.log(`reader ${id} unlocking, data: ${data.join(',')}`)
    } finally {
      // Release even when an expectation fails, so the remaining readers and
      // writers are not left waiting on a lock that will never be freed.
      await lock.rUnlock()
    }
  }

  /**
   * Simulated writer: waits a random delay, takes the write lock, then adds
   * `inc` to every element of `data`, pausing between elements so that a
   * concurrent reader would observe a half-updated array if the lock failed.
   *
   * @param lock lock under test
   * @param id   label used in the log output
   * @param data shared array to mutate in place
   * @param inc  amount added to every element
   */
  async function writer(lock: ReadWriteLock, id: number, data: number[], inc: number): Promise<void> {
    await setTimeout(Math.random() * 100)
    console.log(` writer ${id} locking`)
    await lock.wLock()
    try {
      console.log(` writer ${id} writing`)
      for (let j = 0; j < data.length; j++) {
        await setTimeout(Math.random() * 100)
        data[j] += inc
      }
      console.log(` writer ${id} unlocking, data: ${data.join(',')}`)
    } finally {
      // Always hand the lock back, mirroring the reader.
      await lock.wUnlock()
    }
  }

  it('works with multiple readers and writers', async () => {
    const lock = new ReadWriteQueueLock()
    // 0..9: consecutive integers, the invariant every reader verifies.
    const data = Array.from({ length: 10 }, (_, i) => i)
    const readerPromises = Array.from({ length: 10 }, (_, i) => reader(lock, i + 1, data))
    const writerPromises = Array.from({ length: 10 }, (_, i) => writer(lock, i + 1, data, i + 1))
    await Promise.all([...readerPromises, ...writerPromises])
    console.log('done')
  })
})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Kind of access a queued lock request is asking for.
 *
 * String-valued so that no member is falsy (a numeric enum would make
 * `READ === 0`) and so queued requests are readable when logged.
 */
enum LockType {
  READ = 'READ',
  WRITE = 'WRITE',
}
/** A pending lock acquisition waiting in the queue. */
interface LockRequest {
  /** Whether the waiter wants read or write access. */
  type: LockType
  /** Settles the waiter's promise, i.e. hands it the lock. */
  resolve: () => void
}
/**
 * Contract for a read-write lock: read locks may be shared, a write lock is
 * exclusive. Every successful `rLock`/`wLock` is expected to be paired with
 * the matching unlock call.
 */
export interface ReadWriteLock {
  /** Acquires the lock for reading; the promise settles once access is granted. */
  rLock(): Promise<void>
  /** Releases a read lock. May be synchronous or asynchronous, so callers can safely `await` it. */
  rUnlock(): void | Promise<void>
  /** Acquires the lock for writing; the promise settles once access is granted. */
  wLock(): Promise<void>
  /** Releases the write lock. May be synchronous or asynchronous, so callers can safely `await` it. */
  wUnlock(): void | Promise<void>
}
/**
 * Read-write lock built on a FIFO queue of pending requests.
 *
 * No mutex (in-memory or in Redis) is used: every piece of code that touches
 * the queue or the counters (length check, push, removal) runs synchronously,
 * so it cannot be interleaved with another caller and needs no further
 * locking.
 *
 * Ordering: a reader that arrives while anything is queued waits behind it,
 * so a queued writer is not overtaken by later readers. When the lock is
 * released, either the single writer at the head of the queue or the whole
 * run of consecutive readers at the head is admitted.
 */
export class ReadWriteQueueLock implements ReadWriteLock {
  /** Requests waiting for the lock, oldest first. */
  readonly #queue: LockRequest[] = []
  /** Number of readers currently holding the lock. */
  #readers = 0
  /** True while a writer holds the lock. */
  #writing = false

  /**
   * Acquires the lock for reading.
   *
   * @returns a promise that resolves once read access is granted: immediately
   *          when no writer is active and nothing is queued, otherwise when
   *          the request reaches the head of the queue.
   */
  rLock(): Promise<void> {
    // The executor runs synchronously, so the check-and-update below is atomic
    // with respect to other callers.
    return new Promise((resolve) => {
      if (this.#writing || this.#queue.length > 0) {
        this.#queue.push({ type: LockType.READ, resolve })
      } else {
        this.#readers++
        resolve()
      }
    })
  }

  /**
   * Releases one read lock; when the last reader leaves, the queue is served.
   *
   * @throws Error if no reader currently holds the lock.
   */
  rUnlock(): void {
    if (this.#readers === 0) {
      // Without this guard the counter would go negative and the
      // "last reader left" check below would never fire again.
      throw new Error('rUnlock called without a matching rLock')
    }
    this.#readers--
    if (this.#readers === 0) {
      this.#processQueue()
    }
  }

  /**
   * Acquires the lock for writing.
   *
   * @returns a promise that resolves once exclusive access is granted:
   *          immediately when the lock is idle, otherwise when the request
   *          reaches the head of the queue.
   */
  wLock(): Promise<void> {
    return new Promise((resolve) => {
      if (this.#writing || this.#readers > 0) {
        this.#queue.push({ type: LockType.WRITE, resolve })
      } else {
        this.#writing = true
        resolve()
      }
    })
  }

  /**
   * Releases the write lock and serves the queue.
   *
   * @throws Error if no writer currently holds the lock.
   */
  wUnlock(): void {
    if (!this.#writing) {
      // A stray call here could otherwise admit a queued writer while readers
      // are still active.
      throw new Error('wUnlock called without a matching wLock')
    }
    this.#writing = false
    this.#processQueue()
  }

  /**
   * Admits the next waiter(s): one writer, or every reader queued ahead of
   * the next writer. Does nothing when the queue is empty.
   */
  #processQueue(): void {
    const head = this.#queue[0]
    if (head === undefined) {
      return
    }

    if (head.type === LockType.WRITE) {
      this.#queue.shift()
      this.#writing = true
      head.resolve()
      return
    }

    // Count the run of READ requests at the head; the first WRITE request (if
    // any) stays in place at the front of the queue.
    let batchSize = 1
    while (this.#queue[batchSize]?.type === LockType.READ) {
      batchSize++
    }
    for (const waiting of this.#queue.splice(0, batchSize)) {
      this.#readers++
      waiting.resolve()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment