// Test implementation of the Observable spec.
// Source: GitHub gist gvergnaud/ee6ac4aed308e27a072b (last active September 9, 2022 23:04).
// Note: GitHub flagged this file as containing bidirectional Unicode text; review it in an
// editor that reveals hidden Unicode characters.
/**
 * A value that may be absent, modelled as a discriminated union on `type`.
 * Unlike `T | undefined`, it keeps "no value" distinct from a present value
 * that happens to be `undefined` or `null`.
 */
export type Option<T> =
  | { readonly type: 'some'; readonly value: T }
  | { readonly type: 'none' };

/** The empty `Option`. Typed as `Option<never>` so it is assignable to any `Option<T>`. */
export const none: Option<never> = { type: 'none' };

/**
 * Wrap a value in the `some` variant of `Option`.
 * @param value the value to wrap (may itself be `undefined` or `null`).
 */
export const some = <T>(value: T): Option<T> => ({ type: 'some', value });
/**
 * Right-to-left composition of two unary functions.
 * @param f function applied second, to the result of `g`.
 * @param g function applied first, to the input.
 * @returns a function computing `f(g(x))`.
 */
const compose =
  <A, B, C>(f: (value: B) => C, g: (value: A) => B): ((value: A) => C) =>
  (x) =>
    f(g(x));
/**
 * Type guard for thenables: any non-null object or function exposing a
 * callable `then` property.
 * @param obj value to inspect; may be anything, including `null`/`undefined`.
 */
const isPromiseLike = (obj: any): obj is PromiseLike<unknown> =>
  obj !== null &&
  (typeof obj === 'object' || typeof obj === 'function') &&
  typeof obj.then === 'function';
/**
 * Type guard for thenables that additionally expose a callable `cancel`
 * member (own or inherited).
 * @param p thenable to inspect.
 */
const isCancelablePromise = (
  p: PromiseLike<unknown>,
): p is PromiseLike<unknown> & { cancel: () => void } =>
  // `in` already yields a boolean, so no extra Boolean() coercion is needed.
  'cancel' in p && typeof (p as any).cancel === 'function';
/** Extracts the value type `T` from `Observable<T>`; resolves to `never` for anything else. */
type Unwrap<TObservable> = TObservable extends Observable<infer T> ? T : never;
/**
 * Maps a tuple of Observables to the tuple of their value types, e.g.
 * `[Observable<A>, Observable<B>]` becomes `[A, B]`.
 * Recurses over the tuple head by head, accumulating results in `TOutput`;
 * a non-tuple input resolves to the accumulated output (`[]` by default).
 */
type UnwrapAll<
  TObservables,
  TOutput extends any[] = [],
> = TObservables extends readonly [infer First, ...infer Rest]
  ? UnwrapAll<Rest, [...TOutput, Unwrap<First>]>
  : TOutput;
/**
 * Observable.all() is similar to Promise.all().
 * It takes an array of Observables and returns an Observable of arrays of
 * values.
 *
 * - Nothing is emitted until every source has emitted at least once; after
 *   that, a fresh array holding the latest value of each source is emitted
 *   whenever any source emits.
 * - The result completes once every source has completed.
 * - The first source error is forwarded and every source is unsubscribed.
 * - An empty input emits `[]` once and completes.
 *
 * @param observables the sources to combine, in output order.
 */
function all<Os extends readonly [Observable<any>, ...Observable<any>[]]>(
  observables: Os,
): Observable<UnwrapAll<Os>>;
function all<T>(observables: Observable<T>[]): Observable<T[]>;
function all<T>(observables: Observable<T>[]): Observable<T[]> {
  return new Observable((observer) => {
    if (observables.length === 0) {
      observer.next([]);
      observer.complete();
      return;
    }

    const completed = observables.map(() => false);
    const optionalValues: Option<T>[] = observables.map(() => none);
    const areAllSome = (
      vals: Option<T>[],
    ): vals is { type: 'some'; value: T }[] =>
      vals.every((v) => v.type === 'some');

    // Declared before any source is subscribed: a source may call `error`
    // synchronously from inside `subscribe`, and the handler below must be
    // able to reach this state without hitting the temporal dead zone.
    const subscriptions: Subscription[] = [];
    let isStopped = false;
    const unsubscribe = () => {
      isStopped = true;
      subscriptions.forEach((sub) => sub.unsubscribe());
      subscriptions.length = 0;
    };

    for (let index = 0; index < observables.length; index++) {
      // A previous source already failed (or we were torn down): don't start more work.
      if (isStopped) {
        break;
      }
      const sub = observables[index].subscribe({
        error: (err) => {
          observer.error(err);
          unsubscribe();
        },
        complete: () => {
          completed[index] = true;
          if (completed.every((c) => c)) {
            observer.complete();
          }
        },
        next: (value) => {
          optionalValues[index] = some(value);
          if (areAllSome(optionalValues)) {
            // `map` builds a new array per emission, so consumers never share state.
            observer.next(optionalValues.map((v) => v.value));
          }
        },
      });
      if (isStopped) {
        // The source errored synchronously while subscribing; release it right away.
        sub.unsubscribe();
      } else {
        subscriptions.push(sub);
      }
    }

    return { unsubscribe };
  });
}
/**
 * The set of callbacks through which an Observable delivers notifications.
 */
export type Observer<T> = {
  /** Called when the Observable finishes without error. */
  complete: () => void;
  /** Called when the Observable fails; receives the failure reason. */
  error: (err: any) => void;
  /** Called for each emitted value. */
  next: (value: T) => void;
};
/**
 * Handle returned by `subscribe()`.
 */
export type Subscription = {
  /** Stops delivery of notifications and releases the producer's resources. */
  unsubscribe: () => void;
};
/**
 * # Observable
 * A constructor to represent an asynchronous value that can change over time.
 *
 * Observables are lazy: the producer function passed to the constructor runs
 * once per `subscribe()` call, and operators return a new Observable without
 * subscribing to their source.
 */
export class Observable<T> {
  /**
   * @param f producer run on every `subscribe()`. It receives a guarded
   * observer and may return a `Subscription` whose `unsubscribe` releases the
   * producer's resources.
   */
  constructor(
    private readonly f: (observer: Observer<T>) => void | Subscription,
  ) {}

  /**
   * Returns this Observable.
   * NOTE(review): this member is keyed by the literal string
   * 'Symbol.observable', not by a symbol — confirm this is the key the
   * intended interop consumers look up.
   */
  ['Symbol.observable']() {
    return this;
  }

  static get [Symbol.species]() {
    return Observable;
  }

  /**
   * Create an Observable containing one or several values.
   * All items are emitted synchronously on subscription, then the Observable
   * completes. With no items it completes immediately.
   * @param items array of values the returned Observable will emit in the same order.
   */
  static of<T>(...items: T[]): Observable<T> {
    return new Observable<T>((observer) => {
      for (const item of items) {
        observer.next(item);
      }
      // Completing after the loop (rather than "on the last item") also
      // covers the empty case, which would otherwise never complete.
      observer.complete();
    });
  }

  /**
   * Transform an Iterable, a Promise or an Observable into an Observable.
   * An Observable input is returned as is.
   */
  static from<T>(
    x: Iterable<T> | PromiseLike<T> | Observable<T>,
  ): Observable<T> {
    return isPromiseLike(x)
      ? Observable.fromPromise(x)
      : x instanceof Observable
      ? x
      : Observable.fromIterable(x);
  }

  /**
   * Creates an observable that never emits, errors or completes.
   */
  static never(): Observable<never> {
    return new Observable(() => {});
  }

  /**
   * Creates an observable that completes without emitting a value.
   */
  static empty(): Observable<never> {
    return new Observable((obs) => obs.complete());
  }

  /**
   * Create an Observable that will throw an error.
   * Similar to Promise.reject()
   * @param err the reason delivered to `error` on every subscription.
   */
  static throwError<T>(err?: any): Observable<never> {
    return new Observable((observer) => {
      observer.error(err);
    });
  }

  /**
   * Create an Observable from a Promise.
   * Emits the resolved value then completes, or errors with the rejection
   * reason. Unsubscribing calls `promise.cancel()` when the promise exposes
   * such a method.
   */
  static fromPromise<T>(promise: PromiseLike<T>): Observable<T> {
    return new Observable((observer) => {
      promise.then((value) => {
        observer.next(value);
        observer.complete();
      }, observer.error);
      return {
        unsubscribe: () => {
          if (isCancelablePromise(promise)) {
            promise.cancel();
          }
        },
      };
    });
  }

  /**
   * Create an Observable that emits an infinite sequence of ascending integers, with a constant interval of time of your choosing between those emissions.
   * The first emission is not sent immediately, but only after the first period has passed.
   * Simplified version of "RxJS interval".
   * @param period Interval of time between the emissions.
   */
  static interval(period: number = 0): Observable<number> {
    return new Observable<number>((observer) => {
      let count = 0;
      const intervalId = setInterval(() => observer.next(count++), period);
      return {
        unsubscribe: () => clearInterval(intervalId),
      };
    });
  }

  /**
   * Create an Observable from an Iterable.
   * The iterable is consumed once, when this method is called.
   */
  static fromIterable<T>(x: Iterable<T>): Observable<T> {
    return Observable.of(...x);
  }

  static all = all;

  /**
   * Transform the Observable instance into a Promise resolving with the
   * last value of the observable.
   * The promise rejects if the observable errors, or if it completes
   * without having emitted any value.
   */
  toPromise(): Promise<T> {
    return new Promise((resolve, reject) => {
      let latestValue: Option<T> = none;
      this.subscribe({
        next: (value) => {
          latestValue = some(value);
        },
        error: reject,
        complete: () => {
          if (latestValue.type !== 'none') {
            resolve(latestValue.value);
          } else {
            reject(
              new Error('Observable completed without any value emitted.'),
            );
          }
        },
      });
    });
  }

  /**
   * Run the producer and deliver its notifications to the given observer
   * object, or to the given callbacks.
   *
   * Guarantees provided by the returned wiring:
   * - no notification is delivered after `error`, `complete` or `unsubscribe`;
   * - an exception thrown by `next` (or by the producer itself) is routed to `error`;
   * - the producer's teardown runs exactly once, when the subscription
   *   closes for any reason (error, completion or unsubscription).
   *
   * Return a subscription. Calling subscription.unsubscribe will abort
   * the computation.
   */
  subscribe(
    onNext: (value: T) => void,
    onError?: (error: any) => void,
    onComplete?: () => void,
  ): Subscription;
  // eslint-disable-next-line no-dupe-class-members
  subscribe(observer: Partial<Observer<T>>): Subscription;
  // eslint-disable-next-line no-dupe-class-members
  subscribe(
    observerOrOnNext: Partial<Observer<T>> | ((value: T) => void),
    onErrorCallback?: (error: any) => void,
    onCompleteCallback?: () => void,
  ): Subscription {
    const noOp = () => {};
    let observer: Observer<T>;
    if (typeof observerOrOnNext === 'function') {
      observer = {
        next: observerOrOnNext,
        error: onErrorCallback ?? noOp,
        complete: onCompleteCallback ?? noOp,
      };
    } else {
      // Invoke the callbacks *on* the observer object so that observers
      // implemented with methods relying on `this` keep working.
      const target = observerOrOnNext;
      observer = {
        next: (value) => target.next?.(value),
        error: (err) => target.error?.(err),
        complete: () => target.complete?.(),
      };
    }

    let isClosed = false;
    let teardown: Subscription | undefined;

    // Releases the producer's resources at most once. The reference is
    // dropped before the call so re-entrant calls are no-ops and the
    // teardown can be garbage collected.
    const runTeardown = () => {
      const pending = teardown;
      teardown = undefined;
      if (pending) {
        pending.unsubscribe();
      }
    };

    const onUnsubscribe = () => {
      isClosed = true;
      runTeardown();
    };

    const onError = (err: any) => {
      if (isClosed) {
        return;
      }
      isClosed = true;
      try {
        observer.error(err);
      } finally {
        runTeardown();
      }
    };

    const onComplete = () => {
      if (isClosed) {
        return;
      }
      isClosed = true;
      try {
        observer.complete();
      } finally {
        runTeardown();
      }
    };

    const onNext = (value: T) => {
      if (isClosed) {
        return;
      }
      try {
        observer.next(value);
      } catch (e) {
        // A failing consumer terminates the subscription; `onError` also
        // runs the teardown.
        onError(e);
      }
    };

    try {
      const result = this.f({
        next: onNext,
        error: onError,
        complete: onComplete,
      });
      if (result) {
        teardown = result;
      }
    } catch (e) {
      onError(e);
    }

    // The producer may have terminated synchronously, before its teardown
    // was known: release it now.
    if (isClosed) {
      runTeardown();
    }

    return {
      unsubscribe: onUnsubscribe,
    };
  }

  /**
   * Take a callback called when a new value is emitted
   * and returns a promise.
   * The promise resolves when the observable completes and rejects when it
   * errors (including when `callback` throws).
   */
  forEach(callback: (value: T) => void) {
    return new Promise<void>((res, rej) =>
      this.subscribe({
        error: rej,
        complete: res,
        next: callback,
      }),
    );
  }

  /**
   * Helper to produce side effects when the observable state updates,
   * without changing the observable itself.
   *
   * .tap() doesn't subscribe to the observable. Nothing will happen
   * until .subscribe() is called on the observable returned in output.
   *
   * Take either an onNext callback called when a new value is emitted,
   * or a partial observer object of shape {next, complete, error}.
   * Each callback will be called appropriately when the Observable updates.
   * Exceptions thrown by the tap callbacks are logged with `console.error`
   * and do not affect the stream.
   */
  tap(observer: Partial<Observer<T>>): Observable<T>;
  // eslint-disable-next-line no-dupe-class-members
  tap(
    onNext: (value: T) => void,
    onError?: (error: any) => void,
    onComplete?: () => void,
  ): Observable<T>;
  // eslint-disable-next-line no-dupe-class-members
  tap(
    observerOrOnNext: Partial<Observer<T>> | ((value: T) => void),
    onErrorCallback?: (error: any) => void,
    onCompleteCallback?: () => void,
  ): Observable<T> {
    const tapObserver: Partial<Observer<T>> =
      typeof observerOrOnNext === 'object'
        ? observerOrOnNext
        : {
            next: observerOrOnNext,
            error: onErrorCallback,
            complete: onCompleteCallback,
          };

    return new Observable((observer) => {
      return this.subscribe({
        error: (err) => {
          try {
            tapObserver.error?.(err);
          } catch (e) {
            console.error(e);
          }
          observer.error(err);
        },
        next: (x) => {
          try {
            tapObserver.next?.(x);
          } catch (e) {
            console.error(e);
          }
          observer.next(x);
        },
        complete: () => {
          try {
            tapObserver.complete?.();
          } catch (e) {
            console.error(e);
          }
          observer.complete();
        },
      });
    });
  }

  /**
   * Take a predicate and filter observable values.
   * Similar to Array.prototype.filter.
   * @param predicate values for which it returns a truthy result are kept.
   */
  filter(predicate: (value: T) => unknown): Observable<T> {
    return new Observable((observer) => {
      return this.subscribe({
        error: observer.error,
        next: (x) => {
          if (predicate(x)) {
            observer.next(x);
          }
        },
        complete: observer.complete,
      });
    });
  }

  /**
   * Take a function transforming each value of the Observable.
   * Similar to Array.prototype.map.
   */
  map<U>(mapper: (value: T) => U): Observable<U> {
    return new Observable((observer) => {
      return this.subscribe({
        error: observer.error,
        next: compose(observer.next, mapper),
        complete: observer.complete,
      });
    });
  }

  /**
   * Take a reducer function combining each value of the Observable
   * with an accumulator value. Emits each intermediate value.
   * Similar to Array.prototype.reduce but the reduction is done
   * over time using the previously emitted value, and `.scan` emits
   * all intermediary values, not only the last one.
   * @param scanner combines the accumulator with the incoming value.
   * @param seed initial accumulator; every subscription starts from it.
   */
  scan<U>(scanner: (acc: U, value: T) => U, seed: U): Observable<U> {
    return new Observable((observer) => {
      // The accumulator is scoped to the subscription so that independent
      // subscribers do not share (and corrupt) each other's running state.
      let acc = seed;
      return this.subscribe({
        error: observer.error,
        next: (value) => {
          acc = scanner(acc, value);
          observer.next(acc);
        },
        complete: observer.complete,
      });
    });
  }

  /**
   * Take the n first values emitted by the observable, then complete and
   * unsubscribe from the source.
   * @param max number of values to let through; `max <= 0` yields an
   * Observable that completes immediately without subscribing to the source.
   */
  take(max: number): Observable<T> {
    return new Observable((observer) => {
      if (max <= 0) {
        observer.complete();
        return;
      }

      let taken = 0;
      // Declared before subscribing: a synchronous source calls `next` while
      // `subscribe` is still running, i.e. before `sub` can be assigned.
      let sub: Subscription | undefined;

      sub = this.subscribe({
        error: observer.error,
        complete: observer.complete,
        next: (value) => {
          if (taken >= max) {
            return;
          }
          taken++;
          const isLast = taken >= max;
          observer.next(value);
          if (isLast) {
            // Still undefined during synchronous emission; that case is
            // handled right after `subscribe` returns.
            sub?.unsubscribe();
            observer.complete();
          }
        },
      });

      if (taken >= max) {
        sub.unsubscribe();
      }
      return sub;
    });
  }

  /**
   * Operator to chain Observables together, similar
   * to Promise.then.
   * Every time the current Observable emits a value,
   * we unsubscribe from the chained Observable, and
   * create a new one using the `switchMapper` function.
   * The result completes once the current Observable has completed and no
   * chained Observable is still running.
   * for more information, see https://www.learnrxjs.io/learn-rxjs/operators/transformation/switchmap
   */
  switchMap<U>(switchMapper: (value: T) => Observable<U>): Observable<U> {
    return new Observable((observer) => {
      let innerSub: Subscription | undefined;
      let isOuterComplete = false;
      let hasActiveInner = false;

      const completeIfDone = () => {
        if (isOuterComplete && !hasActiveInner) {
          observer.complete();
        }
      };

      const outerSub = this.subscribe({
        error: observer.error,
        next: (value) => {
          if (innerSub) {
            innerSub.unsubscribe();
            innerSub = undefined;
          }
          const inner = switchMapper(value);
          hasActiveInner = true;
          innerSub = inner.subscribe({
            error: observer.error,
            next: observer.next,
            complete: () => {
              hasActiveInner = false;
              completeIfDone();
            },
          });
        },
        complete: () => {
          isOuterComplete = true;
          completeIfDone();
        },
      });

      return {
        unsubscribe: () => {
          if (innerSub) {
            innerSub.unsubscribe();
          }
          outerSub.unsubscribe();
        },
      };
    });
  }

  /**
   * Operator taking the initial value the observable should emit.
   * `init` is emitted synchronously on subscription, before the source is
   * subscribed.
   */
  startWith<U>(init: U): Observable<T | U> {
    return new Observable((observer) => {
      observer.next(init);
      return this.subscribe(observer);
    });
  }

  /**
   * `shareReplay` will automatically share the current observable process
   * between any number of subscriptions on the observable returned in output.
   * When a new subscriber call .subscribe() on this observable, it will also
   * synchronously emit the current state of the shared observable to make sure
   * all subscribers end up in a consistent state.
   *
   * This operator is useful when we want to share a resource or a side effect
   * between several streams instead of executing the side effect several times.
   * Since Observables are lazily evaluated, an Observable containing an HTTP request
   * would normally execute this request once for each subscriber. `shareReplay`
   * let us only execute the upstream observable once for every subscriber.
   *
   * The upstream subscription is released when the last subscriber leaves
   * and is re-created when a new one arrives; the latest value is kept in
   * between. Once the upstream has errored or completed, late subscribers
   * only receive the replayed outcome.
   */
  shareReplay(): Observable<T> {
    type ObservableState<V> =
      | { type: 'loading' }
      | { type: 'partial'; value: V }
      | { type: 'error'; error: any }
      | { type: 'complete'; value: Option<V> };

    type StartedState =
      | { type: 'started'; sub: Subscription }
      | { type: 'stopped' };

    const observers: Set<Observer<T>> = new Set();
    let state: ObservableState<T> = { type: 'loading' };
    let startedState: StartedState = { type: 'stopped' };

    // Brings a newly arrived observer up to date with the shared state.
    const replay = (observer: Observer<T>) => {
      switch (state.type) {
        case 'loading':
          break;
        case 'partial':
          observer.next(state.value);
          break;
        case 'error':
          observer.error(state.error);
          break;
        case 'complete':
          if (state.value.type === 'some') {
            observer.next(state.value.value);
          }
          observer.complete();
          break;
      }
    };

    // Notifies a snapshot of the current observers, so that an observer
    // subscribing during the notification (and already served by `replay`)
    // does not receive the same notification twice.
    const broadcast = (notify: (o: Observer<T>) => void) => {
      Array.from(observers).forEach(notify);
    };

    const stop = () => {
      if (startedState.type === 'started' && observers.size === 0) {
        startedState.sub.unsubscribe();
        startedState = { type: 'stopped' };
      }
    };

    const start = () => {
      if (startedState.type !== 'stopped') {
        return;
      }
      const sub = this.subscribe({
        complete: () => {
          state = {
            type: 'complete',
            value: state.type === 'partial' ? some(state.value) : none,
          };
          broadcast((o) => o.complete());
          // Terminated observers must not be retained.
          observers.clear();
        },
        next: (value) => {
          state = { type: 'partial', value };
          broadcast((o) => o.next(value));
        },
        error: (err) => {
          state = { type: 'error', error: err };
          broadcast((o) => o.error(err));
          observers.clear();
        },
      });
      startedState = { type: 'started', sub };
      // Every observer may already be gone if the source emitted or
      // terminated synchronously while subscribing.
      stop();
    };

    return new Observable((observer) => {
      replay(observer);
      if (state.type === 'loading' || state.type === 'partial') {
        observers.add(observer);
        start();
        return {
          unsubscribe: () => {
            observers.delete(observer);
            stop();
          },
        };
      }
      return {
        unsubscribe: () => {},
      };
    });
  }

  /**
   * catchError let you handle the error of your Observable
   * by returning a new Observable that will be used after the first
   * one has errored.
   * @param selector receives the error and the source Observable (useful to
   * retry); an exception thrown by it is forwarded as the resulting error.
   */
  catchError<U>(
    selector: (err: any, caught: Observable<T>) => Observable<U>,
  ): Observable<T | U> {
    return new Observable((observer) => {
      let innerSub: Subscription | undefined;

      const outerSub = this.subscribe({
        next: observer.next,
        complete: observer.complete,
        error: (err) => {
          let fallback: Observable<U>;
          try {
            fallback = selector(err, this);
          } catch (e) {
            observer.error(e);
            return;
          }
          innerSub = fallback.subscribe(observer);
        },
      });

      return {
        unsubscribe: () => {
          outerSub.unsubscribe();
          innerSub?.unsubscribe();
        },
      };
    });
  }
}
// (GitHub page footer: "Sign up for free to join this conversation on GitHub.
// Already have an account? Sign in to comment.")