Created
June 4, 2020 14:38
-
-
Save vbfox/32cc61b2d0f354e582d579e9336e83ce to your computer and use it in GitHub Desktop.
A queue of promises keyed by a value
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Queue } from 'src/utils/queue'; | |
import { assertDefined } from 'src/utils/asserts'; | |
interface QueueItem { | |
readonly generator: () => PromiseLike<any>; | |
readonly resolve: (value: any) => void; | |
readonly reject: (error: any) => void; | |
} | |
interface PerKeyQueue<TKey> { | |
readonly key: TKey; | |
current: QueueItem | undefined; | |
readonly remaining: Queue<QueueItem>; | |
} | |
/** | |
* For each key a queue of promise is maintained, when one finishes the next start executing until no more promise | |
* remain for the key. | |
* | |
* Promises for different keys don't wait each other and run concurrently. | |
*/ | |
export class KeyedPromiseQueue<TKey> { | |
private queues = new Map<TKey, PerKeyQueue<TKey>>(); | |
private getOrCreateForKey(key: TKey): PerKeyQueue<TKey> { | |
const existing = this.queues.get(key); | |
if (existing) { | |
return existing; | |
} | |
const newValue: PerKeyQueue<TKey> = { key, current: undefined, remaining: new Queue<QueueItem>() }; | |
this.queues.set(key, newValue); | |
return newValue; | |
} | |
private runAndSetCurrent(key: TKey, queue: PerKeyQueue<TKey>, queueItem: QueueItem | undefined) { | |
// eslint-disable-next-line no-param-reassign | |
queue.current = queueItem; | |
if (queueItem !== undefined) { | |
queueItem.generator().then( | |
v => this.onPromiseResolved(key, v), | |
e => this.onPromiseRejected(key, e)); | |
} | |
} | |
private onPromiseResolved(key: TKey, value: any) { | |
const currentQueue = this.queues.get(key); | |
assertDefined(currentQueue, 'Queue should exist'); | |
assertDefined(currentQueue.current, 'Queue should have a current promise'); | |
const resolve = currentQueue.current.resolve; | |
this.runAndSetCurrent(key, currentQueue, currentQueue.remaining.dequeue()); | |
resolve(value); | |
} | |
private onPromiseRejected(key: TKey, error: any) { | |
const currentQueue = this.queues.get(key); | |
assertDefined(currentQueue, 'Queue should exist'); | |
assertDefined(currentQueue.current, 'Queue should have a current promise'); | |
const reject = currentQueue.current.reject; | |
this.runAndSetCurrent(key, currentQueue, currentQueue.remaining.dequeue()); | |
reject(error); | |
} | |
enqueue<T>(key: TKey, promiseGenerator: () => PromiseLike<T>): PromiseLike<T> { | |
return new Promise<T>((resolve, reject) => { | |
const currentQueue = this.getOrCreateForKey(key); | |
const queueItem: QueueItem = { generator: promiseGenerator, resolve, reject }; | |
// No current ? we're now the current. | |
if (currentQueue.current === undefined) { | |
this.runAndSetCurrent(key, currentQueue, queueItem); | |
} else { | |
// Otherwise queue for later | |
currentQueue.remaining.enqueue(queueItem); | |
} | |
}); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* A mutable queue, javascript arrays already support being used as a queue but their interface isn't as intuitive. | |
*/ | |
export class Queue<T> { | |
private items: T[]; | |
constructor(items?: T[]) { | |
this.items = items ? [...items] : []; | |
} | |
enqueue(item: T) { | |
this.items.push(item); | |
} | |
dequeue(): T | undefined { | |
return this.items.shift(); | |
} | |
get length(): number { | |
return this.items.length; | |
} | |
get isEmpty(): boolean { | |
return this.items.length === 0; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment