import { AsyncIterableX } from './asynciterablex'; import { identity } from '../internal/identity'; // tslint:disable-next-line:no-empty const NEVER_PROMISE = new Promise(() => {}); type MergeResult<T> = { value: T; index: number }; function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number) { return promise.then(value => ({ value, index })) as Promise<MergeResult<T>>; } export class CombineLatestAsyncIterable<T> extends AsyncIterableX<T[]> { private _source: AsyncIterable<T>[]; constructor(sources: AsyncIterable<T>[]) { super(); this._source = sources; } async *[Symbol.asyncIterator]() { const length = this._source.length; const iterators = new Array<AsyncIterator<T>>(length); const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length); let hasValueAll = false; const values = new Array<T>(length); const hasValues = new Array<boolean>(length); let active = length; hasValues.fill(false); for (let i = 0; i < length; i++) { const iterator = this._source[i][Symbol.asyncIterator](); iterators[i] = iterator; nexts[i] = wrapPromiseWithIndex(iterator.next(), i); } while (active > 0) { const next = Promise.race(nexts); const { value: next$, index } = await next; if (next$.done) { nexts[index] = <Promise<MergeResult<IteratorResult<T>>>>NEVER_PROMISE; active--; } else { values[index] = next$.value; hasValues[index] = true; const iterator$ = iterators[index]; nexts[index] = wrapPromiseWithIndex(iterator$.next(), index); if (hasValueAll || (hasValueAll = hasValues.every(identity))) { yield values; } } } } } export function combineLatest<T, T2>( source: AsyncIterable<T>, v2: AsyncIterable<T2> ): AsyncIterableX<(T | T2)[]>; export function combineLatest<T, T2, T3>( source: AsyncIterable<T>, v2: AsyncIterable<T2>, v3: AsyncIterable<T3> ): AsyncIterableX<(T | T2 | T3)[]>; export function combineLatest<T, T2, T3, T4>( source: AsyncIterable<T>, v2: AsyncIterable<T2>, v3: AsyncIterable<T3>, v4: AsyncIterable<T4> ): AsyncIterableX<(T | T2 | T3 | T4)[]>; export function combineLatest<T, T2, T3, T4, T5>( source: AsyncIterable<T>, v2: AsyncIterable<T2>, v3: AsyncIterable<T3>, v4: AsyncIterable<T4>, v5: AsyncIterable<T5> ): AsyncIterable<(T | T2 | T3 | T4 | T5)[]>; export function combineLatest<T, T2, T3, T4, T5, T6>( source: AsyncIterable<T>, v2: AsyncIterable<T2>, v3: AsyncIterable<T3>, v4: AsyncIterable<T4>, v5: AsyncIterable<T5>, v6: AsyncIterable<T6> ): AsyncIterable<(T | T2 | T3 | T4 | T5 | T6)[]>; export function combineLatest<T>( source: AsyncIterable<T>, ...args: AsyncIterable<T>[]): AsyncIterableX<T[]> { return new CombineLatestAsyncIterable<T>([source, ...args]); } export function combineLatestStatic<T, T2>( v1: AsyncIterable<T>, v2: AsyncIterable<T2> ): AsyncIterableX<(T | T2)[]>; export function combineLatestStatic<T, T2, T3>( v1: AsyncIterable<T>, v2: AsyncIterable<T2>, v3: AsyncIterable<T3> ): AsyncIterableX<(T | T2 | T3)[]>; export function combineLatestStatic<T, T2, T3, T4>( v1: AsyncIterable<T>, v2: AsyncIterable<T2>, v3: AsyncIterable<T3>, v4: AsyncIterable<T4> ): AsyncIterableX<(T | T2 | T3 | T4)[]>; export function combineLatestStatic<T, T2, T3, T4, T5>( v1: AsyncIterable<T>, v2: AsyncIterable<T2>, v3: AsyncIterable<T3>, v4: AsyncIterable<T4>, v5: AsyncIterable<T5> ): AsyncIterable<(T | T2 | T3 | T4 | T5)[]>; export function combineLatestStatic<T, T2, T3, T4, T5, T6>( v1: AsyncIterable<T>, v2: AsyncIterable<T2>, v3: AsyncIterable<T3>, v4: AsyncIterable<T4>, v5: AsyncIterable<T5>, v6: AsyncIterable<T6> ): AsyncIterable<(T | T2 | T3 | T4 | T5 | T6)[]>; export function combineLatestStatic<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T[]> { return new CombineLatestAsyncIterable<T>(args); }