'use strict'

const abortControllerModule = require('../../../../../watcher/aborter/controller')

const AbortController = globalThis.AbortController || abortControllerModule.AbortController

const {
  codes: { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE },
  AbortError
} = require('../../ours/errors')

const { validateAbortSignal, validateInteger, validateObject } = require('../validators')

const kWeakHandler = require('../../ours/primordials').Symbol('kWeak')

const { finished } = require('./end-of-stream')

const {
  ArrayPrototypePush,
  MathFloor,
  Number,
  NumberIsNaN,
  Promise,
  PromiseReject,
  PromisePrototypeCatch,
  Symbol
} = require('../../ours/primordials')

const kEmpty = Symbol('kEmpty')
const kEof = Symbol('kEof')

function map(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }

  if (options != null) {
    validateObject(options, 'options')
  }

  if ((options === null || options === undefined ? undefined : options.signal) != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  let concurrency = 1

  if ((options === null || options === undefined ? undefined : options.concurrency) != null) {
    concurrency = MathFloor(options.concurrency)
  }

  validateInteger(concurrency, 'concurrency', 1)
  return async function* map() {
    var _options$signal, _options$signal2

    const ac = new AbortController()
    const stream = this
    const queue = []
    const signal = ac.signal
    const signalOpt = {
      signal
    }

    const abort = () => ac.abort()

    if (
      options !== null &&
      options !== undefined &&
      (_options$signal = options.signal) !== null &&
      _options$signal !== undefined &&
      _options$signal.aborted
    ) {
      abort()
    }

    options === null || options === undefined
      ? undefined
      : (_options$signal2 = options.signal) === null || _options$signal2 === undefined
        ? undefined
        : _options$signal2.addEventListener('abort', abort)
    let next
    let resume
    let done = false

    function onDone() {
      done = true
    }

    async function pump() {
      try {
        for await (let val of stream) {
          var _val

          if (done) {
            return
          }

          if (signal.aborted) {
            throw new AbortError()
          }

          try {
            val = fn(val, signalOpt)
          } catch (err) {
            val = PromiseReject(err)
          }

          if (val === kEmpty) {
            continue
          }

          if (typeof ((_val = val) === null || _val === undefined ? undefined : _val.catch) === 'function') {
            val.catch(onDone)
          }

          queue.push(val)

          if (next) {
            next()
            next = null
          }

          if (!done && queue.length && queue.length >= concurrency) {
            await new Promise((resolve) => {
              resume = resolve
            })
          }
        }

        queue.push(kEof)
      } catch (err) {
        const val = PromiseReject(err)
        PromisePrototypeCatch(val, onDone)
        queue.push(val)
      } finally {
        var _options$signal3

        done = true

        if (next) {
          next()
          next = null
        }

        options === null || options === undefined
          ? undefined
          : (_options$signal3 = options.signal) === null || _options$signal3 === undefined
            ? undefined
            : _options$signal3.removeEventListener('abort', abort)
      }
    }

    pump()

    try {
      while (true) {
        while (queue.length > 0) {
          const val = await queue[0]

          if (val === kEof) {
            return
          }

          if (signal.aborted) {
            throw new AbortError()
          }

          if (val !== kEmpty) {
            yield val
          }

          queue.shift()

          if (resume) {
            resume()
            resume = null
          }
        }

        await new Promise((resolve) => {
          next = resolve
        })
      }
    } finally {
      ac.abort()
      done = true

      if (resume) {
        resume()
        resume = null
      }
    }
  }.call(this)
}

function asIndexedPairs(options = undefined) {
  if (options != null) {
    validateObject(options, 'options')
  }

  if ((options === null || options === undefined ? undefined : options.signal) != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  return async function* asIndexedPairs() {
    let index = 0

    for await (const val of this) {
      var _options$signal4

      if (
        options !== null &&
        options !== undefined &&
        (_options$signal4 = options.signal) !== null &&
        _options$signal4 !== undefined &&
        _options$signal4.aborted
      ) {
        throw new AbortError({
          cause: options.signal.reason
        })
      }

      yield [index++, val]
    }
  }.call(this)
}

async function some(fn, options = undefined) {
  // eslint-disable-next-line no-unused-vars
  for await (const unused of filter.call(this, fn, options)) {
    return true
  }

  return false
}

async function every(fn, options = undefined) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  } // https://en.wikipedia.org/wiki/De_Morgan%27s_laws

  return !(await some.call(
    this,
    async (...args) => {
      return !(await fn(...args))
    },
    options
  ))
}

async function find(fn, options) {
  for await (const result of filter.call(this, fn, options)) {
    return result
  }

  return undefined
}

async function forEach(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }

  async function forEachFn(value, options) {
    await fn(value, options)
    return kEmpty
  } // eslint-disable-next-line no-unused-vars

  for await (const unused of map.call(this, forEachFn, options));
}

function filter(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }

  async function filterFn(value, options) {
    if (await fn(value, options)) {
      return value
    }

    return kEmpty
  }

  return map.call(this, filterFn, options)
} // Specific to provide better error to reduce since the argument is only
// missing if the stream has no items in it - but the code is still appropriate

class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
  constructor() {
    super('reduce')
    this.message = 'Reduce of an empty stream requires an initial value'
  }
}

async function reduce(reducer, initialValue, options) {
  var _options$signal5

  if (typeof reducer !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('reducer', ['Function', 'AsyncFunction'], reducer)
  }

  if (options != null) {
    validateObject(options, 'options')
  }

  if ((options === null || options === undefined ? undefined : options.signal) != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  let hasInitialValue = arguments.length > 1

  if (
    options !== null &&
    options !== undefined &&
    (_options$signal5 = options.signal) !== null &&
    _options$signal5 !== undefined &&
    _options$signal5.aborted
  ) {
    const err = new AbortError(undefined, {
      cause: options.signal.reason
    })
    this.once('error', () => { }) // The error is already propagated

    await finished(this.destroy(err))
    throw err
  }

  const ac = new AbortController()
  const signal = ac.signal

  if (options !== null && options !== undefined && options.signal) {
    const opts = {
      once: true,
      [kWeakHandler]: this
    }
    options.signal.addEventListener('abort', () => ac.abort(), opts)
  }

  let gotAnyItemFromStream = false

  try {
    for await (const value of this) {
      var _options$signal6

      gotAnyItemFromStream = true

      if (
        options !== null &&
        options !== undefined &&
        (_options$signal6 = options.signal) !== null &&
        _options$signal6 !== undefined &&
        _options$signal6.aborted
      ) {
        throw new AbortError()
      }

      if (!hasInitialValue) {
        initialValue = value
        hasInitialValue = true
      } else {
        initialValue = await reducer(initialValue, value, {
          signal
        })
      }
    }

    if (!gotAnyItemFromStream && !hasInitialValue) {
      throw new ReduceAwareErrMissingArgs()
    }
  } finally {
    ac.abort()
  }

  return initialValue
}

async function toArray(options) {
  if (options != null) {
    validateObject(options, 'options')
  }

  if ((options === null || options === undefined ? undefined : options.signal) != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  const result = []

  for await (const val of this) {
    var _options$signal7

    if (
      options !== null &&
      options !== undefined &&
      (_options$signal7 = options.signal) !== null &&
      _options$signal7 !== undefined &&
      _options$signal7.aborted
    ) {
      throw new AbortError(undefined, {
        cause: options.signal.reason
      })
    }

    ArrayPrototypePush(result, val)
  }

  return result
}

function flatMap(fn, options) {
  const values = map.call(this, fn, options)
  return async function* flatMap() {
    for await (const val of values) {
      yield* val
    }
  }.call(this)
}

function toIntegerOrInfinity(number) {
  // We coerce here to align with the spec
  // https://github.com/tc39/proposal-iterator-helpers/issues/169
  number = Number(number)

  if (NumberIsNaN(number)) {
    return 0
  }

  if (number < 0) {
    throw new ERR_OUT_OF_RANGE('number', '>= 0', number)
  }

  return number
}

function drop(number, options = undefined) {
  if (options != null) {
    validateObject(options, 'options')
  }

  if ((options === null || options === undefined ? undefined : options.signal) != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  number = toIntegerOrInfinity(number)
  return async function* drop() {
    var _options$signal8

    if (
      options !== null &&
      options !== undefined &&
      (_options$signal8 = options.signal) !== null &&
      _options$signal8 !== undefined &&
      _options$signal8.aborted
    ) {
      throw new AbortError()
    }

    for await (const val of this) {
      var _options$signal9

      if (
        options !== null &&
        options !== undefined &&
        (_options$signal9 = options.signal) !== null &&
        _options$signal9 !== undefined &&
        _options$signal9.aborted
      ) {
        throw new AbortError()
      }

      if (number-- <= 0) {
        yield val
      }
    }
  }.call(this)
}

function take(number, options = undefined) {
  if (options != null) {
    validateObject(options, 'options')
  }

  if ((options === null || options === undefined ? undefined : options.signal) != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  number = toIntegerOrInfinity(number)
  return async function* take() {
    var _options$signal10

    if (
      options !== null &&
      options !== undefined &&
      (_options$signal10 = options.signal) !== null &&
      _options$signal10 !== undefined &&
      _options$signal10.aborted
    ) {
      throw new AbortError()
    }

    for await (const val of this) {
      var _options$signal11

      if (
        options !== null &&
        options !== undefined &&
        (_options$signal11 = options.signal) !== null &&
        _options$signal11 !== undefined &&
        _options$signal11.aborted
      ) {
        throw new AbortError()
      }

      if (number-- > 0) {
        yield val
      } else {
        return
      }
    }
  }.call(this)
}

module.exports.streamReturningOperators = {
  asIndexedPairs,
  drop,
  filter,
  flatMap,
  map,
  take
}
module.exports.promiseReturningOperators = {
  every,
  forEach,
  reduce,
  toArray,
  some,
  find
}