Last active
November 24, 2018 23:12
-
-
Save jeremyben/35e8a9cf637075c510456746fc4936dd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// pipe's operator function maps an Observable<T> to an Observable<R> | |
// lift's operator function maps an Observer<R> to an Observer<T> | |
// | |
// This is just another way to represent the idea of either: | |
// building an Observable chain down from the source to the sink | |
// or building an Observer chain up from the sink to the source | |
// | |
// Pipe Implementation | |
// | |
const pipe = (...fns: Function[]) => (source) => { | |
return fns.reduce((acc, currentFn) => currentFn(acc), source) | |
} | |
// | |
// Map Implementation | |
// | |
class MyMapSubscriber extends Subscriber<any> { | |
fn: Function | |
constructor(subscriber, fn) { | |
super(subscriber) | |
this.fn = fn | |
} | |
_next(value) { | |
this.destination.next(this.fn(value)) | |
} | |
} | |
const map = (fn) => (source) => { | |
return source.lift({ | |
call(subscriber, source) { | |
source.subscribe(new MyMapSubscriber(subscriber, fn)) | |
}, | |
}) | |
// Shorthand for deprecated way : | |
// const o$ = new Observable() | |
// o$.source = source | |
// o$.operator = { | |
// call(subscriber, source) { | |
// source.subscribe(new MapSubscriber(subscriber, fn)) | |
// }, | |
// } | |
// return o$ | |
} | |
// | |
// MergeMap Implementation | |
// | |
class MyMergeMapSubscriber extends Subscriber<any> { | |
fn: Function | |
constructor(sub, fn) { | |
super(sub) | |
this.fn = fn | |
} | |
_next(value) { | |
console.log('outer', value) | |
const o$ = this.fn(value) | |
o$.subscribe({ | |
next: (value) => { | |
console.log(' inner', value) | |
this.destination.next(value) | |
}, | |
}) | |
} | |
} | |
const myMergeMap = (fn) => (source) => { | |
return source.lift({ | |
call(subscriber, source) { | |
source.subscribe(new MyMergeMapSubscriber(subscriber, fn)) | |
}, | |
}) | |
} | |
// | |
// SwitchMap Implementation | |
// | |
class MySwitchMapSubscriber extends Subscriber<any> { | |
innerSubscription: Subscription | |
fn: Function | |
constructor(sub, fn) { | |
super(sub) | |
this.fn = fn | |
} | |
_next(value) { | |
console.log('outer', value) | |
const o$ = this.fn(value) as Observable<any> | |
if (this.innerSubscription) { | |
this.innerSubscription.unsubscribe() | |
} | |
this.innerSubscription = o$.subscribe({ | |
next: (value) => { | |
console.log(' inner', value) | |
this.destination.next(value) | |
}, | |
}) | |
} | |
} | |
const mySwitchMap = (fn) => (source) => { | |
return source.lift({ | |
call(sub, source) { | |
source.subscribe(new MySwitchMapSubscriber(sub, fn)) | |
}, | |
}) | |
} | |
// | |
// ConcatMap Implementation | |
// | |
class MyConcatMapSubscriber extends Subscriber<any> { | |
fn: Function | |
innerSubscription: Subscription | |
buffer = [] | |
constructor(sub, fn) { | |
super(sub) | |
this.fn = fn | |
} | |
_next(value) { | |
if (this.innerSubscription) { | |
this.buffer = [...this.buffer, value] | |
} else { | |
const o$ = this.fn(value) as Observable<any> | |
this.innerSubscription = o$.subscribe({ | |
next: (value) => { | |
this.destination.next(value) | |
}, | |
complete: () => { | |
console.log(this.buffer) | |
if (this.buffer.length) { | |
const [first, ...rest] = this.buffer | |
this.buffer = rest | |
this._next(first) | |
} | |
}, | |
}) | |
this.add(this.innerSubscription) | |
} | |
} | |
} | |
const myConcatMap = (fn) => (source) => { | |
return source.lift({ | |
call(sub, source) { | |
source.subscribe(new MyConcatMapSubscriber(sub, fn)) | |
}, | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment