Created
May 12, 2017 09:10
-
-
Save EvAlex/1939696d82a971a8d56429d93348dde2 to your computer and use it in GitHub Desktop.
StatefulObserver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observer, Observable, ReplaySubject, Subscriber } from 'rxjs/Rx'; | |
/**
 * Minimal push-style consumer contract used for the downstream targets that
 * a StatefulObserver forwards its notifications to.
 */
interface IObserver<T> {
    /** Terminal notification: the source finished without an error. */
    complete(): void;

    /** Terminal notification: the source failed with `err`. */
    error(err: any): void;

    /** Delivers one value from the source. */
    next(value?: T): void;
}
/**
 * Observer that records the notifications it receives (last value, error,
 * completion) and exposes them as plain properties, which makes it handy for
 * use in data-binding.
 *
 * Notifications are also forwarded to every downstream observer created via
 * `map()` or `asObservable()`. A downstream observer attached after
 * notifications have already arrived is first brought up to date with the
 * recorded state, then receives subsequent notifications live.
 *
 * NOTE(review): `next`/`error`/`complete` are overridden without calling
 * `super`, so whatever bookkeeping the `Subscriber` base class performs in
 * those methods is bypassed — confirm this is intended.
 *
 * @typeParam TData   type of the values pushed into this observer
 * @typeParam TParams type of the opaque parameters carried alongside the state
 */
export class StatefulObserver<TData, TParams> extends Subscriber<TData> implements Observer<TData> {
    private _params: TParams;
    private _value: TData = null;
    private _errorObject: any;
    private _receivedValue = false;
    private _receivedError = false;
    private _isComplete = false;
    /** Downstream observers together with the projection applied to each value sent to them. */
    private _connected: { observer: IObserver<any>, project: (value: any) => any }[] = [];

    /** Parameters passed to the constructor (undefined when none were given). */
    get params(): TParams {
        return this._params;
    }

    /** Last value received through `next`, or `null` if none has arrived yet. */
    get value(): TData {
        return this._value;
    }

    /** True once at least one value has been received. */
    public get receivedValue() {
        return this._receivedValue;
    }

    /** True while no value, no error and no completion has been received. */
    get waitingForValue() {
        return !this._receivedValue && !this._receivedError && !this._isComplete;
    }

    /** True while neither an error nor a completion has been received. */
    get willReceiveValue() {
        return !this._receivedError && !this._isComplete;
    }

    /** True once `error` has been called. */
    get receivedError() {
        return this._receivedError;
    }

    /** The argument of the last `error` call. */
    get errorObject() {
        return this._errorObject;
    }

    /** True once `complete` has been called. */
    get isComplete() {
        return this._isComplete;
    }

    /**
     * @param params optional opaque parameters exposed through `params` and
     *               handed on to observers derived with `map()`
     */
    constructor(params?: TParams) {
        super();
        this._params = params;
    }

    /** Records `data` as the current value and forwards its projection to every downstream observer. */
    next(data: TData) {
        this._value = data;
        this._receivedValue = true;
        this._connected.forEach(e => e.observer.next(e.project(data)));
    }

    /** Records the error and forwards it unchanged to every downstream observer. */
    error(error?: any) {
        this._receivedError = true;
        this._errorObject = error;
        this._connected.forEach(e => e.observer.error(error));
    }

    /** Records completion and forwards it to every downstream observer. */
    complete() {
        this._isComplete = true;
        this._connected.forEach(e => e.observer.complete());
    }

    /**
     * Creates a derived StatefulObserver (sharing this observer's params)
     * whose values are the projections of this observer's values.
     *
     * State that was already recorded here is replayed into the result, so a
     * `map()` issued after the value arrived does not stay "waiting" forever.
     *
     * @param project maps each value of this observer to the derived value
     * @returns the derived observer, kept up to date with this one
     */
    map<T>(project: (value: TData) => T): StatefulObserver<T, TParams> {
        const res = new StatefulObserver<T, TParams>(this._params);
        this.attachDownstream(res, project);
        return res;
    }

    /**
     * Exposes this observer's (optionally projected) notifications as an
     * Observable backed by a ReplaySubject.
     *
     * @param project maps each value before it is emitted; defaults to identity
     * @returns an Observable that first receives the state recorded so far and
     *          then every later notification
     */
    asObservable<T>(project: (value: TData) => T = v => <any>v): Observable<T> {
        const res = new ReplaySubject<T>();
        this.attachDownstream(res, project);
        return res.asObservable();
    }

    /**
     * Brings `observer` up to date with the recorded state (value, then
     * error, then completion) and registers it for future notifications.
     */
    private attachDownstream(observer: IObserver<any>, project: (value: TData) => any): void {
        if (this._receivedValue) {
            observer.next(project(this._value));
        }
        if (this._receivedError) {
            observer.error(this._errorObject);
        }
        if (this._isComplete) {
            observer.complete();
        }
        this._connected.push({ observer: observer, project: project });
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment