Last active
December 6, 2020 05:53
-
-
Save nivrith/6d2e8e88835cebbb46ca5b1a6db3a3b3 to your computer and use it in GitHub Desktop.
mergeLatestFrom operator: pairs each source value with the latest emissions of other source observables.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A secondary source accepted by `mergeLatestFrom`: either an observable used
 * as-is, or a factory function that is called to produce one.
 *
 * NOTE: `sourcesGen` invokes factory functions with a single `[value, index]`
 * tuple as their only argument, which is why `value` is typed loosely here.
 */
export type ObservableSourceType<T = any> = Observable<T> | ((value: any) => Observable<T>);
/**
 * Lazily resolves every entry of `sources` to an observable, in order.
 *
 * Entries that are already observables are yielded unchanged. Entries that are
 * functions are invoked with a single `[a, i]` tuple argument (with `this` set
 * to `null`), and whatever they return is yielded.
 *
 * Every entry is yielded; none is handed back as the generator's return value.
 * This matters because spreading a generator (`...sourcesGen(...)`) only
 * consumes yielded values, so a `return`ed source would be silently lost.
 *
 * @param sources Observables, or factory functions producing observables.
 * @param a Value forwarded to factory functions as the first tuple element.
 * @param i Index forwarded to factory functions as the second tuple element.
 * @returns A generator yielding one resolved source per entry of `sources`.
 */
function* sourcesGen<T>(sources: ObservableSourceType[], a: T, i: number): Generator<Observable<any>, void, undefined> {
  for (const source of sources) {
    // Factories receive ONE argument: the `[a, i]` tuple (this is `call`, not `apply`).
    yield typeof source === 'function' ? source.call(null, [a, i]) : source;
  }
}
/**
 * Creates an operator that, for each value of the source observable, combines
 * that value with the latest values of the given secondary sources.
 *
 * Each source value `a` (with its index `i`) is wrapped with `of(a)` and piped
 * through `withLatestFrom(...)` using the secondary sources resolved by
 * `sourcesGen(sources, a, i)`; the resulting inner observables are flattened
 * with `mergeMap`.
 *
 * NOTE(review): `of(a)` emits synchronously, so a secondary source that has
 * not produced a value by then presumably causes `a` to be dropped — confirm
 * against the RxJS `withLatestFrom` documentation.
 *
 * @param inputs Secondary sources (observables or factory functions). If the
 *   final argument is a number it is removed from the list and passed to
 *   `mergeMap` as its concurrency argument.
 * @returns An operator function emitting arrays produced by `withLatestFrom`.
 */
export function mergeLatestFrom<T>(...inputs: any[]): OperatorFunction<T, any[]> {
  // A trailing number is the concurrency limit rather than a source; when it is
  // absent `mergeMap` receives `undefined` and applies its own default.
  const concurrency: number | undefined =
    typeof inputs[inputs.length - 1] === 'number' ? inputs.pop() : undefined;
  const sources: ObservableSourceType[] = inputs;
  return (source: Observable<T>) =>
    source.pipe(
      mergeMap((a, i) => of(a).pipe(withLatestFrom<T, any[]>(...sourcesGen(sources, a, i))), concurrency),
    );
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment