Skip to content

Instantly share code, notes, and snippets.

@nivrith
Last active December 6, 2020 05:53
Show Gist options
  • Save nivrith/6d2e8e88835cebbb46ca5b1a6db3a3b3 to your computer and use it in GitHub Desktop.
Save nivrith/6d2e8e88835cebbb46ca5b1a6db3a3b3 to your computer and use it in GitHub Desktop.
Merge Latest From Operator to get latest emission of other source observables
/**
 * A secondary source accepted by `mergeLatestFrom`: either a ready-made observable or a
 * factory function that returns one.
 *
 * In this file, factories are invoked with a single `[value, index]` tuple passed as `value`.
 */
export type ObservableSourceType<T = any> = Observable<T> | ((value: any) => Observable<T>);
/**
 * Lazily resolves every entry of `sources` to an observable, in order.
 *
 * Entries that are already observables are yielded untouched; factory functions are
 * invoked with a single `[a, i]` tuple argument and their result is yielded instead.
 *
 * Every entry is yielded. Nothing is delivered through the generator's return value,
 * because spread syntax (which is how the caller consumes this generator) discards it.
 *
 * @param sources Observables or observable factories to resolve.
 * @param a Source value handed to factories as the first tuple element.
 * @param i Index of `a`, handed to factories as the second tuple element.
 * @returns A generator yielding one observable per entry of `sources`.
 */
function* sourcesGen<T>(sources: ObservableSourceType[], a: T, i: number): Generator<Observable<any>, void, undefined> {
  for (const source of sources) {
    // Factories receive ONE tuple argument (not two positional ones); kept as-is so
    // existing factories written against this convention continue to work.
    yield typeof source === 'function' ? source.call(null, [a, i]) : source;
  }
}
/**
 * Operator that pairs each value of the source with the latest values of the given
 * secondary sources.
 *
 * Each source value `a` is wrapped with `of(a)` and piped through `withLatestFrom`
 * against the secondary sources; the resulting inner observables are flattened with
 * `mergeMap`.
 *
 * @param inputs Secondary sources: observables, or factories that are called per source
 *   value and return an observable. If the last argument is a number, it is removed
 *   from the list and used as the `mergeMap` concurrency limit.
 * @returns An operator function emitting arrays produced by `withLatestFrom`.
 */
export function mergeLatestFrom<T>(...inputs: any[]): OperatorFunction<T, any[]> {
  // A trailing numeric argument is the optional concurrency; strip it from the sources.
  const trailing = inputs[inputs.length - 1];
  const concurrency: number | undefined = typeof trailing === 'number' ? inputs.pop() : undefined;
  const sources: ObservableSourceType[] = inputs;

  return (source: Observable<T>) => {
    // Secondary sources are resolved afresh for every source value so factories can
    // depend on the current value and index.
    const pairWithLatest = (a: T, i: number) => {
      const others = sourcesGen(sources, a, i);
      return of(a).pipe(withLatestFrom<T, any[]>(...others));
    };
    return source.pipe(mergeMap(pairWithLatest, concurrency));
  };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment