Last active
July 5, 2022 08:50
-
-
Save tomhicks/2e006ff9e5de165bb8d35accd4975cdd to your computer and use it in GitHub Desktop.
Lets you continually push to a "stream" of promises, to be notified of their resolution/rejection, with the guarantee that the resolved values will be emitted in order, skipping any that come back out of order.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type PromiseSequencer<T> = { | |
/** | |
* Add a promise to the end of the queue. When this promise resolves, you | |
* may be notified via onLatest, assuming that no later promise has resolved | |
* in the meantime. | |
*/ | |
push(promise: Promise<T>): void; | |
/** | |
* Remove all promises from the queue. Any promises in the queue can still | |
* resolve or reject, but you will not be notified via the onLatest or onError | |
* callbacks. | |
*/ | |
clear(): void; | |
}; | |
type SequencerParams<T> = { | |
/** | |
* Called when a result comes back. Guaranteed to always be called in the same | |
* order as the promises were pushed, but you may not get a call for every | |
* promise. | |
*/ | |
onLatest(result: T): void; | |
/** | |
* Called when the last promise in the queue rejects. This means we won't ever | |
* reach our final desired state. | |
*/ | |
onErrorWithNoRemainingPromises?(err: unknown): void; | |
/** | |
* Called when a non-terminal promise rejects. A later promise in the queue could | |
* still give us a more up-to-date value, so this is less "serious" than the case | |
* when the final promise rejects. | |
*/ | |
onErrorWithRemainingPromises?(err: unknown): void; | |
}; | |
export function makePromiseSequencer<T>({ | |
onLatest, | |
onErrorWithRemainingPromises, | |
onErrorWithNoRemainingPromises, | |
}: SequencerParams<T>): PromiseSequencer<T> { | |
let promises: Array<Promise<T>> = []; | |
return { | |
clear() { | |
promises = []; | |
}, | |
push(promise) { | |
promises.push(promise); | |
promise.then( | |
function (r) { | |
const indexOfThisPromise = promises.indexOf(promise); | |
if (indexOfThisPromise > -1) { | |
// remove all older promises as this result supersedes any | |
// older ones that might resolve or reject later. | |
promises = promises.slice(indexOfThisPromise + 1); | |
onLatest(r); | |
} | |
}, | |
function (e: unknown) { | |
const indexOfThisPromise = promises.indexOf(promise); | |
if (indexOfThisPromise > -1) { | |
// there's a semantic difference between the last promise in the queue | |
// rejecting, or any other one rejecting. If the last one errors, then | |
// we know we never reached our desired "final" state, which is a different | |
// class of problem than if an older item fails. | |
if (indexOfThisPromise === promises.length - 1) { | |
onErrorWithNoRemainingPromises?.(e); | |
} else { | |
onErrorWithRemainingPromises?.(e); | |
} | |
// remove this promise from the queue, but leave olders ones | |
// as they are still relevant, because this one errored. | |
promises.splice(indexOfThisPromise, 1); | |
} | |
} | |
); | |
}, | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment