Created
June 10, 2022 11:41
-
-
Save eblocha/f06d0d8904b0c40a7aec5c31aa19253c to your computer and use it in GitHub Desktop.
A keyed read-write lock intended to lock files and directories
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { RWLock, FileLock, withPermissions } from './rwlock'; | |
describe('Base RW Lock', () => {
  /** The two ways a test operation can take the lock. */
  enum LockTypes {
    READ = 'read',
    WRITE = 'write',
  }

  /** One parameterised case: the order in which lock requests are issued. */
  type TestCase = {
    locks: LockTypes[];
  };

  const tests: TestCase[] = [
    { locks: [LockTypes.READ, LockTypes.WRITE, LockTypes.READ] },
    { locks: [LockTypes.WRITE, LockTypes.READ] },
    { locks: [LockTypes.READ, LockTypes.WRITE] },
    {
      locks: [
        LockTypes.READ,
        LockTypes.READ,
        LockTypes.WRITE,
        LockTypes.READ,
        LockTypes.READ,
        LockTypes.WRITE,
        LockTypes.READ,
      ],
    },
    {
      locks: [
        LockTypes.READ,
        LockTypes.READ,
        LockTypes.WRITE,
        LockTypes.READ,
        LockTypes.READ,
        LockTypes.READ,
        LockTypes.READ,
        LockTypes.WRITE,
        LockTypes.READ,
        LockTypes.WRITE,
        LockTypes.WRITE,
        LockTypes.READ,
      ],
    },
  ];

  /** Returns an already-resolved promise; awaiting it yields before continuing. */
  const asyncNOP = async () => new Promise<void>((resolve) => resolve());

  /**
   * Issues one lock request per entry of `locks`, synchronously and in array
   * order, running `fn(index)` under a read or write lock as the entry says.
   * Resolves once every operation has finished.
   */
  const runAll = (
    lock: RWLock,
    locks: LockTypes[],
    fn: (index: number) => Promise<unknown>
  ) =>
    Promise.all(
      locks.map((type, index) =>
        type === LockTypes.READ
          ? lock.withRead(() => fn(index))
          : lock.withWrite(() => fn(index))
      )
    );

  it.each(tests)('Maintains order: $locks', async ({ locks }) => {
    const lock = new RWLock();
    const data: string[] = [];
    // Each operation records "<type><index>", so the log exposes the run order.
    const expected = locks.map((type, index) => type + index.toString());
    const fn = async (index: number) => data.push(expected[index]);

    await runAll(lock, locks, fn);

    expect(data).toStrictEqual(expected);
  });

  it('Maintains order with uneven event loop ticks', async () => {
    const lock = new RWLock();
    const data: string[] = [];
    const locks = [
      LockTypes.READ,
      LockTypes.READ,
      LockTypes.WRITE,
      LockTypes.READ,
      LockTypes.READ,
      LockTypes.WRITE,
      LockTypes.READ,
    ];
    const expected = locks.map((type, index) => type + index.toString());
    // How many times each operation yields after recording itself.
    const ticks = [5, 2, 7, 10, 2, 0, 1];
    const fn = async (index: number) => {
      data.push(expected[index]);
      for (let i = 0; i < ticks[index]; i++) {
        await asyncNOP();
      }
    };

    await runAll(lock, locks, fn);

    expect(data).toStrictEqual(expected);
  });

  it('Allows concurrent readers', async () => {
    const lock = new RWLock();
    const data: string[] = [];
    await Promise.all([
      lock.withRead(async () => {
        await asyncNOP();
        data.push('data2');
      }),
      lock.withRead(() => {
        data.push('data1');
      }),
    ]);
    // The second reader finishes while the first is still yielding.
    expect(data).toStrictEqual(['data1', 'data2']);
  });

  it('Forces synchronous writers', async () => {
    const lock = new RWLock();
    const data: string[] = [];
    await Promise.all([
      lock.withWrite(async () => {
        await asyncNOP();
        data.push('data1');
      }),
      lock.withWrite(() => {
        data.push('data2');
      }),
    ]);
    // The second writer may not start until the first has fully completed.
    expect(data).toStrictEqual(['data1', 'data2']);
  });
});
describe('File lock', () => {
  it('Maintains order of operations', async () => {
    /**
     * Permissions requested by each operation
     * (R = shareable, W = exclusive):
     *
     *            op1  op2  op3
     * file A ->   R    R    R
     * file B ->   W         R
     */
    const fileLock = new FileLock();
    // All permissions are requested up front, in this exact order.
    const operations = [
      {
        label: 'op1',
        permissions: [
          fileLock.shareable('fileA'),
          fileLock.exclusive('fileB'),
        ],
      },
      {
        label: 'op2',
        permissions: [fileLock.shareable('fileA')],
      },
      {
        label: 'op3',
        permissions: [
          fileLock.shareable('fileA'),
          fileLock.shareable('fileB'),
        ],
      },
    ];
    const log: string[] = [];

    await Promise.all(
      operations.map(({ label, permissions }) =>
        withPermissions(permissions, () => log.push(label))
      )
    );

    expect(log).toStrictEqual(['op1', 'op2', 'op3']);
  });
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Mutex, MutexInterface } from 'async-mutex'; | |
/** A zero-argument callback that produces a `T` either directly or through a promise. */
type Func<T> = () => Promise<T> | T;
/**
 * Does nothing, asynchronously: returns a promise that is resolved as soon as
 * it is created. Awaiting it makes the caller yield before it continues.
 */
const asyncNOP = async (): Promise<void> =>
  new Promise<void>((done) => {
    done();
  });
/**
 * Single threaded write-preferring read write lock
 * See: https://gist.github.com/CMCDragonkai/4de5c1526fc58dac259e321db8cf5331
 *
 * Readers share the lock with each other; a reader that arrives while any
 * writer is active or queued waits for those writers first.
 */
export class RWLock {
  /** Held from the moment the first reader enters until the last reader leaves. */
  protected readersLock: Mutex = new Mutex();
  /** Acquired by every writer for the duration of its write. */
  protected writersLock: Mutex = new Mutex();
  /** Releaser for `readersLock`, stored by the reader that acquired it. */
  // eslint-disable-next-line @typescript-eslint/no-empty-function
  protected readersRelease: MutexInterface.Releaser = () => {};
  /** Readers currently waiting for writers to finish. */
  protected readerCountBlocked = 0;
  /** Readers that have passed the writer check and not yet released. */
  protected _readerCount = 0;
  /** Writers that have requested the lock and not yet released. */
  protected _writerCount = 0;

  /** Number of readers holding or waiting for the lock. */
  public get readerCount(): number {
    return this._readerCount + this.readerCountBlocked;
  }

  /** Number of writers holding or waiting for the lock. */
  public get writerCount(): number {
    return this._writerCount;
  }

  /**
   * Runs `f` while holding a read lock.
   * @param f The function to run; may be sync or async
   * @returns The value produced by `f`
   */
  public async withRead<T>(f: Func<T>): Promise<T> {
    const release = await this.acquireRead();
    try {
      return await f();
    } finally {
      // Release even when f throws or rejects
      release();
    }
  }

  /**
   * Runs `f` while holding the write lock.
   * @param f The function to run; may be sync or async
   * @returns The value produced by `f`
   */
  public async withWrite<T>(f: Func<T>): Promise<T> {
    const release = await this.acquireWrite();
    try {
      return await f();
    } finally {
      // Release even when f throws or rejects
      release();
    }
  }

  /**
   * Acquires a read lock.
   * @returns A function that releases this read lock. Calling it more than
   * once has no further effect.
   */
  public async acquireRead(): Promise<() => void> {
    if (this._writerCount > 0) {
      ++this.readerCountBlocked;
      const writerCount = this._writerCount;
      // Wait for every writer that came before us to unlock, not just the first
      for (let i = 0; i < writerCount; i++) {
        await this.writersLock.waitForUnlock();
      }
      --this.readerCountBlocked;
    }
    const readerCount = ++this._readerCount;
    // The first reader locks
    if (readerCount === 1) {
      this.readersRelease = await this.readersLock.acquire();
    } else {
      // To ensure we use the same number of event loop ticks
      // whether we need to acquire the lock or not
      await asyncNOP();
    }
    let released = false;
    return () => {
      // A repeated call must not decrement the count a second time, or the
      // readers lock could be released while another reader still holds it
      if (released) {
        return;
      }
      released = true;
      const readerCount = --this._readerCount;
      // The last reader unlocks
      if (readerCount === 0) {
        this.readersRelease();
      }
    };
  }

  /**
   * Acquires the write lock.
   * @returns A function that releases the write lock. Calling it more than
   * once has no further effect.
   */
  public async acquireWrite(): Promise<() => void> {
    // Counted before waiting, so readers arriving from now on queue behind us
    ++this._writerCount;
    const writersRelease = await this.writersLock.acquire();
    if (this._readerCount) {
      // Let the readers that are already in finish first
      await this.readersLock.waitForUnlock();
    }
    let released = false;
    return () => {
      // A repeated call must not decrement the writer count a second time
      if (released) {
        return;
      }
      released = true;
      writersRelease();
      --this._writerCount;
    };
  }

  /** @returns true while either the readers lock or the writers lock is held */
  public isLocked(): boolean {
    return this.readersLock.isLocked() || this.writersLock.isLocked();
  }

  /**
   * Waits for the readers lock to be unlocked and for the writers lock to be
   * unlocked once per writer counted at the time of the call.
   */
  public async waitForUnlock(): Promise<void> {
    const waitForWriters = async () => {
      const writerCount = this._writerCount;
      for (let i = 0; i < writerCount; i++) {
        await this.writersLock.waitForUnlock();
      }
    };
    await Promise.all([this.readersLock.waitForUnlock(), waitForWriters()]);
  }
}
/**
 * A keyed read-write lock intended to lock files and directories
 *
 * Every key is passed through `resolver` exactly once to obtain the key the
 * lock is stored under. Locks are created on first use and removed again once
 * they have no readers or writers.
 */
export class FileLock {
  // Null-prototype object: keys such as "constructor" or "toString" must not
  // be mistaken for existing locks through Object.prototype
  protected locks: Record<string, RWLock> = Object.create(null);

  /** Maps a caller-supplied key to the key the lock is stored under. */
  resolver: (key: string) => string;

  /**
   * @param resolver Optional key normaliser; defaults to using keys unchanged
   */
  constructor(resolver?: (key: string) => string) {
    this.resolver = resolver ?? ((key) => key);
  }

  /** Drops the lock stored under the resolved `key` once nothing holds or awaits it. */
  private cleanup(key: string) {
    if (key in this.locks) {
      const readers = this.locks[key].readerCount;
      const writers = this.locks[key].writerCount;
      if (readers === 0 && writers === 0) {
        delete this.locks[key];
      }
    }
  }

  /** Returns the lock stored under the resolved `key`, creating it if absent. */
  private getOrCreateLock(key: string): RWLock {
    if (key in this.locks) {
      return this.locks[key];
    }
    const newLock = new RWLock();
    this.locks[key] = newLock;
    return newLock;
  }

  /**
   * Runs `f` while holding shareable (read) access to `key`.
   * @param key The unresolved key to lock
   * @param f The function to run; may be sync or async
   * @returns The value produced by `f`
   */
  public async withShareable<T>(key: string, f: Func<T>): Promise<T> {
    // shareable() resolves the key itself; resolving here too would apply the
    // resolver twice and could lock a different key than isLocked() checks
    const release = await this.shareable(key);
    try {
      return await f();
    } finally {
      release();
    }
  }

  /**
   * Runs `f` while holding exclusive (write) access to `key`.
   * @param key The unresolved key to lock
   * @param f The function to run; may be sync or async
   * @returns The value produced by `f`
   */
  public async withExclusive<T>(key: string, f: Func<T>): Promise<T> {
    // exclusive() resolves the key itself; see withShareable
    const release = await this.exclusive(key);
    try {
      return await f();
    } finally {
      release();
    }
  }

  /**
   * Acquires shareable (read) access to `key`.
   * @param key The unresolved key to lock
   * @returns A function that releases the access and removes the lock if it is now unused
   */
  public async shareable(key: string): Promise<() => void> {
    const resolved = this.resolver(key);
    const lock = this.getOrCreateLock(resolved);
    const release = await lock.acquireRead();
    return () => {
      release();
      this.cleanup(resolved);
    };
  }

  /**
   * Acquires exclusive (write) access to `key`.
   * @param key The unresolved key to lock
   * @returns A function that releases the access and removes the lock if it is now unused
   */
  public async exclusive(key: string): Promise<() => void> {
    const resolved = this.resolver(key);
    const lock = this.getOrCreateLock(resolved);
    const release = await lock.acquireWrite();
    return () => {
      release();
      this.cleanup(resolved);
    };
  }

  /** @returns true if a lock exists for `key` and is currently held */
  public isLocked(key: string): boolean {
    return this.locks[this.resolver(key)]?.isLocked() ?? false;
  }

  /** Waits until the lock for `key` is unlocked; resolves immediately if no lock exists. */
  public async waitForUnlock(key: string): Promise<void> {
    const resolved = this.resolver(key);
    if (resolved in this.locks) {
      return await this.locks[resolved].waitForUnlock();
    }
  }
}
/**
 * Execute an async function with permissions
 *
 * Waits for every permission to settle. If any of them was refused, the ones
 * that were granted are released, `f` is not called, and the first refusal (in
 * array order) is rethrown.
 * @param permssions An array of promises that will resolve to release functions to release permissions
 * @param f The function to execute with permissions
 * @returns The return value of f
 */
export const withPermissions = async <T>(
  permssions: Promise<MutexInterface.Releaser>[],
  f: Func<T>
): Promise<T> => {
  // allSettled rather than all: with Promise.all a single rejection would
  // abandon the releasers of every permission that was granted, leaking locks
  const outcomes = await Promise.allSettled(permssions);
  const releasers: MutexInterface.Releaser[] = [];
  let refusal: PromiseRejectedResult | undefined;
  for (const outcome of outcomes) {
    if (outcome.status === 'fulfilled') {
      releasers.push(outcome.value);
    } else if (refusal === undefined) {
      refusal = outcome;
    }
  }
  try {
    if (refusal !== undefined) {
      throw refusal.reason;
    }
    return await f();
  } finally {
    // Runs on success, on a failure inside f, and on a refused permission
    for (const release of releasers) {
      release();
    }
  }
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment