Created
February 18, 2020 17:27
-
-
Save colelawrence/25d92a23d34fe8a212dd15407279eea1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const init = Symbol(); | |
| const error = Symbol(); | |
| /** | |
| * Stream can be thought of like a Promise which can be resolved multiple times. | |
| * Also, a stream does not have a separate channel for errors. | |
| */ | |
| export class Stream<T> { | |
| private listeners = new Set< | |
| [(addValue: T) => any, (rejectError: any) => any] | |
| >(); | |
| private value: typeof init | typeof error | T = init; | |
| private error: any; | |
| constructor( | |
| executor: (add: (value: T) => void, reject?: (error: any) => void) => any, | |
| ) { | |
| executor( | |
| addValue => { | |
| this.value = addValue; | |
| for (const listener of this.listeners) { | |
| listener[0](addValue); | |
| } | |
| }, | |
| rejectError => { | |
| this.value = error; | |
| this.error = rejectError; | |
| for (const listener of this.listeners) { | |
| listener[1](rejectError); | |
| } | |
| }, | |
| ); | |
| } | |
| catch: <R>(listener: (error: any) => Promise<R> | R) => Stream<R>; | |
| /** | |
| * @param listener is called immediately if the stream has a current value, | |
| * and is called each time an item is added to the stream by the constructing | |
| * executor | |
| * | |
| * @returns a function for unsubscribing | |
| */ | |
| then<R>(listener: (value: T) => Promise<R> | R, rejectListener?: (error: any) => Promise<R> | R): Stream<R> { | |
| return new Stream((add, reject) => { | |
| const wrappedAdd = value => { | |
| const result = listener(value); | |
| if (result instanceof Promise) { | |
| result.then(add); | |
| } else { | |
| add(result); | |
| } | |
| }; | |
| const wrappedReject = reject != null ? err => { | |
| const result = rejectListener(err); | |
| if (result instanceof Promise) { | |
| result.catch(rejectListener); | |
| } else { | |
| reject(result); | |
| } | |
| }; | |
| this.listeners.add([wrappedAdd, wrappedReject]); | |
| if (this.value != init) { | |
| wrappedAdd(this.value); | |
| } | |
| }); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment