// Gist by @gvergnaud (gvergnaud/ee6ac4aed308e27a072b), last active September 9, 2022.
// Test implementation of the Observable spec.
/**
 * An optional value modelled as a tagged union: the `some` variant carries
 * a value, the `none` variant carries nothing.
 */
export type Option<T> =
    | { readonly type: 'none' }
    | { readonly type: 'some'; readonly value: T };

/** The empty option. Typed with `never` so it is assignable to any `Option<T>`. */
export const none: Option<never> = { type: 'none' };

/**
 * Wrap a value into the `some` variant of `Option`.
 * @param value the value to wrap.
 * @returns an option of type `'some'` holding `value`.
 */
export function some<T>(value: T): Option<T> {
    return { type: 'some', value };
}
/**
 * Right-to-left composition of two unary functions.
 * @param f function applied second, to the result of `g`.
 * @param g function applied first, to the input value.
 * @returns a function computing `f(g(x))`.
 */
function compose<A, B, C>(
    f: (value: B) => C,
    g: (value: A) => B,
): (value: A) => C {
    return function composed(x: A): C {
        const intermediate = g(x);
        return f(intermediate);
    };
}
/**
 * Type guard telling whether a value is a thenable: a non-null object or
 * function exposing a `then` method.
 * @param obj the value to inspect.
 * @returns `true` when `obj.then` is a function.
 */
function isPromiseLike(obj: any): obj is PromiseLike<unknown> {
    if (obj === null) {
        return false;
    }
    const kind = typeof obj;
    if (kind !== 'object' && kind !== 'function') {
        return false;
    }
    return typeof obj.then === 'function';
}
/**
 * Type guard telling whether a thenable also exposes a `cancel()` method.
 * @param p the thenable to inspect.
 * @returns `true` when `p` has a `cancel` property that is a function.
 */
const isCancelablePromise = (
    p: PromiseLike<unknown>,
): p is PromiseLike<unknown> & { cancel: () => void } =>
    'cancel' in p && typeof (p as any).cancel === 'function';
/**
 * Extracts the value type `T` out of an `Observable<T>`.
 * Resolves to `never` when given anything that is not an Observable.
 */
type Unwrap<TObservable> =
    TObservable extends Observable<infer TValue> ? TValue : never;

/**
 * Maps a tuple of Observables to the tuple of their value types,
 * walking the input tuple head-first and accumulating into `TOutput`.
 */
type UnwrapAll<TObservables, TOutput extends any[] = []> =
    TObservables extends readonly [infer THead, ...infer TTail]
        ? UnwrapAll<TTail, [...TOutput, Unwrap<THead>]>
        : TOutput;
/**
 * Observable.all() is similar to Promise.all().
 * It takes an array of Observables and returns an Observable
 * of arrays of values.
 *
 * - Nothing is emitted until every source has emitted at least once; after
 *   that, each source emission produces a new array holding the latest value
 *   of every source, in input order.
 * - The output completes once every source has completed.
 * - The first source error is forwarded and every other source is
 *   unsubscribed.
 * - An empty input emits `[]` and completes immediately.
 *
 * @param observables the source Observables to combine.
 * @returns an Observable of the latest values of all sources.
 */
function all<Os extends readonly [Observable<any>, ...Observable<any>[]]>(
    observables: Os,
): Observable<UnwrapAll<Os>>;
function all<T>(observables: Observable<T>[]): Observable<T[]>;
function all<T>(observables: Observable<T>[]): Observable<T[]> {
    return new Observable((observer) => {
        if (observables.length === 0) {
            observer.next([]);
            observer.complete();
            return;
        }
        const completed = observables.map(() => false);
        const optionalValues: Option<T>[] = observables.map(() => none);
        const areAllSome = (
            vals: Option<T>[],
        ): vals is { type: 'some'; value: T }[] =>
            vals.every((v) => v.type === 'some');

        // Declared before subscribing: a source may error synchronously, in
        // which case the teardown below runs while we are still looping.
        const subscriptions: { unsubscribe: () => void }[] = [];
        let isStopped = false;
        const unsubscribe = () => {
            isStopped = true;
            subscriptions.forEach((sub) => sub.unsubscribe());
            subscriptions.length = 0;
        };

        observables.forEach((observable, index) => {
            // Once stopped there is no point subscribing to further sources.
            if (isStopped) {
                return;
            }
            const subscription = observable.subscribe({
                error: (err) => {
                    observer.error(err);
                    unsubscribe();
                },
                complete: () => {
                    completed[index] = true;
                    if (completed.every((c) => c)) {
                        observer.complete();
                    }
                },
                next: (value) => {
                    optionalValues[index] = some(value);
                    if (areAllSome(optionalValues)) {
                        observer.next(optionalValues.map((v) => v.value));
                    }
                },
            });
            if (isStopped) {
                // Stopped while this very source was being subscribed.
                subscription.unsubscribe();
            } else {
                subscriptions.push(subscription);
            }
        });

        return { unsubscribe };
    });
}
/**
 * The set of callbacks through which an Observable notifies a consumer.
 */
export type Observer<T> = {
    /** Called with each value emitted. */
    next: (value: T) => void;
    /** Called with the error when the Observable fails. */
    error: (err: any) => void;
    /** Called when the Observable finishes. */
    complete: () => void;
};

/**
 * Handle returned when subscribing; `unsubscribe` stops the subscription.
 */
export type Subscription = {
    unsubscribe: () => void;
};
/**
 * # Observable
 * A constructor to represent an asynchronous value that can change over time.
 *
 * Observables are lazy: the producer function given to the constructor only
 * runs when `.subscribe()` is called, and runs once per subscription.
 */
export class Observable<T> {
    /**
     * @param f producer function. It receives an observer to push values,
     * an error or a completion to, and may return a `Subscription` whose
     * `unsubscribe` releases the resources it acquired.
     */
    constructor(
        private readonly f: (observer: Observer<T>) => void | Subscription,
    ) {}

    /**
     * Returns this instance.
     * NOTE(review): the method key is the literal string 'Symbol.observable',
     * not a symbol — confirm this is the key interop consumers look up.
     */
    ['Symbol.observable']() {
        return this;
    }

    /** Constructor used for derived objects. */
    static get [Symbol.species]() {
        return Observable;
    }

    /**
     * Create an Observable containing one or several values.
     * The Observable completes after the last item, including when called
     * without any item.
     * @param items array of values the returned Observable will emit in the same order.
     */
    static of<T>(...items: T[]): Observable<T> {
        return new Observable<T>((observer) => {
            for (const item of items) {
                observer.next(item);
            }
            // Complete outside of the loop so an empty list completes too.
            observer.complete();
        });
    }

    /**
     * Transform an Iterable, a Promise or an Observable into an Observable.
     * Thenables are checked first, then Observable instances (returned
     * as is); anything else is treated as an Iterable.
     */
    static from<T>(
        x: Iterable<T> | PromiseLike<T> | Observable<T>,
    ): Observable<T> {
        if (isPromiseLike(x)) {
            return Observable.fromPromise(x);
        }
        if (x instanceof Observable) {
            return x;
        }
        return Observable.fromIterable(x);
    }

    /**
     * Creates an observable that never emits and never completes.
     */
    static never(): Observable<never> {
        return new Observable(() => {});
    }

    /**
     * Creates an observable that completes without emitting a value.
     */
    static empty(): Observable<never> {
        return new Observable((obs) => obs.complete());
    }

    /**
     * Create an Observable that will throw an error.
     * Similar to Promise.reject()
     * @param err the error sent to subscribers.
     */
    static throwError<T>(err?: any): Observable<never> {
        return new Observable((observer) => {
            observer.error(err);
        });
    }

    /**
     * Create an Observable from a Promise.
     * Emits the resolved value then completes, or errors with the rejection
     * reason. Unsubscribing while the promise is still pending calls its
     * `cancel()` method when it has one.
     */
    static fromPromise<T>(promise: PromiseLike<T>): Observable<T> {
        return new Observable((observer) => {
            // Teardown also runs after completion; a settled promise has
            // nothing left to cancel.
            let isSettled = false;
            promise.then(
                (value) => {
                    isSettled = true;
                    observer.next(value);
                    observer.complete();
                },
                (err) => {
                    isSettled = true;
                    observer.error(err);
                },
            );
            return {
                unsubscribe: () => {
                    if (!isSettled && isCancelablePromise(promise)) {
                        promise.cancel();
                    }
                },
            };
        });
    }

    /**
     * Create an Observable that emits an infinite sequence of ascending integers, with a constant interval of time of your choosing between those emissions.
     * The first emission is not sent immediately, but only after the first period has passed.
     * Simplified version of "RxJS interval".
     * @param period Interval of time between the emissions.
     */
    static interval(period: number = 0): Observable<number> {
        return new Observable<number>((observer) => {
            let count = 0;
            const intervalId = setInterval(
                () => observer.next(count++),
                period,
            );
            return {
                unsubscribe: () => clearInterval(intervalId),
            };
        });
    }

    /**
     * Create an Observable from an Iterable.
     * The iterable is read once, when this method is called.
     */
    static fromIterable<T>(x: Iterable<T>): Observable<T> {
        return Observable.of(...x);
    }

    static all = all;

    /**
     * Transform the Observable instance into a Promise resolving with the
     * last value of the observable.
     * The promise rejects if the Observable errors, or if it completes
     * without having emitted any value.
     */
    toPromise(): Promise<T> {
        return new Promise((resolve, reject) => {
            let latestValue: Option<T> = none;
            this.subscribe({
                next: (value) => {
                    latestValue = some(value);
                },
                error: reject,
                complete: () => {
                    if (latestValue.type !== 'none') {
                        resolve(latestValue.value);
                    } else {
                        reject(
                            new Error(
                                'Observable completed without any value emitted.',
                            ),
                        );
                    }
                },
            });
        });
    }

    /**
     * Run the Observable and listen to its notifications.
     *
     * Takes either an `onNext` callback (optionally followed by `onError`
     * and `onComplete`), or a partial observer object of shape
     * `{next, error, complete}`. Missing callbacks default to no-ops.
     *
     * After the first `error` or `complete` notification, or after
     * `unsubscribe()`, no callback is invoked anymore and the teardown
     * returned by the producer is run (at most once). An exception thrown by
     * the producer or by the `next` callback is routed to the `error`
     * callback.
     *
     * @returns a subscription. Calling subscription.unsubscribe will abort
     * the computation.
     */
    subscribe(
        onNext: (value: T) => void,
        onError?: (error: any) => void,
        onComplete?: () => void,
    ): Subscription;
    // eslint-disable-next-line no-dupe-class-members
    subscribe(observer: Partial<Observer<T>>): Subscription;
    // eslint-disable-next-line no-dupe-class-members
    subscribe(
        observerOrOnNext: Partial<Observer<T>> | ((value: T) => void),
        onErrorCallback?: (error: any) => void,
        onCompleteCallback?: () => void,
    ): Subscription {
        let isClosed = false;
        const noOp = () => {};
        const observer: Observer<T> =
            typeof observerOrOnNext === 'function'
                ? {
                      next: observerOrOnNext,
                      error: onErrorCallback ?? noOp,
                      complete: onCompleteCallback ?? noOp,
                  }
                : {
                      next: observerOrOnNext.next ?? noOp,
                      error: observerOrOnNext.error ?? noOp,
                      complete: observerOrOnNext.complete ?? noOp,
                  };
        let subscription: Subscription | void;

        // Run the producer's teardown at most once, and drop the reference
        // so it can be garbage collected.
        const cleanup = () => {
            const current = subscription;
            subscription = undefined;
            current?.unsubscribe();
        };
        const onUnsubscribe = () => {
            isClosed = true;
            cleanup();
        };
        const onError = (err: any) => {
            if (isClosed) {
                return;
            }
            isClosed = true;
            try {
                observer.error(err);
            } finally {
                cleanup();
            }
        };
        const onComplete = () => {
            if (isClosed) {
                return;
            }
            isClosed = true;
            try {
                observer.complete();
            } finally {
                cleanup();
            }
        };
        const onNext = (value: T) => {
            if (isClosed) {
                return;
            }
            try {
                observer.next(value);
            } catch (e) {
                // Closes the subscription and releases the producer.
                onError(e);
            }
        };

        try {
            subscription = this.f({
                next: onNext,
                error: onError,
                complete: onComplete,
            });
        } catch (e) {
            onError(e);
        }
        // The stream may have closed while the producer was still running,
        // before its teardown was available: release it now.
        if (isClosed) {
            cleanup();
        }
        return {
            unsubscribe: onUnsubscribe,
        };
    }

    /**
     * Take a callback called when a new value is emitted
     * and returns a promise.
     * The promise resolves when the Observable completes and rejects when it
     * errors.
     */
    forEach(callback: (value: T) => void) {
        return new Promise<void>((res, rej) =>
            this.subscribe({
                error: rej,
                complete: res,
                next: callback,
            }),
        );
    }

    /**
     * Helper to produce side effects when the observable state updates,
     * without changing the observable itself.
     *
     * .tap() doesn't subscribe to the observable. Nothing will happen
     * until .subscribe() is called on the observable returned in output.
     *
     * Take either an onNext callback called when a new value is emitted,
     * or a partial observer object of shape {next, complete, error}.
     * Each callback will be called appropriately when the Observable updates.
     * Exceptions thrown by these callbacks are logged with `console.error`
     * and do not affect the stream.
     */
    tap(observer: Partial<Observer<T>>): Observable<T>;
    // eslint-disable-next-line no-dupe-class-members
    tap(
        onNext: (value: T) => void,
        onError?: (error: any) => void,
        onComplete?: () => void,
    ): Observable<T>;
    // eslint-disable-next-line no-dupe-class-members
    tap(
        observerOrOnNext: Partial<Observer<T>> | ((value: T) => void),
        onErrorCallback?: (error: any) => void,
        onCompleteCallback?: () => void,
    ): Observable<T> {
        const tapObserver: Partial<Observer<T>> =
            typeof observerOrOnNext === 'object'
                ? observerOrOnNext
                : {
                      next: observerOrOnNext,
                      error: onErrorCallback,
                      complete: onCompleteCallback,
                  };
        return new Observable((observer) => {
            return this.subscribe({
                error: (err) => {
                    try {
                        tapObserver.error?.(err);
                    } catch (e) {
                        console.error(e);
                    }
                    observer.error(err);
                },
                next: (x) => {
                    try {
                        tapObserver.next?.(x);
                    } catch (e) {
                        console.error(e);
                    }
                    observer.next(x);
                },
                complete: () => {
                    try {
                        tapObserver.complete?.();
                    } catch (e) {
                        console.error(e);
                    }
                    observer.complete();
                },
            });
        });
    }

    /**
     * Take a predicate and filter observable values.
     * Similar to Array.prototype.filter.
     * @param predicate values for which it returns a truthy result are kept.
     */
    filter(predicate: (value: T) => unknown): Observable<T> {
        return new Observable((observer) => {
            return this.subscribe({
                error: observer.error,
                next: (x) => {
                    if (predicate(x)) {
                        observer.next(x);
                    }
                },
                complete: observer.complete,
            });
        });
    }

    /**
     * Take a function transforming each value of the Observable.
     * Similar to Array.prototype.map.
     */
    map<U>(mapper: (value: T) => U): Observable<U> {
        return new Observable((observer) => {
            return this.subscribe({
                error: observer.error,
                next: compose(observer.next, mapper),
                complete: observer.complete,
            });
        });
    }

    /**
     * Take a reducer function combining each value of the Observable
     * with an accumulator value. Emits each intermediate value.
     * Similar to Array.prototype.reduce but the reduction is done
     * over time using the previously emitted value, and `.scan` emits
     * all intermediary values, not only the last one.
     * @param scanner combines the accumulator with the new value.
     * @param seed initial accumulator; every subscription starts from it.
     */
    scan<U>(scanner: (acc: U, value: T) => U, seed: U): Observable<U> {
        return new Observable((observer) => {
            // Kept per subscription so that subscribers don't share state.
            let acc: U = seed;
            return this.subscribe({
                error: observer.error,
                next: (value) => {
                    acc = scanner(acc, value);
                    observer.next(acc);
                },
                complete: observer.complete,
            });
        });
    }

    /**
     * Take the n first values emitted by the observable, then complete
     * and unsubscribe from the source.
     * @param max number of values to let through. When it is 0 or less, the
     * output completes immediately without subscribing to the source.
     */
    take(max: number): Observable<T> {
        return new Observable((observer) => {
            if (max <= 0) {
                observer.complete();
                return;
            }
            let count = 0;
            let isDone = false;
            // `let` + late assignment: a synchronous source calls `next`
            // before `this.subscribe` has returned the subscription.
            let sub: Subscription | undefined;
            sub = this.subscribe({
                error: observer.error,
                complete: observer.complete,
                next: (value) => {
                    if (isDone) {
                        return;
                    }
                    count++;
                    observer.next(value);
                    if (count >= max) {
                        isDone = true;
                        sub?.unsubscribe();
                        observer.complete();
                    }
                },
            });
            if (isDone) {
                // The limit was reached synchronously, before `sub` existed.
                sub.unsubscribe();
            }
            return sub;
        });
    }

    /**
     * Operator to chain Observables together, similar
     * to Promise.then.
     * Every time the current Observable emits a value,
     * we unsubscribe from the chained Observable, and
     * create a new one using the `switchMapper` function.
     * The output completes once the current Observable has completed and
     * no chained Observable is still running.
     * for more information, see https://www.learnrxjs.io/learn-rxjs/operators/transformation/switchmap
     */
    switchMap<U>(switchMapper: (value: T) => Observable<U>): Observable<U> {
        return new Observable((observer) => {
            let innerSub: Subscription | undefined;
            let isOuterComplete = false;
            let hasActiveInner = false;
            const completeIfDone = () => {
                if (isOuterComplete && !hasActiveInner) {
                    observer.complete();
                }
            };
            const outerSub = this.subscribe({
                error: observer.error,
                next: (value) => {
                    innerSub?.unsubscribe();
                    hasActiveInner = true;
                    innerSub = switchMapper(value).subscribe({
                        error: observer.error,
                        next: observer.next,
                        complete: () => {
                            hasActiveInner = false;
                            completeIfDone();
                        },
                    });
                },
                complete: () => {
                    isOuterComplete = true;
                    completeIfDone();
                },
            });
            return {
                unsubscribe: () => {
                    innerSub?.unsubscribe();
                    outerSub.unsubscribe();
                },
            };
        });
    }

    /**
     * Operator taking the initial value the observable should emit.
     * The value is emitted synchronously on subscription, before
     * subscribing to the source.
     */
    startWith<U>(init: U): Observable<T | U> {
        return new Observable((observer) => {
            observer.next(init);
            return this.subscribe(observer);
        });
    }

    /**
     * `shareReplay` will automatically share the current observable process
     * between any number of subscriptions on the observable returned in output.
     * When a new subscriber call .subscribe() on this observable, it will also
     * synchronously emit the current state of the shared observable to make sure
     * all subscribers end up in a consistent state.
     *
     * This operator is useful when we want to share a resource or a side effect
     * between several streams instead of executing the side effect several times.
     * Since Observables are lazily evaluated, an Observable containing an HTTP request
     * would normally execute this request once for each subscriber. `shareReplay`
     * let us only execute the upstream observable once for every subscriber.
     *
     * The upstream subscription starts with the first subscriber and is
     * released when the last subscriber unsubscribes.
     */
    shareReplay(): Observable<T> {
        type ObservableState<T> =
            | { type: 'loading' }
            | { type: 'partial'; value: T }
            | { type: 'error'; error: any }
            | { type: 'complete'; value: Option<T> };
        type StartedState =
            | { type: 'started'; sub: Subscription }
            | { type: 'stopped' };
        const observers: Set<Observer<T>> = new Set();
        let state: ObservableState<T> = { type: 'loading' };
        let startedState: StartedState = { type: 'stopped' };

        // Bring a new subscriber up to date with the shared state.
        const replay = (observer: Observer<T>) => {
            switch (state.type) {
                case 'loading':
                    break;
                case 'partial':
                    observer.next(state.value);
                    break;
                case 'error':
                    observer.error(state.error);
                    break;
                case 'complete':
                    if (state.value.type === 'some') {
                        observer.next(state.value.value);
                    }
                    observer.complete();
                    break;
            }
        };

        // Subscribe to the upstream Observable unless already subscribed.
        const start = () => {
            if (startedState.type === 'stopped') {
                startedState = {
                    type: 'started',
                    sub: this.subscribe({
                        complete: () => {
                            state = {
                                type: 'complete',
                                value:
                                    state.type === 'partial'
                                        ? some(state.value)
                                        : none,
                            };
                            observers.forEach((o) => o.complete());
                        },
                        next: (value) => {
                            state = { type: 'partial', value };
                            observers.forEach((o) => o.next(value));
                        },
                        error: (err) => {
                            state = { type: 'error', error: err };
                            observers.forEach((o) => o.error(err));
                        },
                    }),
                };
            }
        };

        // Release the upstream subscription once nobody is listening.
        const stop = () => {
            if (startedState.type === 'started' && observers.size === 0) {
                startedState.sub.unsubscribe();
                startedState = { type: 'stopped' };
            }
        };

        return new Observable((observer) => {
            replay(observer);
            if (['loading', 'partial'].includes(state.type)) {
                observers.add(observer);
                start();
                return {
                    unsubscribe: () => {
                        observers.delete(observer);
                        stop();
                    },
                };
            }
            // Already errored or completed: everything has been replayed.
            return {
                unsubscribe: () => {},
            };
        });
    }

    /**
     * catchError let you handle the error of your Observable
     * by returning a new Observable that will be used after the first
     * one has errored.
     * @param selector receives the error and the source Observable, and
     * returns the Observable to continue with.
     */
    catchError<U>(
        selector: (err: any, caught: Observable<T>) => Observable<U>,
    ): Observable<T | U> {
        return new Observable((observer) => {
            let innerSub: Subscription | undefined;
            const outerSub = this.subscribe({
                next: observer.next,
                complete: observer.complete,
                error: (err) => {
                    innerSub = selector(err, this).subscribe(observer);
                },
            });
            return {
                unsubscribe: () => {
                    outerSub?.unsubscribe();
                    innerSub?.unsubscribe();
                },
            };
        });
    }
}
// Comments on this gist are hosted on GitHub (sign-in required to comment).