Skip to content

Instantly share code, notes, and snippets.

@colelawrence
Created February 18, 2020 17:27
Show Gist options
  • Select an option

  • Save colelawrence/25d92a23d34fe8a212dd15407279eea1 to your computer and use it in GitHub Desktop.

Select an option

Save colelawrence/25d92a23d34fe8a212dd15407279eea1 to your computer and use it in GitHub Desktop.
// Sentinels stored in `Stream.value` to distinguish "nothing emitted yet" and
// "last event was an error" from any real value of type T.
const init = Symbol();
const error = Symbol();

/**
 * A Stream can be thought of as a Promise that can be resolved multiple times.
 *
 * The executor receives an `add` callback (emit a value) and a `reject`
 * callback (emit an error). The stream remembers only its most recent event
 * and replays it to every subscriber that joins later.
 */
export class Stream<T> {
  private listeners = new Set<
    [(addValue: T) => any, (rejectError: any) => any]
  >();
  private value: typeof init | typeof error | T = init;
  private error: any;

  /**
   * @param executor is invoked synchronously with `add` and `reject`
   * callbacks; it may call either of them any number of times, now or later.
   */
  constructor(
    executor: (add: (value: T) => void, reject?: (error: any) => void) => any,
  ) {
    executor(
      addValue => {
        this.value = addValue;
        // Iterate over a snapshot: a listener that subscribes while we are
        // emitting already gets this value replayed by `subscribe`, so it
        // must not also be visited by this loop.
        for (const listener of Array.from(this.listeners)) {
          listener[0](addValue);
        }
      },
      rejectError => {
        this.value = error;
        this.error = rejectError;
        for (const listener of Array.from(this.listeners)) {
          listener[1](rejectError);
        }
      },
    );
  }

  /**
   * Derives a stream from this stream's errors only.
   *
   * @param listener is called with each error of this stream (immediately if
   * the stream is currently in an error state); its result, or the resolution
   * of the promise it returns, is added to the returned stream. A rejection of
   * that promise is forwarded as an error of the returned stream.
   *
   * @returns a stream of the listener's results. Values of this stream are
   * not forwarded, because they are not of type `R`.
   */
  catch<R>(listener: (error: any) => Promise<R> | R): Stream<R> {
    return new Stream<R>((add, reject) => {
      const fail = (err: any): void => {
        if (reject != null) {
          reject(err);
        }
      };
      this.subscribe(
        () => undefined,
        err => Stream.deliver(listener(err), add, fail),
      );
    });
  }

  /**
   * @param listener is called immediately if the stream has a current value,
   * and is called each time an item is added to the stream by the constructing
   * executor. Its result (or the resolution of the promise it returns) is
   * added to the returned stream; a rejection of that promise becomes an error
   * of the returned stream. A synchronous throw is not caught here.
   *
   * @param rejectListener is called immediately if the stream is currently in
   * an error state, and each time the executor rejects. Its result is added to
   * the returned stream as a recovered value. When omitted, errors are
   * forwarded unchanged to the returned stream.
   *
   * @returns a new stream carrying the listeners' results
   */
  then<R>(
    listener: (value: T) => Promise<R> | R,
    rejectListener?: (error: any) => Promise<R> | R,
  ): Stream<R> {
    return new Stream<R>((add, reject) => {
      // The executor type marks `reject` as optional, so guard the call.
      const fail = (err: any): void => {
        if (reject != null) {
          reject(err);
        }
      };
      // Captured in a const so the null check still holds inside the closure.
      const recover = rejectListener;
      const onError =
        recover != null
          ? (err: any): void => Stream.deliver(recover(err), add, fail)
          : fail;
      this.subscribe(value => Stream.deliver(listener(value), add, fail), onError);
    });
  }

  /**
   * Registers a listener pair and replays the most recent event, if any, to
   * the matching side of the pair.
   */
  private subscribe(
    onValue: (value: T) => any,
    onError: (error: any) => any,
  ): void {
    this.listeners.add([onValue, onError]);
    const current = this.value;
    if (current === error) {
      onError(this.error);
    } else if (current !== init) {
      onValue(current);
    }
  }

  /**
   * Hands a listener result to `add`: synchronously for plain values, or once
   * settled for promises (rejections go to `fail`).
   */
  private static deliver<R>(
    result: Promise<R> | R,
    add: (value: R) => void,
    fail: (error: any) => void,
  ): void {
    if (result instanceof Promise) {
      result.then(add, fail);
    } else {
      add(result);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment