import { v4 as uuid } from 'uuid';

/**
 * A function that creates the promise to run.
 * The queue stores this function and only calls it (with no arguments)
 * at the moment the work is allowed to start.
 */
export type PromiseProvider<T> = (...args: unknown[]) => Promise<T>;

/**
 * Callback invoked by the queue once a queued promise has settled,
 * receiving the outcome (success or failure) of that promise.
 */
type PromiseExecutionListener<T> = (result: FinishedPromiseResult<T>) => void;

/**
 * Outcome of a promise that the queue has finished running.
 */
interface FinishedPromiseResult<T> {
  /** `true` when the promise fulfilled, `false` when it rejected. */
  isSuccess: boolean;
  /** The fulfilled value; the queue sets this to `null` when the promise rejected. */
  result: T | null;
  /** The rejection reason; the queue sets this to `null` when the promise fulfilled. */
  error: unknown;
}

/**
 * An entry waiting in (or currently being run by) the queue.
 */
interface QueuedPromise<T> {
  /** Identifier generated when the entry is added; also keys the entry's completion callback. */
  id: string;
  /** Function that is called to create the promise when the entry is started. */
  promiseProvider: PromiseProvider<T>;
}

/**
 * Fallback values the PromiseQueue constructor uses for any option
 * that is not supplied.
 */
const DEFAULTS = {
  // maximum number of promises running at the same time
  CONCURRENT_TASKS: 5,
  // length of the rate-limiting time window, in milliseconds
  RATE_LIMIT_MS: 100,
  // maximum number of promises counted per rate-limiting window
  RATE_LIMIT_MAX_TASKS: 5,
} as const;

/**
 * Optional tuning parameters for a PromiseQueue.
 * Any option that is omitted (or set to 0) falls back to its default.
 */
export type PromiseQueueOptions = {
  /**
   * The maximum concurrency factor for the queue.
   * This will throttle the number of promises that can be processed at one time.
   * Defaults to 5 if not specified.
   */
  concurrentTasks?: number;
  /**
   * The unit of time for rate limiting, in milliseconds.
   * This will decide how large the time window is that promises are throttled in.
   * Defaults to 100ms if not specified.
   */
  rateLimitMs?: number;
  /**
   * The maximum number of promises to process in the rate limiting time window.
   * Defaults to 5 if not specified.
   */
  rateLimitMaxTasks?: number;
};

/**
 * A queue that runs promise-returning functions under two limits:
 *
 * - a concurrency limit: at most `concurrentTasks` promises are in flight at once;
 * - a rate limit: no new promise is started while `rateLimitMaxTasks` or more
 *   promises have completed within the last `rateLimitMs` milliseconds.
 *
 * Work is added with {@link PromiseQueue.addPromise}; the queue starts entries in
 * insertion order as soon as both limits allow it.
 */
export class PromiseQueue<T> {
  private readonly concurrentTasks: number;
  private readonly unitOfTimeMillis: number;
  private readonly maxThroughputPerUnitTime: number;
  /** Entries waiting to be started, in insertion order. */
  private promisesToExecute: Array<QueuedPromise<T>>;
  /** Entries that have been started and have not settled yet, keyed by id. */
  private promisesBeingExecuted: { [id: string]: QueuedPromise<T> };
  /** Completion callbacks for every entry that has not settled yet, keyed by id. */
  private promiseExecutedCallbacks: {
    [id: string]: PromiseExecutionListener<T>;
  };
  /** Completion timestamps (ms since epoch) inside the current window, oldest first. */
  private promiseCompletedTimesLog: number[];
  /** Handle of the pending "try again later" timer, or `null` when none is pending. */
  private reattemptTimeoutId: number | ReturnType<typeof setTimeout> | null;

  /**
   * @param options Optional limits. Any value that is missing or not a positive
   *   number falls back to the corresponding default, because a limit of zero or
   *   less would prevent the queue from ever starting anything.
   */
  constructor(options: PromiseQueueOptions = {}) {
    this.concurrentTasks = PromiseQueue.positiveOrDefault(
      options.concurrentTasks,
      DEFAULTS.CONCURRENT_TASKS,
    );
    this.unitOfTimeMillis = PromiseQueue.positiveOrDefault(
      options.rateLimitMs,
      DEFAULTS.RATE_LIMIT_MS,
    );
    this.maxThroughputPerUnitTime = PromiseQueue.positiveOrDefault(
      options.rateLimitMaxTasks,
      DEFAULTS.RATE_LIMIT_MAX_TASKS,
    );

    this.promisesToExecute = [];
    this.promisesBeingExecuted = {};
    this.promiseExecutedCallbacks = {};
    this.promiseCompletedTimesLog = [];
    this.reattemptTimeoutId = null;
  }

  /** Returns `value` when it is a positive number, otherwise `fallback`. */
  private static positiveOrDefault(
    value: number | undefined,
    fallback: number,
  ): number {
    // `value > 0` is false for undefined, NaN, 0 and negatives alike.
    return typeof value === 'number' && value > 0 ? value : fallback;
  }

  /** @returns The number of entries that are waiting and have not been started. */
  numberOfQueuedPromises(): number {
    return this.promisesToExecute.length;
  }

  /** @returns The number of entries that have been started and have not settled. */
  numberOfExecutingPromises(): number {
    return Object.keys(this.promisesBeingExecuted).length;
  }

  /**
   * The queue takes a function that returns a promise.
   * This function will be called at the point where the promise is going to be executed.
   *
   * @param promiseProvider Function that creates the promise to run.
   * @returns A promise that fulfils with the provider's result, or rejects with
   *   the provider's rejection reason (or with the error it threw synchronously).
   */
  addPromise(promiseProvider: PromiseProvider<T>): Promise<T | null> {
    return new Promise((resolve, reject) => {
      // Queue the work and register a callback under the same id, so that the
      // caller's promise can be settled once the queued work has finished.
      const id = uuid();
      this.promisesToExecute.push({ id, promiseProvider });
      this.promiseExecutedCallbacks[id] = (
        result: FinishedPromiseResult<T>,
      ) => {
        if (result.isSuccess) {
          resolve(result.result);
        } else {
          reject(result.error);
        }
      };
      // Kick off processing in case the queue is currently idle.
      this.execute();
    });
  }

  /**
   * Starts as many waiting entries as the concurrency and rate limits currently
   * allow. When nothing can be started, schedules a single timer to try again.
   * Called automatically when work is added and whenever a promise settles.
   */
  execute(): void {
    if (this.promisesToExecute.length === 0) {
      return;
    }

    const now = Date.now();
    const recentCompletions = this.completionsSince(
      now - this.unitOfTimeMillis,
    );

    const freeConcurrencySlots =
      this.concurrentTasks - this.numberOfExecutingPromises();
    const freeRateSlots =
      this.maxThroughputPerUnitTime - recentCompletions.length;
    const numberOfPromisesToStart = Math.min(
      freeConcurrencySlots,
      freeRateSlots,
    );

    if (numberOfPromisesToStart <= 0) {
      this.scheduleReattempt(now, recentCompletions);
      return;
    }

    let started = 0;
    while (started < numberOfPromisesToStart) {
      const next = this.promisesToExecute.shift();
      if (!next) {
        return;
      }

      const listener = this.promiseExecutedCallbacks[next.id];
      if (!listener) {
        // Nothing is waiting on this entry; skip it without using up a slot.
        continue;
      }

      this.startPromise(next, listener);
      started++;
    }
  }

  /** Returns the logged completion times that are at or after `windowStart`. */
  private completionsSince(windowStart: number): number[] {
    return this.promiseCompletedTimesLog.filter((time) => time >= windowStart);
  }

  /**
   * Arms the re-attempt timer unless one is already pending.
   *
   * @param now Timestamp the caller used to compute the current window.
   * @param recentCompletions Completion times inside that window, oldest first.
   */
  private scheduleReattempt(now: number, recentCompletions: number[]): void {
    if (this.reattemptTimeoutId !== null) {
      return;
    }

    // A rate-limit slot frees up once the oldest completion leaves the window.
    // The window check is inclusive (`>=`), so wait one millisecond past the
    // boundary. With no recent completions, fall back to one full window.
    const oldest = recentCompletions[0];
    const delayMillis =
      oldest === undefined
        ? this.unitOfTimeMillis
        : oldest + this.unitOfTimeMillis - now + 1;

    this.reattemptTimeoutId = setTimeout(() => {
      this.reattemptTimeoutId = null;
      this.execute();
    }, delayMillis);
  }

  /**
   * Runs one queued entry and reports its outcome to `listener` exactly once.
   */
  private startPromise(
    queued: QueuedPromise<T>,
    listener: PromiseExecutionListener<T>,
  ): void {
    const { id } = queued;
    this.promisesBeingExecuted[id] = queued;

    // A provider may throw before it returns a promise. Turn that into a
    // rejection so the entry is still cleaned up and only its own caller
    // sees the error.
    let running: Promise<T>;
    try {
      running = Promise.resolve(queued.promiseProvider());
    } catch (err) {
      running = Promise.reject(err);
    }

    const settle = (result: FinishedPromiseResult<T>): void => {
      delete this.promiseExecutedCallbacks[id];
      delete this.promisesBeingExecuted[id];
      this.recordCompletion();
      try {
        listener(result);
      } finally {
        // A slot has been freed, so see whether more work can start.
        this.execute();
      }
    };

    // Two-argument `then` guarantees exactly one of the handlers runs.
    void running.then(
      (res) => settle({ isSuccess: true, result: res, error: null }),
      (err: unknown) => settle({ isSuccess: false, result: null, error: err }),
    );
  }

  /** Logs a completion at the current time and drops entries outside the window. */
  private recordCompletion(): void {
    const now = Date.now();
    this.promiseCompletedTimesLog.push(now);
    this.promiseCompletedTimesLog = this.completionsSince(
      now - this.unitOfTimeMillis,
    );
  }
}
