Created
November 28, 2018 06:15
-
-
Save nodew/81bcf9ca7e9bec1c122b56c4bf4a54a5 to your computer and use it in GitHub Desktop.
observable pattern
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Placeholder callback that does nothing and returns nothing.
 * Used where a teardown function is required but there is nothing to release.
 */
function noop(): void {
  return;
}
/**
 * Minimal contract for anything that can be cancelled.
 */
interface Unsubscribable {
  /** Cancels the work represented by this object; yields no value. */
  unsubscribe(): void;
}
/**
 * Handle for an active subscription.
 *
 * This interface shares its name with the `Subscription` class declared in
 * this file, so TypeScript merges the two declarations.
 */
interface Subscription extends Unsubscribable {
  /** Cancels the subscription. */
  unsubscribe(): void;
}
/**
 * Something that can be subscribed to with an {@link Observer}.
 *
 * @typeParam T - Type of the values delivered to the observer.
 */
export interface Subscribable<T> {
  /**
   * Attaches an observer.
   *
   * @param observer - Receiver of notifications; optional in this contract.
   * @returns A handle that cancels the subscription.
   */
  subscribe(observer?: Observer<T>): Subscription;
}
/**
 * Producer logic handed to an `Observable`: it receives the observer to
 * notify and returns the teardown, either as a plain function or as a
 * `Subscription`.
 */
type SubscriberFunction<T> = (
  observer: Observer<T>
) => (() => void) | Subscription;
/**
 * Receiver of the notifications pushed by an observable.
 *
 * The three mandatory callbacks are declared as returning `void`; without an
 * explicit return type they would be implicitly `any` and rejected under
 * `noImplicitAny`.
 *
 * @typeParam T - Type of the values passed to `next`.
 */
interface Observer<T> {
  /** Optional hook that receives the subscription handle. */
  start?: (subscription: Subscription) => void;
  /** Receives the next value. */
  next(value: T): void;
  /** Receives a failure. */
  error(err: any): void;
  /** Signals that no further values will arrive. */
  complete(): void;
}
/**
 * Callable that maps a single argument of type `T` to a result of type `R`.
 */
export interface UnaryFunction<T, R> {
  (source: T): R;
}
/**
 * A {@link UnaryFunction} specialised to observables: takes an
 * `Observable<T>` and returns an `Observable<R>`.
 */
export interface OperatorFunction<T, R>
  extends UnaryFunction<Observable<T>, Observable<R>> {}
/**
 * Strategy object attached to a lifted observable.
 */
export interface Operator<T, R> {
  /**
   * Connects `subscriber` to `source`.
   *
   * @param subscriber - Downstream subscriber that should receive the results.
   * @param source - The upstream object to subscribe to (untyped here).
   * @returns The subscription created by connecting to `source`.
   */
  call(subscriber: Subscriber<R>, source: any): Subscription;
}
/**
 * Cancellable handle wrapping a teardown callback.
 *
 * `unsubscribe()` is idempotent: the teardown runs at most once, and a
 * subscription built without a teardown can be cancelled safely.
 */
class Subscription {
  /** Teardown invoked the first time `unsubscribe()` is called. */
  _unsubscribe: () => void;
  private _closed: boolean;

  /**
   * @param unsubscribe - Optional teardown; defaults to a no-op so that
   *   `new Subscription().unsubscribe()` does not throw.
   */
  constructor(unsubscribe: () => void = () => {}) {
    this._unsubscribe = unsubscribe;
    this._closed = false;
  }

  /** `true` once `unsubscribe()` has been called. */
  get closed(): boolean {
    return this._closed;
  }

  /** Marks the subscription closed and runs the teardown exactly once. */
  unsubscribe(): void {
    if (this._closed) {
      return;
    }
    // Flip the flag first so a teardown that re-enters unsubscribe() is a no-op.
    this._closed = true;
    this._unsubscribe();
  }
}
/**
 * Observer wrapper that enforces the notification contract: nothing is
 * forwarded to `destination` after `error`, `complete` or `unsubscribe`.
 *
 * @typeParam T - Type of the values forwarded to the destination.
 */
class Subscriber<T> extends Subscription implements Observer<T> {
  /** The observer that ultimately receives the notifications. */
  destination: Observer<T>;
  /** Set once a terminal notification was delivered or the subscriber was cancelled. */
  isStopped: boolean;

  /**
   * @param observer - Destination of the forwarded notifications.
   */
  constructor(observer: Observer<T>) {
    // Starts with an empty teardown.
    super(() => {});
    this.destination = observer;
    this.isStopped = false;
  }

  /** Stops forwarding and runs the teardown; does nothing when already closed. */
  unsubscribe(): void {
    if (this.closed) {
      return;
    }
    this.isStopped = true;
    super.unsubscribe();
  }

  /** Forwards a value unless the subscriber is stopped. */
  next(value: T): void {
    if (!this.isStopped) {
      this._next(value);
    }
  }

  /** Delivery hook for values; subclasses override it to filter or transform. */
  protected _next(value: T): void {
    this.destination.next(value);
  }

  /** Forwards a failure once, then unsubscribes. */
  error(err: any): void {
    if (this.isStopped) {
      return;
    }
    this.isStopped = true;
    try {
      this._error(err);
    } finally {
      // Run the teardown even when the destination's handler throws.
      this.unsubscribe();
    }
  }

  /** Delivery hook for failures. */
  protected _error(err: any): void {
    this.destination.error(err);
  }

  /** Forwards completion once, then unsubscribes. */
  complete(): void {
    if (this.isStopped) {
      return;
    }
    this.isStopped = true;
    try {
      this._complete();
    } finally {
      // Run the teardown even when the destination's handler throws.
      this.unsubscribe();
    }
  }

  /** Delivery hook for completion. */
  protected _complete(): void {
    this.destination.complete();
  }
}
/**
 * Lazy push-based collection: the producer runs once per `subscribe` call and
 * pushes values into the supplied observer.
 *
 * @typeParam T - Type of the emitted values.
 */
class Observable<T> implements Subscribable<T> {
  /** Producer logic executed on every subscription (absent for lifted observables). */
  _subscribe: SubscriberFunction<T>;
  /** Operator applied between `source` and the subscriber; set by `lift`. */
  operator: Operator<any, T>;
  /** Upstream observable of a lifted observable; set by `lift`. */
  source: any;

  /**
   * @param subscribe - Producer logic; may be omitted, in which case
   *   subscribing without an operator emits nothing.
   */
  constructor(subscribe?: SubscriberFunction<T>) {
    this._subscribe = subscribe;
  }

  /** Factory equivalent of `new Observable(subscribe)`. */
  static create<T>(subscribe: SubscriberFunction<T>): Observable<T> {
    return new Observable<T>(subscribe);
  }

  /** Emits every argument synchronously, in order, then completes. */
  static of<T>(...items: T[]): Observable<T> {
    return Observable.create<T>((observer: Observer<T>) => {
      for (const item of items) {
        observer.next(item);
      }
      observer.complete();
      return noop;
    });
  }

  /** Returns a new observable that subscribes to the given one. */
  static from<T>(observer: Observable<T>): Observable<T> {
    return Observable.create<T>((nextObserver) => observer.subscribe(nextObserver));
  }

  /**
   * Starts the producer (or the lifted operator) for `observer`.
   *
   * The observer is wrapped in a `Subscriber`, and that subscriber is what is
   * returned, so the result always has a working `unsubscribe()`, even when
   * the producer returned a bare teardown function. The producer's teardown is
   * attached to the subscriber, so it also runs when the stream errors or
   * completes.
   *
   * @param observer - Receiver of the notifications.
   * @returns The subscription controlling this execution.
   */
  subscribe(observer: Observer<T>): Subscription {
    const { operator } = this;
    const subscriber = new Subscriber<T>(observer);

    const produced: (() => void) | Subscription | undefined = operator
      ? operator.call(subscriber, this.source)
      : this._subscribe
        ? this._subscribe(subscriber)
        : undefined;

    // Normalise the two teardown shapes (function or Subscription) to a function.
    let release: () => void = noop;
    if (typeof produced === 'function') {
      release = produced;
    } else if (produced) {
      release = () => produced.unsubscribe();
    }

    if (subscriber.closed) {
      // The producer finished synchronously, before the teardown was known.
      release();
    } else {
      subscriber._unsubscribe = release;
    }
    return subscriber;
  }

  /** Returns an observable that emits `fn(value)` for every source value. */
  map<R>(fn: (value: T) => R): Observable<R> {
    return new Observable<R>((observer: Observer<R>) => {
      return this.subscribe({
        next: (value: T) => {
          observer.next(fn(value));
        },
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
    });
  }

  /** Creates an observable whose subscriptions are routed through `operator` with this observable as source. */
  lift<R>(operator: Operator<T, R>): Observable<R> {
    const observable = new Observable<R>();
    observable.operator = operator;
    observable.source = this;
    return observable;
  }

  /** Method form of the `take` operator. */
  take(n: number): Observable<T> {
    return take<T>(n)(this);
  }
}
/**
 * Observer whose three callbacks all do nothing.
 */
export const empty: Observer<any> = {
  // closed: true,
  next: (value: any): void => {},
  error: (err: any): void => {},
  complete: (): void => {},
};
/**
 * Builds an operator function that lifts its source with a `TakeOperator`.
 *
 * The `TakeOperator` is constructed when the returned function is applied to
 * a source, not when `take` itself is called.
 *
 * @param count - Limit handed to `TakeOperator`.
 * @returns A function mapping a source observable to the lifted observable.
 */
export function take<T>(count: number): OperatorFunction<T, T> {
  return (source: Observable<T>) => source.lift(new TakeOperator<T>(count));
}
/**
 * Operator behind `take`: subscribes the source through a `TakeSubscriber`.
 */
class TakeOperator<T> implements Operator<T, T> {
  private total: number;

  /**
   * @param total - Limit passed on to `TakeSubscriber`.
   * @throws Error when `total` is less than 1.
   */
  constructor(total: number) {
    if (total < 1) {
      throw new Error('count less than 1');
    }
    this.total = total;
  }

  /** Subscribes `source` with a `TakeSubscriber` wrapping `subscriber`. */
  call(subscriber: Subscriber<T>, source: any): Subscription {
    const limited = new TakeSubscriber(subscriber, this.total);
    return source.subscribe(limited);
  }
}
/**
 * Subscriber that forwards values while its running count is within `total`,
 * then completes its destination and unsubscribes itself.
 */
class TakeSubscriber<T> extends Subscriber<T> {
  /** Number of values received so far. */
  private count = 0;

  constructor(destination: Subscriber<T>, private total: number) {
    super(destination);
  }

  /** Counts the value and forwards it only while the count is within the limit. */
  protected _next(value: T): void {
    const limit = this.total;
    this.count += 1;
    const seen = this.count;

    // Same comparison as `seen <= limit`, kept in that form so NaN behaves identically.
    if (!(seen <= limit)) {
      return;
    }

    this.destination.next(value);
    if (seen !== limit) {
      return;
    }

    // Limit reached: finish downstream and stop listening.
    this.destination.complete();
    this.unsubscribe();
  }
}
/**
 * Creates an observable that registers an event listener on `target` for each
 * subscription and pushes every received event to the observer.
 *
 * @param target - Element to listen on.
 * @param eventName - Name passed to `addEventListener` / `removeEventListener`.
 * @returns Observable whose teardown removes the listener again.
 */
function fromEvent<T>(target: HTMLElement, eventName): Observable<T> {
  return Observable.create((observer) => {
    const listener = (event: T) => {
      observer.next(event);
    };
    const detach = () => {
      target.removeEventListener(eventName, listener);
    };

    target.addEventListener(eventName, listener);
    return new Subscription(detach);
  });
}
/**
 * Creates an observable that emits an increasing counter (1, 2, 3, ...) every
 * `n` milliseconds.
 *
 * The counter is local to each subscription, so every subscriber starts again
 * at 1 instead of continuing a counter shared with earlier subscribers.
 *
 * @param n - Delay between emissions, passed to `setInterval`.
 * @returns Observable whose teardown clears the timer.
 */
function interval(n: number): Observable<number> {
  return Observable.create((observer) => {
    // Per-subscription state: must live inside the subscribe callback.
    let count = 0;
    const tick = setInterval(() => {
      count++;
      observer.next(count);
    }, n);
    return new Subscription(() => {
      clearInterval(tick);
    });
  });
}
// Demo: double five numbers, keep the first three, turn them into strings and log them.
Observable
  .of(1, 2, 3, 4, 5)
  .map((v) => v * 2)
  .take(3)
  .map((v) => `${v}`)
  .subscribe({
    next: (v) => console.log(v),
    error: (err) => console.log(err),
    complete: () => console.log('completed'),
  });
// Demo: log a tick every second ...
const subscription = interval(1000).subscribe({
  next: (v) => console.log('tick', v),
  error: (err) => console.log(err),
  complete: () => console.log('completed'),
});

// ... and cancel the subscription after 3.5 seconds.
setTimeout(() => subscription.unsubscribe(), 3500);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment