import { identity, identityAsync } from '../util/identity';
import { wrapWithAbort } from './operators/withabort';

// eslint-disable-next-line @typescript-eslint/no-empty-function
const NEVER_PROMISE = new Promise(() => {});

type MergeResult<T> = { value: T; index: number };

function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number) {
  return promise.then(value => ({ value, index })) as Promise<MergeResult<T>>;
}

export interface ForkJoinOptions<T, R> {
  thisArg?: any;
  selector?: (args: T[], signal?: AbortSignal) => R | Promise<R>;
  signal?: AbortSignal;
}

// eslint-disable-next-line complexity
export async function forkJoin<T, R>(
  sources: AsyncIterable<T>[],
  options: ForkJoinOptions<T, R>
): Promise<R | undefined> {
  const opts = options || { selector: identityAsync } as ForkJoinOptions<T, R>;
  const { selector, thisArg, signal } = opts;
  const length = sources.length;
  const iterators = new Array<AsyncIterator<T>>(length);
  const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length);

  let active = length;
  const values = new Array<T>(length);
  const hasValues = new Array<boolean>(length);
  hasValues.fill(false);

  for (let i = 0; i < length; i++) {
    const iterator = wrapWithAbort(sources[i], signal)[Symbol.asyncIterator]();
    iterators[i] = iterator;
    nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
  }

  while (active > 0) {
    const next = Promise.race(nexts);
    const { value: next$, index } = await next;
    if (next$.done) {
      nexts[index] = <Promise<MergeResult<IteratorResult<T>>>>NEVER_PROMISE;
      active--;
    } else {
      const iterator$ = iterators[index];
      nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
      hasValues[index] = true;
      values[index] = next$.value;
    }
  }

  if (hasValues.length > 0 && hasValues.every(identity)) {
    return await selector!.call(thisArg, values, signal);
  }

  return undefined;
}