Skip to content

Instantly share code, notes, and snippets.

@nivrith
Last active December 6, 2020 05:51
Show Gist options
  • Save nivrith/e28d94eeeb8fdb310c1fd1b99b84ee9e to your computer and use it in GitHub Desktop.
Save nivrith/e28d94eeeb8fdb310c1fd1b99b84ee9e to your computer and use it in GitHub Desktop.
Final merge latest from operator
export type ObservableSourceType<T = any> = Observable<T> | ((value: any) => Observable<T>);
function* sourcesGen<T>(sources: ObservableSourceType[], a: T, i: number): Generator<Observable<any>, Observable<any>, undefined> {
const length = sources.length;
for (const [index, source] of sources.entries()) {
const observableSource: Observable<any> = typeof source === 'function' ? source.call(null, [a, i]) : source;
if (index <= length + 1) {
yield observableSource;
} else {
return observableSource;
}
}
}
export function mergeLatestFrom<T, O2 extends ObservableInput<any>>(
source2: O2 | ((v1: T) => O2),
): OperatorFunction<T, [T, ObservedValueOf<O2>]>;
export function mergeLatestFrom<T, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(
v2: O2 | ((v1: T) => O2),
v3: O3 | ((v1: T) => O3),
concurrent?: number,
): OperatorFunction<T, [T, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
export function mergeLatestFrom<T, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(
v2: O2 | ((v1: T) => O2),
v3: O3 | ((v1: T) => O3),
v4: O4 | ((v1: T) => O4),
concurrent?: number,
): OperatorFunction<T, [T, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
export function mergeLatestFrom<
T,
O2 extends ObservableInput<any>,
O3 extends ObservableInput<any>,
O4 extends ObservableInput<any>,
O5 extends ObservableInput<any>
>(
v2: O2 | ((v1: T) => O2),
v3: O3 | ((v1: T) => O3),
v4: O4 | ((v1: T) => O4),
v5: O5 | ((v1: T) => O5),
concurrent?: number,
): OperatorFunction<T, [T, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
export function mergeLatestFrom<
T,
O2 extends ObservableInput<any>,
O3 extends ObservableInput<any>,
O4 extends ObservableInput<any>,
O5 extends ObservableInput<any>,
O6 extends ObservableInput<any>
>(
v2: O2 | ((v1: T) => O2),
v3: O3 | ((v1: T) => O3),
v4: O4 | ((v1: T) => O4),
v5: O5 | ((v1: T) => O5),
v6: O6 | ((v1: T) => O6),
concurrent?: number,
): OperatorFunction<T, [T, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
export function mergeLatestFrom<T, R>(array: ObservableInput<any>[]): OperatorFunction<T, R>;
/* tslint:disable:max-line-length */
export function mergeLatestFrom<T, O extends ObservableInput<any>>(
project: (value: T, index: number) => Observable<O>,
concurrent?: number,
): OperatorFunction<T, ObservedValueOf<O>>;
export function mergeLatestFrom<T>(...inputs: any[]): OperatorFunction<T, any[]> {
let concurrency: number;
if (typeof inputs[inputs.length - 1] === 'number') {
concurrency = inputs.pop();
}
const sources: ObservableSourceType[] = inputs;
return (source: Observable<T>) =>
source.pipe(mergeMap((a, i) => of(a).pipe(withLatestFrom<T, any[]>(...sourcesGen(sources, a, i))), concurrency));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment