import { identity, identityAsync } from '../util/identity'; import { wrapWithAbort } from './operators/withabort'; // eslint-disable-next-line @typescript-eslint/no-empty-function const NEVER_PROMISE = new Promise(() => {}); type MergeResult<T> = { value: T; index: number }; function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number) { return promise.then(value => ({ value, index })) as Promise<MergeResult<T>>; } export interface ForkJoinOptions<T, R> { thisArg?: any; selector?: (args: T[], signal?: AbortSignal) => R | Promise<R>; signal?: AbortSignal; } // eslint-disable-next-line complexity export async function forkJoin<T, R>( sources: AsyncIterable<T>[], options: ForkJoinOptions<T, R> ): Promise<R | undefined> { const opts = options || { selector: identityAsync } as ForkJoinOptions<T, R>; const { selector, thisArg, signal } = opts; const length = sources.length; const iterators = new Array<AsyncIterator<T>>(length); const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length); let active = length; const values = new Array<T>(length); const hasValues = new Array<boolean>(length); hasValues.fill(false); for (let i = 0; i < length; i++) { const iterator = wrapWithAbort(sources[i], signal)[Symbol.asyncIterator](); iterators[i] = iterator; nexts[i] = wrapPromiseWithIndex(iterator.next(), i); } while (active > 0) { const next = Promise.race(nexts); const { value: next$, index } = await next; if (next$.done) { nexts[index] = <Promise<MergeResult<IteratorResult<T>>>>NEVER_PROMISE; active--; } else { const iterator$ = iterators[index]; nexts[index] = wrapPromiseWithIndex(iterator$.next(), index); hasValues[index] = true; values[index] = next$.value; } } if (hasValues.length > 0 && hasValues.every(identity)) { return await selector!.call(thisArg, values, signal); } return undefined; }