import type { Observable } from 'rxjs';
import { from } from 'rxjs';
import { last, map, mergeMap, toArray } from 'rxjs/operators';

/**
 * Maps each member of `T` to the value type carried by its observable.
 *
 * A member of type `Observable<V>` becomes `V`; a member that is not an
 * observable becomes `never`. Because the mapping is over `keyof T`, applying
 * it to an array or tuple type of observables produces the corresponding
 * array or tuple of value types.
 */
type ExtractObservableTypes<T> = {
  [K in keyof T]: T[K] extends Observable<infer V> ? V : never;
};

/**
 * Subscribes to the given observables with bounded concurrency and emits the
 * last value of each one as a single array, ordered like the input.
 *
 * Similar in spirit to `forkJoin`, but the number of sources that are
 * subscribed at the same time is limited by `concurrent`.
 *
 * @see https://stackoverflow.com/a/54247150
 *
 * @param observables - The source observables whose last values are collected.
 * @param concurrent - Concurrency limit, forwarded unchanged to `mergeMap`.
 * @returns An observable emitting one array whose element at position `i` is
 *   the last value of `observables[i]`. `last` is given an explicit
 *   `undefined` default, so a source that completes without emitting
 *   contributes `undefined`.
 */
export function forkJoinConcurrent<A extends readonly Observable<any>[]>(
  observables: A,
  concurrent: number,
): Observable<ExtractObservableTypes<A>> {
  // Reduce one source to its final value and tag that value with the
  // position of the source, because merged results may arrive in any order.
  const tagLastValue = (source: A[number], position: number) =>
    source.pipe(
      last(undefined, undefined),
      // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
      map((value) => ({ index: position, value })),
    );

  // Put every tagged value back into the slot of the source it came from.
  // Each source yields exactly one pair, so the indices cover 0..n-1 and the
  // resulting array is fully populated.
  const restoreOrder = (
    tagged: { index: number; value: unknown }[],
  ): ExtractObservableTypes<A> => {
    const ordered = new Array<unknown>(tagged.length);
    for (const { index, value } of tagged) {
      ordered[index] = value;
    }
    return ordered as unknown as ExtractObservableTypes<A>;
  };

  // Turn the array into a higher-order observable, flatten it with the
  // requested concurrency, gather all tagged values once everything has
  // completed, and finally restore the original ordering.
  const source$ = from(observables);
  const tagged$ = source$.pipe(mergeMap(tagLastValue, concurrent), toArray());
  return tagged$.pipe(map(restoreOrder));
}
