0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

effect-tsで配列の配列の非同期処理のabort対応

Posted at

配列の配列の非同期処理で対象API(以下ではsleep)がabortする場合のメモ

前回

配列が1つの場合

全部同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Failure produced by `sleep` when it rejects immediately, before any waiting.
 *
 * Instances are recognised by their `_tag` string, so `isClass` also accepts
 * plain objects that carry the same tag.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for the given site. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Failure produced by `sleep` when it rejects after the wait has elapsed.
 * Carries the wait value that was used.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for the given site, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

/**
 * Marker returned for a site whose processing is skipped because the shared
 * abort signal had already fired.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === AlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line for the given site. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}

/**
 * Error describing the site whose validation failed and thereby triggered
 * the abort of the whole run.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName

  site: string
  reason: string
  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is OriginAbortError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === OriginAbortError.tagName
  }

  /** Builds the log line from the stored site and reason. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Creates a validator for a site URL.
   *
   * The returned function fails with an `OriginAbortError` (after logging it
   * and invoking `abortAction`) when
   * - the last character of the site is not numeric,
   * - the character before the last one is not `/` (reported as `lastIndex >= 10`), or
   * - `topIndex` is given (and non-zero) and `topIndex + lastIndex >= 10`.
   * Otherwise it succeeds with the unchanged site.
   *
   * @param abortAction callback invoked once when validation fails
   * @param topIndex optional index of the enclosing (outer) array entry
   */
  static siteCheck(abortAction: () => void, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      let originAbortError: OriginAbortError | undefined
      if (Number.isNaN(lastIndex)) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is NaN`)
      } else if (site[site.length - 2] !== '/') {
        originAbortError = new OriginAbortError(site, `lastIndex >= 10`)
      } else if (topIndex && topIndex + lastIndex >= 10) {
        originAbortError = new OriginAbortError(site, `topIndex + lastIndex >= 10`)
      }

      if (originAbortError !== undefined) {
        // Log first, then abort, so the origin line precedes the listeners' output.
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }

      return Effect.succeed(site)
    }
  }
}

/** Successful result of `sleep`, remembering the wait value that was used. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard: true only for real `OkData` instances. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Converts the final character of the site into a number. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the "ok" log line for the given site. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}

/**
 * Stand-in for the target API: an Effect wrapping a timer-based promise.
 *
 * With `waitSeconds = wait / 1000`:
 * - a multiple of 3 rejects immediately with `SleepNGBeforeError`,
 * - otherwise a multiple of 2 rejects with `SleepNGAfterError` after `wait` ms,
 * - otherwise it resolves with `OkData` after `wait` ms.
 *
 * The pending timer is cleared when the signal supplied by `Effect.tryPromise`
 * is aborted, so an interrupted run does not leave timers behind.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    (signal) =>
      new Promise((resolve, reject) => {
        const waitSeconds = wait / 1000
        if (waitSeconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }

        const timer = setTimeout(() => {
          if (waitSeconds % 2 === 0) {
            reject(new SleepNGAfterError(wait))
          } else {
            resolve(new OkData(wait))
          }
        }, wait)
        // Nobody is waiting for the result once the signal fires, so drop the timer.
        signal.addEventListener('abort', () => clearTimeout(timer), { once: true })
      })
  )

/** Effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    return console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options handed to each `test` call; `timeout` evaluates to 100000 (presumably milliseconds — confirm against vitest). */
const testOptions = {
  timeout: 100 * 1000
}

/** Scales an index by 1000 to obtain the wait value passed to `sleep`. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Unwraps the error channel of a sleep effect.
 *
 * When the `UnknownException` wraps a `SleepNGBeforeError` or
 * `SleepNGAfterError`, that inner error becomes the failure; any other
 * exception is logged and passed through unchanged.
 *
 * @param site not used by the mapping itself
 * @param effectAndThen effect whose error channel is remapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (exception) => {
    const inner = exception.error
    if (SleepNGBeforeError.isClass(inner) || SleepNGAfterError.isClass(inner)) {
      return inner
    }

    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${exception}`))
    return exception
  })
}

/**
 * Builds the sleep effect for `wait` with its errors remapped by
 * `mapError_sleepNg`, and wraps the construction in `Effect.try`, so the
 * caller first obtains the inner effect and then runs it separately.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Creates the per-site worker for one array of sites.
 *
 * For a site it: short-circuits with `AlreadyAbortError` when the controller
 * has already aborted, registers an abort listener that logs, validates the
 * site (a failed validation aborts the controller), runs the sleep, logs the
 * outcome and finally removes the listener again.
 *
 * @param abortController controller shared by all workers of a run
 * @param topIndex optional outer index forwarded to the site validation
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
  Effect.gen(function* () {
    const { signal } = abortController

    if (signal.aborted) {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
      return Either.left(new AlreadyAbortError())
    }

    const onAbort = () => {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
    }
    const detach = () => {
      signal.removeEventListener('abort', onAbort)
    }
    signal.addEventListener('abort', onAbort)

    // The aborting site detaches its own listener first so it is not logged as "aborted".
    const validate = OriginAbortError.siteCheck(() => {
      detach()
      abortController.abort()
    }, topIndex)

    const checkedSite = yield* validate(site)
    const waitMs = getWaitNumber(OkData.getLastIndex(checkedSite))
    const sleeping = yield* do_sleep1(site)(waitMs)
    const outcome = yield* Effect.either(sleeping)

    let message: string | undefined
    if (Either.isRight(outcome)) {
      message = outcome.right.getMessage(site)
    } else {
      const failure = outcome.left
      if (SleepNGBeforeError.isClass(failure) || SleepNGAfterError.isClass(failure)) {
        message = failure.getMessage(site)
      }
    }
    if (message !== undefined) {
      Effect.runPromise(checkLog(message))
    }

    detach()
    return outcome
  })

// Runs every site at once; the invalid site 'https://test/10' aborts the shared controller.
describe('effect-ts 非同期処理1つの配列', () => {
  test('使い方1ー全部同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5',
      'https://test/10'
    ]

    const abortController = new AbortController()

    Effect.runPromise(checkLog('[start]'))

    // Effect.all only describes the work; it is not a promise, so there is nothing to await here.
    const effect = Effect.all(sites.map(mapper1_gen(abortController)), {
      concurrency: sites.length
    })
    // The same signal that the workers abort also interrupts the running fibers.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })

    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (results) => results
      })
    )
  })
})

実行結果

time : 2025-02-26T13:38:02.447Z - text: [start]
time : 2025-02-26T13:38:02.458Z - text: https://test/10 - abort - lastIndex >= 10
time : 2025-02-26T13:38:02.459Z - text: https://test/1 - aborted - from event
time : 2025-02-26T13:38:02.459Z - text: https://test/2 - aborted - from event
time : 2025-02-26T13:38:02.459Z - text: https://test/3 - aborted - from event
time : 2025-02-26T13:38:02.459Z - text: https://test/4 - aborted - from event
time : 2025-02-26T13:38:02.459Z - text: https://test/5 - aborted - from event
中断しました: All fibers interrupted without errors.

実行結果(※中断の原因となる 'https://test/10' を sites から除外した場合)

time : 2025-02-26T13:38:57.402Z - text: [start]
time : 2025-02-26T13:38:57.415Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-02-26T13:38:58.429Z - text: https://test/1 - ok - 1000
time : 2025-02-26T13:38:59.423Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:39:01.427Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:39:02.415Z - text: https://test/5 - ok - 5000
[
  { _id: 'Either', _tag: 'Right', right: OkData { wait: 1000 } },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 2000 }
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGBeforeError { _tag: 'Sleep.Ng.Before' }
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 4000 }
  },
  { _id: 'Either', _tag: 'Right', right: OkData { wait: 5000 } }
]

最大2個まで同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Failure produced by `sleep` when it rejects immediately, before any waiting.
 *
 * Instances are recognised by their `_tag` string, so `isClass` also accepts
 * plain objects that carry the same tag.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for the given site. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Failure produced by `sleep` when it rejects after the wait has elapsed.
 * Carries the wait value that was used.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for the given site, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

/**
 * Marker returned for a site whose processing is skipped because the shared
 * abort signal had already fired.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === AlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line for the given site. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}

/**
 * Error describing the site whose validation failed and thereby triggered
 * the abort of the whole run.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName

  site: string
  reason: string
  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is OriginAbortError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === OriginAbortError.tagName
  }

  /** Builds the log line from the stored site and reason. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Creates a validator for a site URL.
   *
   * The returned function fails with an `OriginAbortError` (after logging it
   * and invoking `abortAction`) when
   * - the last character of the site is not numeric,
   * - the character before the last one is not `/` (reported as `lastIndex >= 10`), or
   * - `topIndex` is given (and non-zero) and `topIndex + lastIndex >= 10`.
   * Otherwise it succeeds with the unchanged site.
   *
   * @param abortAction callback invoked once when validation fails
   * @param topIndex optional index of the enclosing (outer) array entry
   */
  static siteCheck(abortAction: () => void, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      let originAbortError: OriginAbortError | undefined
      if (Number.isNaN(lastIndex)) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is NaN`)
      } else if (site[site.length - 2] !== '/') {
        originAbortError = new OriginAbortError(site, `lastIndex >= 10`)
      } else if (topIndex && topIndex + lastIndex >= 10) {
        originAbortError = new OriginAbortError(site, `topIndex + lastIndex >= 10`)
      }

      if (originAbortError !== undefined) {
        // Log first, then abort, so the origin line precedes the listeners' output.
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }

      return Effect.succeed(site)
    }
  }
}

/** Successful result of `sleep`, remembering the wait value that was used. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard: true only for real `OkData` instances. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Converts the final character of the site into a number. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the "ok" log line for the given site. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}

/**
 * Stand-in for the target API: an Effect wrapping a timer-based promise.
 *
 * With `waitSeconds = wait / 1000`:
 * - a multiple of 3 rejects immediately with `SleepNGBeforeError`,
 * - otherwise a multiple of 2 rejects with `SleepNGAfterError` after `wait` ms,
 * - otherwise it resolves with `OkData` after `wait` ms.
 *
 * The pending timer is cleared when the signal supplied by `Effect.tryPromise`
 * is aborted, so an interrupted run does not leave timers behind.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    (signal) =>
      new Promise((resolve, reject) => {
        const waitSeconds = wait / 1000
        if (waitSeconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }

        const timer = setTimeout(() => {
          if (waitSeconds % 2 === 0) {
            reject(new SleepNGAfterError(wait))
          } else {
            resolve(new OkData(wait))
          }
        }, wait)
        // Nobody is waiting for the result once the signal fires, so drop the timer.
        signal.addEventListener('abort', () => clearTimeout(timer), { once: true })
      })
  )

/** Effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    return console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options handed to each `test` call; `timeout` evaluates to 100000 (presumably milliseconds — confirm against vitest). */
const testOptions = {
  timeout: 100 * 1000
}

/** Scales an index by 1000 to obtain the wait value passed to `sleep`. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Unwraps the error channel of a sleep effect.
 *
 * When the `UnknownException` wraps a `SleepNGBeforeError` or
 * `SleepNGAfterError`, that inner error becomes the failure; any other
 * exception is logged and passed through unchanged.
 *
 * @param site not used by the mapping itself
 * @param effectAndThen effect whose error channel is remapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (exception) => {
    const inner = exception.error
    if (SleepNGBeforeError.isClass(inner) || SleepNGAfterError.isClass(inner)) {
      return inner
    }

    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${exception}`))
    return exception
  })
}

/**
 * Builds the sleep effect for `wait` with its errors remapped by
 * `mapError_sleepNg`, and wraps the construction in `Effect.try`, so the
 * caller first obtains the inner effect and then runs it separately.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Creates the per-site worker for one array of sites.
 *
 * For a site it: short-circuits with `AlreadyAbortError` when the controller
 * has already aborted, registers an abort listener that logs, validates the
 * site (a failed validation aborts the controller), runs the sleep, logs the
 * outcome and finally removes the listener again.
 *
 * @param abortController controller shared by all workers of a run
 * @param topIndex optional outer index forwarded to the site validation
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
  Effect.gen(function* () {
    const { signal } = abortController

    if (signal.aborted) {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
      return Either.left(new AlreadyAbortError())
    }

    const onAbort = () => {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
    }
    const detach = () => {
      signal.removeEventListener('abort', onAbort)
    }
    signal.addEventListener('abort', onAbort)

    // The aborting site detaches its own listener first so it is not logged as "aborted".
    const validate = OriginAbortError.siteCheck(() => {
      detach()
      abortController.abort()
    }, topIndex)

    const checkedSite = yield* validate(site)
    const waitMs = getWaitNumber(OkData.getLastIndex(checkedSite))
    const sleeping = yield* do_sleep1(site)(waitMs)
    const outcome = yield* Effect.either(sleeping)

    let message: string | undefined
    if (Either.isRight(outcome)) {
      message = outcome.right.getMessage(site)
    } else {
      const failure = outcome.left
      if (SleepNGBeforeError.isClass(failure) || SleepNGAfterError.isClass(failure)) {
        message = failure.getMessage(site)
      }
    }
    if (message !== undefined) {
      Effect.runPromise(checkLog(message))
    }

    detach()
    return outcome
  })

// Runs at most two sites at a time; the invalid site 'https://test/10' aborts the shared controller.
describe('effect-ts 非同期処理1つの配列', () => {
  test('使い方2ー最大2個まで同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5',
      'https://test/10'
    ]

    const abortController = new AbortController()

    Effect.runPromise(checkLog('[start]'))

    // Effect.all only describes the work; it is not a promise, so there is nothing to await here.
    const effect = Effect.all(sites.map(mapper1_gen(abortController)), {
      concurrency: 2
    })
    // The same signal that the workers abort also interrupts the running fibers.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })

    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (results) => results
      })
    )
  })
})

実行結果

time : 2025-02-26T13:40:15.927Z - text: [start]
time : 2025-02-26T13:40:16.946Z - text: https://test/1 - ok - 1000
time : 2025-02-26T13:40:16.949Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-02-26T13:40:17.955Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:40:20.951Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:40:20.952Z - text: https://test/10 - abort - lastIndex >= 10
time : 2025-02-26T13:40:20.953Z - text: https://test/5 - aborted - from event
中断しました: All fibers interrupted without errors.

実行結果(※中断の原因となる 'https://test/10' を sites から除外した場合)

time : 2025-02-26T13:41:08.954Z - text: [start]
time : 2025-02-26T13:41:09.967Z - text: https://test/1 - ok - 1000
time : 2025-02-26T13:41:09.972Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-02-26T13:41:10.970Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:41:13.985Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:41:15.985Z - text: https://test/5 - ok - 5000
[
  { _id: 'Either', _tag: 'Right', right: OkData { wait: 1000 } },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 2000 }
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGBeforeError { _tag: 'Sleep.Ng.Before' }
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 4000 }
  },
  { _id: 'Either', _tag: 'Right', right: OkData { wait: 5000 } }
]

配列が2つの場合(シンプル)

全部同時実行からの全部同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Failure produced by `sleep` when it rejects immediately, before any waiting.
 *
 * Instances are recognised by their `_tag` string, so `isClass` also accepts
 * plain objects that carry the same tag.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for the given site. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Failure produced by `sleep` when it rejects after the wait has elapsed.
 * Carries the wait value that was used.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for the given site, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

/** Every failure value a sleep effect can end with after its errors have been remapped. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Failure of the outer (top-level) sleep; wraps the underlying failure value
 * and formats it for the top-level log lines.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard based on this class's own `_tag`.
   *
   * Previously compared against `SleepNGAfterError.tagName`, so it never
   * recognised a `TopSleepNGError`. Safe for `null`/`undefined` input.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix for the wrapped failure: a fixed text for an
   * `UnknownException`, otherwise the wrapped error's own message built with
   * an empty site.
   */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('')
    return lastMessage
  }

  /** Log line keyed by the top index extracted from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result line keyed by `site` with its last two characters removed. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/**
 * Marker returned for a site whose processing is skipped because the shared
 * abort signal had already fired.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === AlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line for the given site. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}

/**
 * Marker returned for an outer array entry that is skipped because the
 * shared abort signal had already fired.
 */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is TopAlreadyAbortError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === TopAlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line keyed by the top index extracted from `site`. */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}

/**
 * Error describing the site whose validation failed and thereby triggered
 * the abort of the whole run.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName

  site: string
  reason: string
  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is OriginAbortError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === OriginAbortError.tagName
  }

  /** Builds the log line from the stored site and reason. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Creates a validator for a site URL.
   *
   * The returned function fails with an `OriginAbortError` (after logging it
   * and invoking `abortAction`) when
   * - the last character of the site is not numeric,
   * - the character before the last one is not `/` (reported as `lastIndex >= 10`), or
   * - `topIndex` is given (and non-zero) and `topIndex + lastIndex >= 10`.
   * Otherwise it succeeds with the unchanged site.
   *
   * @param abortAction callback invoked once when validation fails
   * @param topIndex optional index of the enclosing (outer) array entry
   */
  static siteCheck(abortAction: () => void, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      let originAbortError: OriginAbortError | undefined
      if (Number.isNaN(lastIndex)) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is NaN`)
      } else if (site[site.length - 2] !== '/') {
        originAbortError = new OriginAbortError(site, `lastIndex >= 10`)
      } else if (topIndex && topIndex + lastIndex >= 10) {
        originAbortError = new OriginAbortError(site, `topIndex + lastIndex >= 10`)
      }

      if (originAbortError !== undefined) {
        // Log first, then abort, so the origin line precedes the listeners' output.
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }

      return Effect.succeed(site)
    }
  }
}

/**
 * Successful result of the outer (top-level) sleep: the site it belongs to,
 * the wait value used, and the top index parsed from the site.
 */
class TopOkData {
  site: string
  wait: number
  index: number

  constructor(site: string, okData: OkData) {
    this.site = site
    this.wait = okData.wait
    this.index = TopOkData.getTopIndex(site)
  }

  /** Type guard: true only for real `TopOkData` instances. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Converts the third-from-last character of the site into a number. */
  static getTopIndex(site: string) {
    const topChar = site.charAt(site.length - 3)
    return Number(topChar)
  }

  /** Log line keyed by the top index extracted from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /** Result line: the site without its last two characters, plus own wait and `opWait` summed. */
  getTopMessage(opWait: number) {
    const prefix = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${prefix} - ok - ${total}`
  }
}

/** Successful result of `sleep`, remembering the wait value that was used. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard: true only for real `OkData` instances. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Converts the final character of the site into a number. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the "ok" log line for the given site. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}

/**
 * Stand-in for the target API: an Effect wrapping a timer-based promise.
 *
 * With `waitSeconds = wait / 1000`:
 * - a multiple of 3 rejects immediately with `SleepNGBeforeError`,
 * - otherwise a multiple of 2 rejects with `SleepNGAfterError` after `wait` ms,
 * - otherwise it resolves with `OkData` after `wait` ms.
 *
 * The pending timer is cleared when the signal supplied by `Effect.tryPromise`
 * is aborted, so an interrupted run does not leave timers behind.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    (signal) =>
      new Promise((resolve, reject) => {
        const waitSeconds = wait / 1000
        if (waitSeconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }

        const timer = setTimeout(() => {
          if (waitSeconds % 2 === 0) {
            reject(new SleepNGAfterError(wait))
          } else {
            resolve(new OkData(wait))
          }
        }, wait)
        // Nobody is waiting for the result once the signal fires, so drop the timer.
        signal.addEventListener('abort', () => clearTimeout(timer), { once: true })
      })
  )

/** Effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    return console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options handed to each `test` call; `timeout` evaluates to 100000 (presumably milliseconds — confirm against vitest). */
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Reducer factory that folds chunks of wait values into at most `chunkSize`
 * running totals.
 *
 * While the accumulator `a` has a length other than `chunkSize`, each number
 * of `b` is appended; once the length equals `chunkSize`, each number is added
 * to the currently smallest total. The accumulator is mutated in place and
 * returned.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  b.forEach((value) => {
    if (a.length !== chunkSize) {
      a.push(value)
      return
    }

    // Every slot is taken: add the value to the slot holding the smallest total.
    const smallest = Math.min(...a)
    a[a.indexOf(smallest)] += value
  })

  return a
}

/** Scales an index by 1000 to obtain the wait value passed to `sleep`. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Unwraps the error channel of a sleep effect.
 *
 * When the `UnknownException` wraps a `SleepNGBeforeError` or
 * `SleepNGAfterError`, that inner error becomes the failure; any other
 * exception is logged and passed through unchanged.
 *
 * @param site not used by the mapping itself
 * @param effectAndThen effect whose error channel is remapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (exception) => {
    const inner = exception.error
    if (SleepNGBeforeError.isClass(inner) || SleepNGAfterError.isClass(inner)) {
      return inner
    }

    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${exception}`))
    return exception
  })
}

/**
 * Reduces one worker result to the wait value it accounts for.
 *
 * Left side: an `OkData` or `SleepNGAfterError` contributes its `wait`;
 * a `SleepNGBeforeError` and anything else contribute 0.
 * Right side: an `OkData` contributes its `wait`; anything else 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const failure = data.left
    if (OkData.isClass(failure)) {
      return failure.wait
    }
    if (SleepNGBeforeError.isClass(failure)) {
      return 0
    }
    return SleepNGAfterError.isClass(failure) ? failure.wait : 0
  }

  const value = data.right
  return OkData.isClass(value) ? value.wait : 0
}

/**
 * Builds the sleep effect for `wait` with its errors remapped by
 * `mapError_sleepNg`, and wraps the construction in `Effect.try`, so the
 * caller first obtains the inner effect and then runs it separately.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Top-level variant of `do_sleep1`: the successful `OkData` is converted to a
 * `TopOkData` for `site` before the errors are remapped. The construction is
 * wrapped in `Effect.try`, so the caller first obtains the inner effect and
 * then runs it separately.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() =>
    mapError_sleepNg(
      site,
      Effect.andThen(sleep(wait), (okData) => new TopOkData(site, okData))
    )
  )

/**
 * Creates the per-site worker for one array of sites.
 *
 * For a site it: short-circuits with `AlreadyAbortError` when the controller
 * has already aborted, registers an abort listener that logs, validates the
 * site (a failed validation aborts the controller), runs the sleep, logs the
 * outcome and finally removes the listener again.
 *
 * @param abortController controller shared by all workers of a run
 * @param topIndex optional outer index forwarded to the site validation
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
  Effect.gen(function* () {
    const { signal } = abortController

    if (signal.aborted) {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
      return Either.left(new AlreadyAbortError())
    }

    const onAbort = () => {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
    }
    const detach = () => {
      signal.removeEventListener('abort', onAbort)
    }
    signal.addEventListener('abort', onAbort)

    // The aborting site detaches its own listener first so it is not logged as "aborted".
    const validate = OriginAbortError.siteCheck(() => {
      detach()
      abortController.abort()
    }, topIndex)

    const checkedSite = yield* validate(site)
    const waitMs = getWaitNumber(OkData.getLastIndex(checkedSite))
    const sleeping = yield* do_sleep1(site)(waitMs)
    const outcome = yield* Effect.either(sleeping)

    let message: string | undefined
    if (Either.isRight(outcome)) {
      message = outcome.right.getMessage(site)
    } else {
      const failure = outcome.left
      if (SleepNGBeforeError.isClass(failure) || SleepNGAfterError.isClass(failure)) {
        message = failure.getMessage(site)
      }
    }
    if (message !== undefined) {
      Effect.runPromise(checkLog(message))
    }

    detach()
    return outcome
  })

/**
 * Creates the worker for one inner array of sites (one outer array entry).
 *
 * It first runs a top-level sleep derived from the first site's top index.
 * If that fails, the result is a Left holding a message string; if it
 * succeeds, every site of the inner array is processed with `mapper1_gen`
 * using `options`, and the result is a Right holding a summary message.
 *
 * @param abortController controller shared by all workers of a run
 * @param options concurrency used for the inner `Effect.all` and for chunking the wait values
 */
const mapper2_gen =
  (abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
    Effect.gen(function* () {
      // Only the first site is used to derive the top index and the log prefix.
      const site = sites[0]

      // Skip all work when another worker has already aborted the run.
      if (abortController.signal.aborted) {
        const topAlreadyAbortError = new TopAlreadyAbortError()
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
        return Either.left(topAlreadyAbortError)
      }

      // Log when an abort arrives while this worker is in flight.
      const abortEvent = () => {
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
      }
      abortController.signal.addEventListener('abort', abortEvent)
      function removeEvent() {
        abortController.signal.removeEventListener('abort', abortEvent)
      }

      // do_sleep2 yields the (not yet run) sleep effect; Effect.either then runs it
      // and captures success or failure as a value.
      const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
      const either2 = yield* Effect.either(effect2)

      // Left becomes a plain message string; Right becomes an Effect that is only
      // executed further below.
      const map = Either.mapBoth(either2, {
        onLeft: (ngData) => {
          const topNgData = new TopSleepNGError(ngData)

          const message = topNgData.getMessage(site)
          Effect.runPromise(checkLog(message))

          const topMessage = topNgData.getTopMessage(site)
          return topMessage
        },
        onRight: (topOkData) =>
          Effect.gen(function* () {
            const message = TopOkData.getMessage(site)
            Effect.runPromise(checkLog(message))

            // Process every site of the inner array with the given concurrency.
            const either1 = yield* Effect.all(
              sites.map(mapper1_gen(abortController, topOkData.index)),
              options
            )
            // Turn each result into its wait value, then fold the values into
            // running totals (at most `concurrency` of them) via sumNumber.
            const arrayEither1 = either1.map(mapNumber_fromData)
            const concurrency = options['concurrency']
            const chunkArray = Array.chunksOf(arrayEither1, concurrency)
            const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))

            // The largest running total is added to the top-level wait in the message.
            const topMessage = topOkData.getTopMessage(Math.max(...waitNumber))
            return topMessage
          })
      })

      // Run the deferred inner effect only in the success case.
      const result = Either.isLeft(map) ? map : Either.right(yield* map.right)

      // NOTE(review): this cleanup appears to be skipped when a yield* above fails
      // or the fiber is interrupted — confirm whether that is acceptable.
      removeEvent()
      return result
    })

// Runs all outer entries at once and, inside each, all sites at once.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方1ー全部同時実行からの全部同時実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      // [
      //   'https://test/5/1',
      //   'https://test/5/2',
      //   'https://test/5/3',
      //   'https://test/5/4',
      //   'https://test/5/5'
      // ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    const abortController = new AbortController()

    Effect.runPromise(checkLog('[start]'))

    // Effect.all only describes the work; it is not a promise, so there is nothing to await here.
    const effect = Effect.all(
      sites.map(mapper2_gen(abortController, { concurrency: Number.MAX_VALUE })),
      {
        concurrency: Number.MAX_VALUE
      }
    )
    // The same signal that the workers abort also interrupts the running fibers.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })

    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (results) => results
      })
    )
  })
})

実行結果

time : 2025-02-26T13:42:47.593Z - text: [start]
time : 2025-02-26T13:42:54.625Z - text: mapper2-index = 7
time : 2025-02-26T13:42:54.638Z - text: https://test/7/3 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T13:42:54.643Z - text: mapper2-index = 7 - aborted - from event
time : 2025-02-26T13:42:54.645Z - text: https://test/7/1 - aborted - from event
time : 2025-02-26T13:42:54.645Z - text: https://test/7/2 - aborted - from event
time : 2025-02-26T13:42:54.651Z - text: https://test/7/4 - aborted - from event
time : 2025-02-26T13:42:54.653Z - text: https://test/7/5 - aborted - from event
time : 2025-02-26T13:42:54.655Z - text: https://test/7/6 - aborted - from event
time : 2025-02-26T13:42:54.655Z - text: https://test/7/7 - aborted - from event
中断しました: All fibers interrupted without errors.

実行結果(※中断の原因となるデータ('https://test/7/3' 以降)を sites から除外した場合)

time : 2025-02-26T13:44:50.401Z - text: [start]
time : 2025-02-26T13:44:57.427Z - text: mapper2-index = 7
time : 2025-02-26T13:44:58.451Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:44:59.450Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
[
  { _id: 'Either', _tag: 'Right', right: 'https://test/7 - ok - 9000' }
]

全部同時実行からの直列実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Failure value that reports "Sleep NG before wait" for a site.
 *
 * Instances are recognised by their `_tag` string, so `isClass` also accepts
 * plain objects that carry the same tag.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for the given site. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Failure value that reports "Sleep NG after wait" for a site and carries
 * the wait value that was used.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for the given site, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

/** Union of the failure values that `TopSleepNGError` can wrap. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Top-level failure that wraps an underlying failure value and formats it
 * for the top-level log lines.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard based on this class's own `_tag`.
   *
   * Previously compared against `SleepNGAfterError.tagName`, so it never
   * recognised a `TopSleepNGError`. Safe for `null`/`undefined` input.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix for the wrapped failure: a fixed text for an
   * `UnknownException`, otherwise the wrapped error's own message built with
   * an empty site.
   */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('')
    return lastMessage
  }

  /** Log line keyed by the top index extracted from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result line keyed by `site` with its last two characters removed. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/** Tagged marker for a site that is reported as "aborted - from event". */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === AlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line for the given site. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}

/** Tagged marker for an outer array entry that is reported as "aborted - from event". */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   *
   * Safe for any input: `null`, `undefined` and primitives yield `false`
   * instead of throwing on the property access.
   */
  static isClass(test: unknown): test is TopAlreadyAbortError {
    return (test as { _tag?: unknown } | null | undefined)?._tag === TopAlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line keyed by the top index extracted from `site`. */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}

/**
 * Failure produced by the task that decides to abort everything.
 * Records the offending site and a human-readable reason.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName

  site: string
  reason: string
  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard based on the `_tag` discriminator.
   * Safe for any input: `null`, `undefined` and primitives yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is OriginAbortError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === OriginAbortError.tagName
  }

  /** Log line describing where and why the abort originated. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Builds a validator for a site string.
   *
   * The returned function succeeds with the site unchanged, or logs the reason,
   * invokes `abortAction`, and fails with an `OriginAbortError` when:
   * - the last character of the site is not numeric,
   * - the second-to-last character is not '/', or
   * - `topIndex` is given and `topIndex + lastIndex >= 10`.
   *
   * @param abortAction called (with no arguments) right before the failure is returned
   * @param topIndex optional index of the enclosing top-level task
   */
  static siteCheck(abortAction: () => void, topIndex?: number) {
    return (site: string) => {
      // `Number(...)` never yields `undefined`, so only NaN needs to be checked here.
      const lastIndex = OkData.getLastIndex(site)
      let originAbortError: OriginAbortError | undefined
      if (Number.isNaN(lastIndex)) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is NaN`)
      } else if (site[site.length - 2] !== '/') {
        originAbortError = new OriginAbortError(site, `lastIndex >= 10`)
      } else if (topIndex !== undefined && topIndex + lastIndex >= 10) {
        originAbortError = new OriginAbortError(site, `topIndex + lastIndex >= 10`)
      }

      if (originAbortError !== undefined) {
        // Fire-and-forget log, then trigger the abort before reporting the failure.
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }

      return Effect.succeed(site)
    }
  }
}

/**
 * Successful result of a top-level sleep: the site it ran for, how long it
 * waited, and the top-level index parsed from the site.
 */
class TopOkData {
  site: string
  wait: number
  index: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
    this.index = TopOkData.getTopIndex(s)
  }

  /** True when `test` is an instance of this class. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Parses the third character from the end of `site` as a number (may be NaN). */
  static getTopIndex(site: string) {
    const digit = site.charAt(site.length - 3)
    return Number(digit)
  }

  /** Log line keyed by the top-level index parsed from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /** Result line: site without its last two characters, plus own wait and `opWait`. */
  getTopMessage(opWait: number) {
    const topSite = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${topSite} - ok - ${total}`
  }
}

/** Successful result of a sleep; records how many milliseconds were waited. */
class OkData {
  constructor(public wait: number) {}

  /** True when `test` is an instance of this class. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Parses the final character of `site` as a number (may be NaN; '' gives 0). */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Log line for a successful wait at the given site. */
  getMessage(site: string) {
    return `${site} - ok - ${this.wait}`
  }
}

/**
 * Simulated API call.
 *
 * With `s = wait / 1000`:
 * - `s % 3 === 0`: rejects immediately with `SleepNGBeforeError`
 * - `s % 2 === 0`: rejects with `SleepNGAfterError` after `wait` ms
 * - otherwise: resolves with `OkData` after `wait` ms
 *
 * Rejections surface as `UnknownException` (wrapped by `Effect.tryPromise`).
 * The pending timer is cleared when the AbortSignal supplied by `Effect.tryPromise`
 * fires, so an interrupted sleep does not leave a timer running.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    (signal) =>
      new Promise((resolve, reject) => {
        const waitSeconds = wait / 1000
        if (waitSeconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }

        const timer = setTimeout(() => {
          if (waitSeconds % 2 === 0) {
            reject(new SleepNGAfterError(wait))
          } else {
            resolve(new OkData(wait))
          }
        }, wait)

        // Cancel the timer if the effect is interrupted before it fires.
        signal.addEventListener('abort', () => clearTimeout(timer), { once: true })
      })
  )

/** Effect that, when run, prints `text` to the console prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options handed to each `test(...)` call: a 100-second timeout, in milliseconds. */
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Reducer factory that spreads the numbers in `b` over at most `chunkSize` slots of `a`.
 *
 * While `a` has a length other than `chunkSize`, each number is appended as a new slot;
 * once the length equals `chunkSize`, each number is added to the first slot holding
 * the current minimum. `a` is mutated in place and returned.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  b.forEach((value) => {
    if (a.length !== chunkSize) {
      a.push(value)
      return
    }

    // All slots in use: pile the value onto the least-loaded slot.
    const smallest = Math.min(...a)
    const slot = a.indexOf(smallest)
    a[slot] += value
  })

  return a
}

/** Converts an index into a wait time in milliseconds (1000 ms per index unit). */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Unwraps the `UnknownException` of a sleep effect back into the domain error it carries.
 *
 * If the wrapped `error` is a `SleepNGBeforeError` or `SleepNGAfterError`, that error
 * becomes the failure; anything else is logged and the `UnknownException` is kept.
 *
 * @param site not used by this function
 * @param effectAndThen effect whose failure channel should be mapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const wrapped = err.error
    if (SleepNGBeforeError.isClass(wrapped)) {
      return wrapped
    }
    if (SleepNGAfterError.isClass(wrapped)) {
      return wrapped
    }

    // Not one of ours: report it (fire-and-forget) and pass the exception through.
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}

/**
 * Reduces a task result to a number taken from its payload's `wait` field.
 *
 * Left side: `OkData` or `SleepNGAfterError` give their `wait`;
 * `SleepNGBeforeError` and anything else give 0.
 * Right side: `OkData` gives its `wait`; anything else gives 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const failure = data.left
    if (OkData.isClass(failure)) {
      return failure.wait
    }
    if (SleepNGBeforeError.isClass(failure)) {
      return 0
    }
    return SleepNGAfterError.isClass(failure) ? failure.wait : 0
  }

  const success = data.right
  return OkData.isClass(success) ? success.wait : 0
}

/**
 * Builds the per-site sleep effect for `wait` ms with its errors mapped by
 * `mapError_sleepNg`. The construction itself is wrapped in `Effect.try`, so the
 * result is an effect that yields the sleep effect.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Builds the top-level sleep effect for `wait` ms: on success the `OkData` is
 * converted to a `TopOkData` for `site`, and errors are mapped by `mapError_sleepNg`.
 * The construction itself is wrapped in `Effect.try`, so the result is an effect
 * that yields the sleep effect.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = (okData: OkData) => new TopOkData(site, okData)
    const sleepAsTop = Effect.andThen(sleep(wait), toTopOkData)
    return mapError_sleepNg(site, sleepAsTop)
  })

/**
 * Per-site task factory.
 *
 * For one `site` the generated effect:
 * 1. returns `Left(AlreadyAbortError)` immediately if the shared controller is already aborted;
 * 2. validates the site with `OriginAbortError.siteCheck` (which may abort the controller and fail);
 * 3. sleeps for `lastIndex * 1000` ms and returns the outcome as an `Either`, logging it.
 *
 * @param abortController controller shared by every task of the run
 * @param topIndex optional index of the enclosing top-level task, forwarded to `siteCheck`
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
  Effect.gen(function* () {
    // Someone else already aborted: log and report without sleeping.
    if (abortController.signal.aborted) {
      const alreadyAbortError = new AlreadyAbortError()
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
      return Either.left(alreadyAbortError)
    }

    // While this task is in flight, log if an abort is signalled.
    const abortEvent = () => {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
    }
    abortController.signal.addEventListener('abort', abortEvent)
    function removeEvent() {
      abortController.signal.removeEventListener('abort', abortEvent)
    }
    // Used when this task itself triggers the abort: its own listener is removed
    // first so it does not also log "aborted - from event" for itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }

    // A failing siteCheck ends the generator here with an OriginAbortError.
    const abortCheck = yield* pipe(site, OriginAbortError.siteCheck(abortAction, topIndex))
    // do_sleep1 yields the sleep effect; it is run (as an Either) on the next line.
    const effect = yield* pipe(abortCheck, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)

    if (Either.isRight(either)) {
      const okData = either.right

      const message = okData.getMessage(site)
      Effect.runPromise(checkLog(message))
    } else if (Either.isLeft(either)) {
      const ngData = either.left

      // Only the two domain errors are logged here; other failures are not logged in this function.
      if (SleepNGBeforeError.isClass(ngData)) {
        const message = ngData.getMessage(site)
        Effect.runPromise(checkLog(message))
      } else if (SleepNGAfterError.isClass(ngData)) {
        const message = ngData.getMessage(site)
        Effect.runPromise(checkLog(message))
      }
    }

    // Finished normally: stop listening for aborts.
    removeEvent()
    return either
  })

/**
 * Top-level task factory.
 *
 * For one group of `sites` the generated effect:
 * 1. returns `Left(TopAlreadyAbortError)` immediately if the shared controller is already aborted;
 * 2. sleeps for `topIndex * 1000` ms, where `topIndex` is parsed from the first site;
 * 3. on failure returns `Left(<message>)`; on success runs `mapper1_gen` for every site
 *    with the given `options` and returns `Right(<message>)` containing the combined wait.
 *
 * NOTE(review): `sites[0]` is used without a length check; assumes every group is non-empty.
 *
 * @param abortController controller shared by every task of the run
 * @param options concurrency used for the per-site tasks of this group
 */
const mapper2_gen =
  (abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
    Effect.gen(function* () {
      const site = sites[0]

      // Someone else already aborted: log and report without sleeping.
      if (abortController.signal.aborted) {
        const topAlreadyAbortError = new TopAlreadyAbortError()
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
        return Either.left(topAlreadyAbortError)
      }

      // While this group is in flight, log if an abort is signalled.
      const abortEvent = () => {
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
      }
      abortController.signal.addEventListener('abort', abortEvent)
      function removeEvent() {
        abortController.signal.removeEventListener('abort', abortEvent)
      }

      // do_sleep2 yields the top-level sleep effect; it is run (as an Either) on the next line.
      const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
      const either2 = yield* Effect.either(effect2)

      // Left becomes a plain message string; Right becomes an Effect that is run further below.
      const map = Either.mapBoth(either2, {
        onLeft: (ngData) => {
          const topNgData = new TopSleepNGError(ngData)

          const message = topNgData.getMessage(site)
          Effect.runPromise(checkLog(message))

          const topMessage = topNgData.getTopMessage(site)
          return topMessage
        },
        onRight: (topOkData) =>
          Effect.gen(function* () {
            const message = TopOkData.getMessage(site)
            Effect.runPromise(checkLog(message))

            // Run every site of the group with the requested concurrency.
            const either1 = yield* Effect.all(
              sites.map(mapper1_gen(abortController, topOkData.index)),
              options
            )
            // Distribute the per-site waits over `concurrency` slots via sumNumber.
            const arrayEither1 = either1.map(mapNumber_fromData)
            const concurrency = options['concurrency']
            const chunkArray = Array.chunksOf(arrayEither1, concurrency)
            const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))

            // Reported wait = top-level wait + the largest slot total.
            const topMessage = topOkData.getTopMessage(Math.max(...waitNumber))
            return topMessage
          })
      })

      // Only the Right side holds an Effect that still has to be executed.
      const result = Either.isLeft(map) ? map : Either.right(yield* map.right)

      removeEvent()
      return result
    })

describe('effect-ts 非同期処理2つの配列', () => {
  // Groups start all at once (outer concurrency: Number.MAX_VALUE);
  // the sites inside a group run one at a time (inner concurrency: 1).
  test('使い方3ー全部同時実行からの直列実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      // [
      //   'https://test/5/1',
      //   'https://test/5/2',
      //   'https://test/5/3',
      //   'https://test/5/4',
      //   'https://test/5/5'
      // ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    const abortController = new AbortController()

    Effect.runPromise(checkLog('[start]'))

    // Effect.all only describes the work and returns an Effect, not a Promise,
    // so there is nothing to await here; execution starts in runPromiseExit below.
    const effect = Effect.all(sites.map(mapper2_gen(abortController, { concurrency: 1 })), {
      concurrency: Number.MAX_VALUE
    })
    // The controller's signal is handed to the runtime so that an abort interrupts the run.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })

    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (either) => either
      })
    )
  })
})

実行結果

time : 2025-02-26T13:47:01.774Z - text: [start]
time : 2025-02-26T13:47:08.793Z - text: mapper2-index = 7
time : 2025-02-26T13:47:09.810Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:47:11.834Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:47:11.836Z - text: https://test/7/3 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T13:47:11.841Z - text: mapper2-index = 7 - aborted - from event
中断しました: All fibers interrupted without errors.

実行結果(※中断データを除外した場合)

time : 2025-02-26T13:48:58.173Z - text: [start]
time : 2025-02-26T13:49:05.191Z - text: mapper2-index = 7
time : 2025-02-26T13:49:06.195Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:49:08.202Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
[
  {
    _id: 'Either',
    _tag: 'Right',
    right: 'https://test/7 - ok - 10000'
  }
]

全部同時実行からの最大2個まで同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Failure raised by `sleep` when the wait is rejected immediately, before any timer starts.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   * Safe for any input: `null`, `undefined` and primitives yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for this failure at the given site. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Failure raised by `sleep` when the wait runs to completion and is then rejected.
 * Carries the number of milliseconds that were waited.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` discriminator.
   * Safe for any input: `null`, `undefined` and primitives yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for this failure at the given site. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

/** Every failure the `sleep` pipeline can surface after `mapError_sleepNg`. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the top-level sleep so it can be reported with
 * "mapper2" style messages.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard based on the `_tag` discriminator.
   * Compares against this class's own tag (the previous version compared against
   * `SleepNGAfterError.tagName`, so it never matched a `TopSleepNGError`).
   * Safe for `null`, `undefined` and primitives.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === TopSleepNGError.tagName
  }

  /** Message suffix describing the wrapped failure (always starts with " - ng"). */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('') // empty site: only the " - ng - ..." suffix is wanted
    return lastMessage
  }

  /** Log line keyed by the top-level index parsed from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result line keyed by `site` with its last two characters removed. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/**
 * Marker returned by a per-site task that finds the shared AbortController
 * already aborted when it starts.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   * Safe for any input: `null`, `undefined` and primitives yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === AlreadyAbortError.tagName
  }

  /** Log line for a site that was aborted. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}

/**
 * Marker returned by a top-level task that finds the shared AbortController
 * already aborted when it starts.
 */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   * Safe for any input: `null`, `undefined` and primitives yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is TopAlreadyAbortError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === TopAlreadyAbortError.tagName
  }

  /** Log line keyed by the top-level index parsed from `site`. */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}

/**
 * Failure produced by the task that decides to abort everything.
 * Records the offending site and a human-readable reason.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName

  site: string
  reason: string
  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard based on the `_tag` discriminator.
   * Safe for any input: `null`, `undefined` and primitives yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is OriginAbortError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === OriginAbortError.tagName
  }

  /** Log line describing where and why the abort originated. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Builds a validator for a site string.
   *
   * The returned function succeeds with the site unchanged, or logs the reason,
   * invokes `abortAction`, and fails with an `OriginAbortError` when:
   * - the last character of the site is not numeric,
   * - the second-to-last character is not '/', or
   * - `topIndex` is given and `topIndex + lastIndex >= 10`.
   *
   * @param abortAction called (with no arguments) right before the failure is returned
   * @param topIndex optional index of the enclosing top-level task
   */
  static siteCheck(abortAction: () => void, topIndex?: number) {
    return (site: string) => {
      // `Number(...)` never yields `undefined`, so only NaN needs to be checked here.
      const lastIndex = OkData.getLastIndex(site)
      let originAbortError: OriginAbortError | undefined
      if (Number.isNaN(lastIndex)) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is NaN`)
      } else if (site[site.length - 2] !== '/') {
        originAbortError = new OriginAbortError(site, `lastIndex >= 10`)
      } else if (topIndex !== undefined && topIndex + lastIndex >= 10) {
        originAbortError = new OriginAbortError(site, `topIndex + lastIndex >= 10`)
      }

      if (originAbortError !== undefined) {
        // Fire-and-forget log, then trigger the abort before reporting the failure.
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }

      return Effect.succeed(site)
    }
  }
}

/**
 * Successful result of a top-level sleep: the site it ran for, how long it
 * waited, and the top-level index parsed from the site.
 */
class TopOkData {
  site: string
  wait: number
  index: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
    this.index = TopOkData.getTopIndex(s)
  }

  /** True when `test` is an instance of this class. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Parses the third character from the end of `site` as a number (may be NaN). */
  static getTopIndex(site: string) {
    const digit = site.charAt(site.length - 3)
    return Number(digit)
  }

  /** Log line keyed by the top-level index parsed from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /** Result line: site without its last two characters, plus own wait and `opWait`. */
  getTopMessage(opWait: number) {
    const topSite = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${topSite} - ok - ${total}`
  }
}

/** Successful result of a sleep; records how many milliseconds were waited. */
class OkData {
  constructor(public wait: number) {}

  /** True when `test` is an instance of this class. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Parses the final character of `site` as a number (may be NaN; '' gives 0). */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Log line for a successful wait at the given site. */
  getMessage(site: string) {
    return `${site} - ok - ${this.wait}`
  }
}

/**
 * Simulated API call.
 *
 * With `s = wait / 1000`:
 * - `s % 3 === 0`: rejects immediately with `SleepNGBeforeError`
 * - `s % 2 === 0`: rejects with `SleepNGAfterError` after `wait` ms
 * - otherwise: resolves with `OkData` after `wait` ms
 *
 * Rejections surface as `UnknownException` (wrapped by `Effect.tryPromise`).
 * The pending timer is cleared when the AbortSignal supplied by `Effect.tryPromise`
 * fires, so an interrupted sleep does not leave a timer running.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    (signal) =>
      new Promise((resolve, reject) => {
        const waitSeconds = wait / 1000
        if (waitSeconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }

        const timer = setTimeout(() => {
          if (waitSeconds % 2 === 0) {
            reject(new SleepNGAfterError(wait))
          } else {
            resolve(new OkData(wait))
          }
        }, wait)

        // Cancel the timer if the effect is interrupted before it fires.
        signal.addEventListener('abort', () => clearTimeout(timer), { once: true })
      })
  )

/** Effect that, when run, prints `text` to the console prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options handed to each `test(...)` call: a 100-second timeout, in milliseconds. */
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Reducer factory that spreads the numbers in `b` over at most `chunkSize` slots of `a`.
 *
 * While `a` has a length other than `chunkSize`, each number is appended as a new slot;
 * once the length equals `chunkSize`, each number is added to the first slot holding
 * the current minimum. `a` is mutated in place and returned.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  b.forEach((value) => {
    if (a.length !== chunkSize) {
      a.push(value)
      return
    }

    // All slots in use: pile the value onto the least-loaded slot.
    const smallest = Math.min(...a)
    const slot = a.indexOf(smallest)
    a[slot] += value
  })

  return a
}

/** Converts an index into a wait time in milliseconds (1000 ms per index unit). */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Unwraps the `UnknownException` of a sleep effect back into the domain error it carries.
 *
 * If the wrapped `error` is a `SleepNGBeforeError` or `SleepNGAfterError`, that error
 * becomes the failure; anything else is logged and the `UnknownException` is kept.
 *
 * @param site not used by this function
 * @param effectAndThen effect whose failure channel should be mapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const wrapped = err.error
    if (SleepNGBeforeError.isClass(wrapped)) {
      return wrapped
    }
    if (SleepNGAfterError.isClass(wrapped)) {
      return wrapped
    }

    // Not one of ours: report it (fire-and-forget) and pass the exception through.
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}

/**
 * Reduces a task result to a number taken from its payload's `wait` field.
 *
 * Left side: `OkData` or `SleepNGAfterError` give their `wait`;
 * `SleepNGBeforeError` and anything else give 0.
 * Right side: `OkData` gives its `wait`; anything else gives 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const failure = data.left
    if (OkData.isClass(failure)) {
      return failure.wait
    }
    if (SleepNGBeforeError.isClass(failure)) {
      return 0
    }
    return SleepNGAfterError.isClass(failure) ? failure.wait : 0
  }

  const success = data.right
  return OkData.isClass(success) ? success.wait : 0
}

/**
 * Builds the per-site sleep effect for `wait` ms with its errors mapped by
 * `mapError_sleepNg`. The construction itself is wrapped in `Effect.try`, so the
 * result is an effect that yields the sleep effect.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Builds the top-level sleep effect for `wait` ms: on success the `OkData` is
 * converted to a `TopOkData` for `site`, and errors are mapped by `mapError_sleepNg`.
 * The construction itself is wrapped in `Effect.try`, so the result is an effect
 * that yields the sleep effect.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = (okData: OkData) => new TopOkData(site, okData)
    const sleepAsTop = Effect.andThen(sleep(wait), toTopOkData)
    return mapError_sleepNg(site, sleepAsTop)
  })

/**
 * Per-site task factory.
 *
 * For one `site` the generated effect:
 * 1. returns `Left(AlreadyAbortError)` immediately if the shared controller is already aborted;
 * 2. validates the site with `OriginAbortError.siteCheck` (which may abort the controller and fail);
 * 3. sleeps for `lastIndex * 1000` ms and returns the outcome as an `Either`, logging it.
 *
 * @param abortController controller shared by every task of the run
 * @param topIndex optional index of the enclosing top-level task, forwarded to `siteCheck`
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
  Effect.gen(function* () {
    // Someone else already aborted: log and report without sleeping.
    if (abortController.signal.aborted) {
      const alreadyAbortError = new AlreadyAbortError()
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
      return Either.left(alreadyAbortError)
    }

    // While this task is in flight, log if an abort is signalled.
    const abortEvent = () => {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
    }
    abortController.signal.addEventListener('abort', abortEvent)
    function removeEvent() {
      abortController.signal.removeEventListener('abort', abortEvent)
    }
    // Used when this task itself triggers the abort: its own listener is removed
    // first so it does not also log "aborted - from event" for itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }

    // A failing siteCheck ends the generator here with an OriginAbortError.
    const abortCheck = yield* pipe(site, OriginAbortError.siteCheck(abortAction, topIndex))
    // do_sleep1 yields the sleep effect; it is run (as an Either) on the next line.
    const effect = yield* pipe(abortCheck, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)

    if (Either.isRight(either)) {
      const okData = either.right

      const message = okData.getMessage(site)
      Effect.runPromise(checkLog(message))
    } else if (Either.isLeft(either)) {
      const ngData = either.left

      // Only the two domain errors are logged here; other failures are not logged in this function.
      if (SleepNGBeforeError.isClass(ngData)) {
        const message = ngData.getMessage(site)
        Effect.runPromise(checkLog(message))
      } else if (SleepNGAfterError.isClass(ngData)) {
        const message = ngData.getMessage(site)
        Effect.runPromise(checkLog(message))
      }
    }

    // Finished normally: stop listening for aborts.
    removeEvent()
    return either
  })

/**
 * Top-level task factory.
 *
 * For one group of `sites` the generated effect:
 * 1. returns `Left(TopAlreadyAbortError)` immediately if the shared controller is already aborted;
 * 2. sleeps for `topIndex * 1000` ms, where `topIndex` is parsed from the first site;
 * 3. on failure returns `Left(<message>)`; on success runs `mapper1_gen` for every site
 *    with the given `options` and returns `Right(<message>)` containing the combined wait.
 *
 * NOTE(review): `sites[0]` is used without a length check; assumes every group is non-empty.
 *
 * @param abortController controller shared by every task of the run
 * @param options concurrency used for the per-site tasks of this group
 */
const mapper2_gen =
  (abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
    Effect.gen(function* () {
      const site = sites[0]

      // Someone else already aborted: log and report without sleeping.
      if (abortController.signal.aborted) {
        const topAlreadyAbortError = new TopAlreadyAbortError()
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
        return Either.left(topAlreadyAbortError)
      }

      // While this group is in flight, log if an abort is signalled.
      const abortEvent = () => {
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
      }
      abortController.signal.addEventListener('abort', abortEvent)
      function removeEvent() {
        abortController.signal.removeEventListener('abort', abortEvent)
      }

      // do_sleep2 yields the top-level sleep effect; it is run (as an Either) on the next line.
      const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
      const either2 = yield* Effect.either(effect2)

      // Left becomes a plain message string; Right becomes an Effect that is run further below.
      const map = Either.mapBoth(either2, {
        onLeft: (ngData) => {
          const topNgData = new TopSleepNGError(ngData)

          const message = topNgData.getMessage(site)
          Effect.runPromise(checkLog(message))

          const topMessage = topNgData.getTopMessage(site)
          return topMessage
        },
        onRight: (topOkData) =>
          Effect.gen(function* () {
            const message = TopOkData.getMessage(site)
            Effect.runPromise(checkLog(message))

            // Run every site of the group with the requested concurrency.
            const either1 = yield* Effect.all(
              sites.map(mapper1_gen(abortController, topOkData.index)),
              options
            )
            // Distribute the per-site waits over `concurrency` slots via sumNumber.
            const arrayEither1 = either1.map(mapNumber_fromData)
            const concurrency = options['concurrency']
            const chunkArray = Array.chunksOf(arrayEither1, concurrency)
            const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))

            // Reported wait = top-level wait + the largest slot total.
            const topMessage = topOkData.getTopMessage(Math.max(...waitNumber))
            return topMessage
          })
      })

      // Only the Right side holds an Effect that still has to be executed.
      const result = Either.isLeft(map) ? map : Either.right(yield* map.right)

      removeEvent()
      return result
    })

describe('effect-ts 非同期処理2つの配列', () => {
  // Groups start all at once (outer concurrency: Number.MAX_VALUE);
  // the sites inside a group run at most two at a time (inner concurrency: 2).
  test('使い方5ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      // [
      //   'https://test/5/1',
      //   'https://test/5/2',
      //   'https://test/5/3',
      //   'https://test/5/4',
      //   'https://test/5/5'
      // ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    const abortController = new AbortController()

    Effect.runPromise(checkLog('[start]'))

    // Effect.all only describes the work and returns an Effect, not a Promise,
    // so there is nothing to await here; execution starts in runPromiseExit below.
    const effect = Effect.all(sites.map(mapper2_gen(abortController, { concurrency: 2 })), {
      concurrency: Number.MAX_VALUE
    })
    // The controller's signal is handed to the runtime so that an abort interrupts the run.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })

    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (either) => either
      })
    )
  })
})

実行結果

time : 2025-02-26T13:51:20.945Z - text: [start]
time : 2025-02-26T13:51:27.969Z - text: mapper2-index = 7
time : 2025-02-26T13:51:28.986Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:51:28.991Z - text: https://test/7/3 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T13:51:28.996Z - text: mapper2-index = 7 - aborted - from event
time : 2025-02-26T13:51:28.997Z - text: https://test/7/2 - aborted - from event
中断しました: All fibers interrupted without errors.

実行結果(※中断データを除外した場合)

time : 2025-02-26T13:53:40.471Z - text: [start]
time : 2025-02-26T13:53:47.493Z - text: mapper2-index = 7
time : 2025-02-26T13:53:48.509Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:53:49.511Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
[
  { _id: 'Either', _tag: 'Right', right: 'https://test/7 - ok - 9000' }
]

配列が2つの場合(複雑)

全部同時実行からの全部同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Failure raised by `sleep` when the wait is rejected immediately, before any timer starts.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   * Safe for any input: `null`, `undefined` and primitives yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for this failure at the given site. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Failure raised by `sleep` when the wait runs to completion and is then rejected.
 * Carries the number of milliseconds that were waited.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` discriminator.
   * Safe for any input: `null`, `undefined` and primitives yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for this failure at the given site. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

/** Every failure the `sleep` pipeline can surface after `mapError_sleepNg`. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the top-level sleep so it can be reported with
 * "mapper2" style messages.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard based on the `_tag` discriminator.
   * Compares against this class's own tag (the previous version compared against
   * `SleepNGAfterError.tagName`, so it never matched a `TopSleepNGError`).
   * Safe for `null`, `undefined` and primitives.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === TopSleepNGError.tagName
  }

  /** Message suffix describing the wrapped failure (always starts with " - ng"). */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('') // empty site: only the " - ng - ..." suffix is wanted
    return lastMessage
  }

  /** Log line keyed by the top-level index parsed from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result line keyed by `site` with its last two characters removed. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/**
 * Marker returned by a per-site task that finds the shared AbortController
 * already aborted when it starts.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   * Safe for any input: `null`, `undefined` and primitives yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === AlreadyAbortError.tagName
  }

  /** Log line for a site that was aborted. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}

/**
 * Marker returned by a top-level task that finds the shared AbortController
 * already aborted when it starts.
 */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` discriminator.
   * Safe for any input: `null`, `undefined` and primitives yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is TopAlreadyAbortError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === TopAlreadyAbortError.tagName
  }

  /** Log line keyed by the top-level index parsed from `site`. */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}

/**
 * Failure produced by the task that decides to abort everything.
 * Records the offending site and a human-readable reason.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName

  site: string
  reason: string
  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard based on the `_tag` discriminator.
   * Safe for any input: `null`, `undefined` and primitives yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is OriginAbortError {
    const tag = (test as { _tag?: unknown } | null | undefined)?._tag
    return tag === OriginAbortError.tagName
  }

  /** Log line describing where and why the abort originated. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Builds a validator for a site string.
   *
   * The returned function succeeds with the site unchanged, or logs the reason,
   * invokes `abortAction`, and fails with an `OriginAbortError` when:
   * - the last character of the site is not numeric,
   * - the second-to-last character is not '/', or
   * - `topIndex` is given and `topIndex + lastIndex >= 10`.
   *
   * @param abortAction called (with no arguments) right before the failure is returned
   * @param topIndex optional index of the enclosing top-level task
   */
  static siteCheck(abortAction: () => void, topIndex?: number) {
    return (site: string) => {
      // `Number(...)` never yields `undefined`, so only NaN needs to be checked here.
      const lastIndex = OkData.getLastIndex(site)
      let originAbortError: OriginAbortError | undefined
      if (Number.isNaN(lastIndex)) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is NaN`)
      } else if (site[site.length - 2] !== '/') {
        originAbortError = new OriginAbortError(site, `lastIndex >= 10`)
      } else if (topIndex !== undefined && topIndex + lastIndex >= 10) {
        originAbortError = new OriginAbortError(site, `topIndex + lastIndex >= 10`)
      }

      if (originAbortError !== undefined) {
        // Fire-and-forget log, then trigger the abort before reporting the failure.
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }

      return Effect.succeed(site)
    }
  }
}

/**
 * Successful result of a top-level sleep: the site it ran for, how long it
 * waited, and the top-level index parsed from the site.
 */
class TopOkData {
  site: string
  wait: number
  index: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
    this.index = TopOkData.getTopIndex(s)
  }

  /** True when `test` is an instance of this class. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Parses the third character from the end of `site` as a number (may be NaN). */
  static getTopIndex(site: string) {
    const digit = site.charAt(site.length - 3)
    return Number(digit)
  }

  /** Log line keyed by the top-level index parsed from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /** Result line: site without its last two characters, plus own wait and `opWait`. */
  getTopMessage(opWait: number) {
    const topSite = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${topSite} - ok - ${total}`
  }
}

/** Successful result of a sleep; records how many milliseconds were waited. */
class OkData {
  constructor(public wait: number) {}

  /** True when `test` is an instance of this class. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Parses the final character of `site` as a number (may be NaN; '' gives 0). */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Log line for a successful wait at the given site. */
  getMessage(site: string) {
    return `${site} - ok - ${this.wait}`
  }
}

/**
 * Simulated API call.
 *
 * With `s = wait / 1000`:
 * - `s % 3 === 0`: rejects immediately with `SleepNGBeforeError`
 * - `s % 2 === 0`: rejects with `SleepNGAfterError` after `wait` ms
 * - otherwise: resolves with `OkData` after `wait` ms
 *
 * Rejections surface as `UnknownException` (wrapped by `Effect.tryPromise`).
 * The pending timer is cleared when the AbortSignal supplied by `Effect.tryPromise`
 * fires, so an interrupted sleep does not leave a timer running.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    (signal) =>
      new Promise((resolve, reject) => {
        const waitSeconds = wait / 1000
        if (waitSeconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }

        const timer = setTimeout(() => {
          if (waitSeconds % 2 === 0) {
            reject(new SleepNGAfterError(wait))
          } else {
            resolve(new OkData(wait))
          }
        }, wait)

        // Cancel the timer if the effect is interrupted before it fires.
        signal.addEventListener('abort', () => clearTimeout(timer), { once: true })
      })
  )

/** Effect that, when run, prints `text` to the console prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options handed to each `test(...)` call: a 100-second timeout, in milliseconds. */
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Reducer factory that spreads the numbers in `b` over at most `chunkSize` slots of `a`.
 *
 * While `a` has a length other than `chunkSize`, each number is appended as a new slot;
 * once the length equals `chunkSize`, each number is added to the first slot holding
 * the current minimum. `a` is mutated in place and returned.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  b.forEach((value) => {
    if (a.length !== chunkSize) {
      a.push(value)
      return
    }

    // All slots in use: pile the value onto the least-loaded slot.
    const smallest = Math.min(...a)
    const slot = a.indexOf(smallest)
    a[slot] += value
  })

  return a
}

/** Converts an index into a wait time in milliseconds (1000 ms per index unit). */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Unwraps the `UnknownException` of a sleep effect back into the domain error it carries.
 *
 * If the wrapped `error` is a `SleepNGBeforeError` or `SleepNGAfterError`, that error
 * becomes the failure; anything else is logged and the `UnknownException` is kept.
 *
 * @param site not used by this function
 * @param effectAndThen effect whose failure channel should be mapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const wrapped = err.error
    if (SleepNGBeforeError.isClass(wrapped)) {
      return wrapped
    }
    if (SleepNGAfterError.isClass(wrapped)) {
      return wrapped
    }

    // Not one of ours: report it (fire-and-forget) and pass the exception through.
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}

/**
 * Reduces a task result to a number taken from its payload's `wait` field.
 *
 * Left side: `OkData` or `SleepNGAfterError` give their `wait`;
 * `SleepNGBeforeError` and anything else give 0.
 * Right side: `OkData` gives its `wait`; anything else gives 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const failure = data.left
    if (OkData.isClass(failure)) {
      return failure.wait
    }
    if (SleepNGBeforeError.isClass(failure)) {
      return 0
    }
    return SleepNGAfterError.isClass(failure) ? failure.wait : 0
  }

  const success = data.right
  return OkData.isClass(success) ? success.wait : 0
}

/**
 * Builds the per-site sleep effect for `wait` ms with its errors mapped by
 * `mapError_sleepNg`. The construction itself is wrapped in `Effect.try`, so the
 * result is an effect that yields the sleep effect.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Builds the top-level sleep effect for `wait` ms: on success the `OkData` is
 * converted to a `TopOkData` for `site`, and errors are mapped by `mapError_sleepNg`.
 * The construction itself is wrapped in `Effect.try`, so the result is an effect
 * that yields the sleep effect.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = (okData: OkData) => new TopOkData(site, okData)
    const sleepAsTop = Effect.andThen(sleep(wait), toTopOkData)
    return mapError_sleepNg(site, sleepAsTop)
  })

/**
 * Builds the worker that processes a single site of a group.
 *
 * The returned Effect:
 * 1. returns `Either.left(AlreadyAbortError)` straight away when the shared controller is already aborted,
 * 2. registers an `abort` listener that only logs,
 * 3. validates the site with `OriginAbortError.siteCheck`, which may abort the controller,
 * 4. feeds the site's last index through `getWaitNumber` into `do_sleep1`, and
 * 5. logs the outcome and returns it as an `Either`.
 *
 * @param abortController controller shared by all workers
 * @param topIndex index of the enclosing group, forwarded to `OriginAbortError.siteCheck`
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
  Effect.gen(function* () {
    // Another worker has already aborted: report it and do no work.
    if (abortController.signal.aborted) {
      const alreadyAbortError = new AlreadyAbortError()
      // The log is started but not awaited.
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
      return Either.left(alreadyAbortError)
    }

    // Logs when the controller is aborted while this worker is still registered.
    const abortEvent = () => {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
    }
    abortController.signal.addEventListener('abort', abortEvent)
    function removeEvent() {
      abortController.signal.removeEventListener('abort', abortEvent)
    }
    // Used by the site check: this worker unregisters its own listener before aborting,
    // so it does not log "aborted - from event" for the abort it caused itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }

    // A rejected site makes `siteCheck` return a failing effect, which ends this generator here.
    const abortCheck = yield* pipe(site, OriginAbortError.siteCheck(abortAction, topIndex))
    // `do_sleep1` wraps the sleep effect in `Effect.try`, so this first `yield*` only unwraps it...
    const effect = yield* pipe(abortCheck, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    // ...and this one actually runs the sleep, capturing success or failure as an Either.
    const either = yield* Effect.either(effect)

    if (Either.isRight(either)) {
      const okData = either.right

      const message = okData.getMessage(site)
      Effect.runPromise(checkLog(message))
    } else if (Either.isLeft(either)) {
      const ngData = either.left

      // Only the two known sleep errors are logged here; other failures were logged by `mapError_sleepNg`.
      if (SleepNGBeforeError.isClass(ngData)) {
        const message = ngData.getMessage(site)
        Effect.runPromise(checkLog(message))
      } else if (SleepNGAfterError.isClass(ngData)) {
        const message = ngData.getMessage(site)
        Effect.runPromise(checkLog(message))
      }
    }

    // Only reached when the generator runs to completion.
    removeEvent()
    return either
  })

/**
 * Builds the worker that processes one group of sites.
 *
 * The returned Effect first sleeps according to the group's top index (read from the
 * first site). Only when that succeeds does it run `mapper1_gen` over every site of the
 * group, using `options` for `Effect.all`.
 *
 * On normal completion the result is an `Either` of messages: `Left` for a failed
 * top-level sleep, `Right` for a successful group. When the controller is already
 * aborted on entry, `Either.left(TopAlreadyAbortError)` is returned instead.
 *
 * @param abortController controller shared with every `mapper1_gen` worker
 * @param options passed to `Effect.all` for the inner sites; `concurrency` is also used
 *   when combining the inner wait values
 */
const mapper2_gen =
  (abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
    Effect.gen(function* () {
      // The first site represents the group: it supplies the top index and the text for the messages.
      const site = sites[0]

      // Another worker has already aborted: report it and do no work.
      if (abortController.signal.aborted) {
        const topAlreadyAbortError = new TopAlreadyAbortError()
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
        return Either.left(topAlreadyAbortError)
      }

      // Logs when the controller is aborted while this group is still registered.
      const abortEvent = () => {
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
      }
      abortController.signal.addEventListener('abort', abortEvent)
      function removeEvent() {
        abortController.signal.removeEventListener('abort', abortEvent)
      }

      // `do_sleep2` wraps the sleep effect in `Effect.try`: the first `yield*` unwraps it,
      // the second runs it and captures the outcome as an Either.
      const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
      const either2 = yield* Effect.either(effect2)

      const map = Either.mapBoth(either2, {
        // Top-level sleep failed: log it and keep only the group message.
        onLeft: (ngData) => {
          const topNgData = new TopSleepNGError(ngData)

          const message = topNgData.getMessage(site)
          Effect.runPromise(checkLog(message))

          const topMessage = topNgData.getTopMessage(site)
          return topMessage
        },
        // Top-level sleep succeeded: describe (but do not run yet) the work for the inner sites.
        onRight: (topOkData) =>
          Effect.gen(function* () {
            const message = TopOkData.getMessage(site)
            Effect.runPromise(checkLog(message))

            const either1 = yield* Effect.all(
              sites.map(mapper1_gen(abortController, topOkData.index)),
              options
            )
            // Turn every inner result into a wait value, then fold the values into at most
            // `concurrency` running totals; the largest total is reported with the group.
            const arrayEither1 = either1.map(mapNumber_fromData)
            const concurrency = options['concurrency']
            const chunkArray = Array.chunksOf(arrayEither1, concurrency)
            const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))

            const topMessage = topOkData.getTopMessage(Math.max(...waitNumber))
            return topMessage
          })
      })

      // On the right side `map.right` is the Effect built above, so it is run here with `yield*`
      // and its message re-wrapped in `Either.right`.
      const result = Either.isLeft(map) ? map : Either.right(yield* map.right)

      // Only reached when the generator runs to completion.
      removeEvent()
      return result
    })

describe('effect-ts 非同期処理2つの配列', () => {
  // All groups are started at once, and the sites inside each group are also started at once.
  test('使い方2ー全部同時実行からの全部同時実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    // One controller is shared by every worker so that any site can abort the whole run.
    const abortController = new AbortController()

    Effect.runPromise(checkLog('[start]'))

    // `Effect.all` only describes the work and returns an Effect, not a Promise, so it is
    // not awaited; nothing runs until `runPromiseExit` below.
    const effect = Effect.all(
      sites.map(mapper2_gen(abortController, { concurrency: Number.MAX_VALUE })),
      {
        concurrency: Number.MAX_VALUE
      }
    )
    // The controller's signal is handed to the runtime so that an abort interrupts the run.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })

    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        // On success the value is the array of per-group Either results.
        onSuccess: (either) => either
      })
    )
  })
})

実行結果

time : 2025-02-26T13:54:41.810Z - text: [start]
time : 2025-02-26T13:54:41.825Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T13:54:42.827Z - text: mapper2-index = 1
time : 2025-02-26T13:54:43.829Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:54:43.846Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T13:54:45.837Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:54:46.826Z - text: mapper2-index = 5
time : 2025-02-26T13:54:46.835Z - text: https://test/5/5 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T13:54:46.839Z - text: mapper2-index = 5 - aborted - from event
time : 2025-02-26T13:54:46.841Z - text: mapper2-index = 7 - aborted - from event
time : 2025-02-26T13:54:46.842Z - text: https://test/5/1 - aborted - from event
time : 2025-02-26T13:54:46.843Z - text: https://test/5/2 - aborted - from event
time : 2025-02-26T13:54:46.844Z - text: https://test/5/3 - aborted - from event
time : 2025-02-26T13:54:46.844Z - text: https://test/5/4 - aborted - from event
中断しました: All fibers interrupted without errors.

実行結果(※中断データを除外した場合)

time : 2025-02-26T13:56:17.360Z - text: [start]
time : 2025-02-26T13:56:17.375Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T13:56:18.380Z - text: mapper2-index = 1
time : 2025-02-26T13:56:19.379Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:56:19.396Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T13:56:21.381Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:56:22.379Z - text: mapper2-index = 5
time : 2025-02-26T13:56:22.383Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-26T13:56:23.386Z - text: https://test/5/1 - ok - 1000
time : 2025-02-26T13:56:24.381Z - text: mapper2-index = 7
time : 2025-02-26T13:56:24.388Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:56:25.398Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:56:26.397Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:56:26.404Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
[
  { _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/3 - ng - Sleep NG before wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
  },
  { _id: 'Either', _tag: 'Right', right: 'https://test/5 - ok - 9000' },
  { _id: 'Either', _tag: 'Right', right: 'https://test/7 - ok - 9000' }
]

全部同時実行からの直列実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Failure that `sleep` rejects with immediately, before any waiting takes place.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` value.
   *
   * Safe for any input: `null` and `undefined` (which can arrive as the payload of an
   * `UnknownException`) yield `false` instead of throwing.
   */
  static isClass(test: any): test is SleepNGBeforeError {
    return test?._tag === SleepNGBeforeError.tagName
  }

  /** Log/report message for the given site. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Failure that `sleep` rejects with once the wait has elapsed.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  /** The wait that was passed to `sleep` for the failed call. */
  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` value.
   *
   * Safe for any input: `null` and `undefined` (which can arrive as the payload of an
   * `UnknownException`) yield `false` instead of throwing.
   */
  static isClass(test: any): test is SleepNGAfterError {
    return test?._tag === SleepNGAfterError.tagName
  }

  /** Log/report message for the given site, including the wait. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

// The failures a sleep effect can carry once `mapError_sleepNg` has unwrapped the known errors.
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps the failure of a group's top-level sleep so that it can be reported per group.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  /** The underlying failure of the top-level sleep. */
  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard based on this class's own `_tag` value.
   *
   * Previously this compared against `SleepNGAfterError.tagName` (and narrowed to
   * `SleepNGAfterError`), so it never recognised a `TopSleepNGError`.
   * `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: any): test is TopSleepNGError {
    return test?._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix describing the wrapped failure. For the known sleep errors this is
   * their own message built with an empty site, so it starts with " - ng - ".
   */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('')
    return lastMessage
  }

  /** Log message keyed by the group's top index. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result message for the group: the site without its last two characters, plus the suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/**
 * Marker returned by a site worker that finds the shared controller already aborted.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` value; `null` and `undefined` yield `false`
   * instead of throwing.
   */
  static isClass(test: any): test is AlreadyAbortError {
    return test?._tag === AlreadyAbortError.tagName
  }

  /** Log message for a site that was stopped by an abort. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}

/**
 * Marker returned by a group worker that finds the shared controller already aborted.
 */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` value; `null` and `undefined` yield `false`
   * instead of throwing.
   */
  static isClass(test: any): test is TopAlreadyAbortError {
    return test?._tag === TopAlreadyAbortError.tagName
  }

  /** Log message for a group that was stopped by an abort, keyed by the group's top index. */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}

/**
 * Failure produced by the site that causes the whole run to be aborted.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName

  /** The site that was rejected. */
  site: string
  /** Why the site was rejected. */
  reason: string
  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard based on the `_tag` value; `null` and `undefined` yield `false`
   * instead of throwing.
   */
  static isClass(test: any): test is OriginAbortError {
    return test?._tag === OriginAbortError.tagName
  }

  /** Log message naming the site and the reason. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Builds the validation step run for every site before it sleeps.
   *
   * When the site is rejected, the reason is logged, `abortAction` is invoked and a
   * failing effect carrying the `OriginAbortError` is returned; otherwise the site is
   * passed through in a succeeding effect.
   *
   * @param abortAction called once when a site is rejected
   * @param topIndex index of the enclosing group; the combined-index rule is skipped
   *   when it is missing or 0
   */
  static siteCheck(abortAction: Function, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      let originAbortError = undefined
      if (Number.isNaN(lastIndex)) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is NaN`)
      } else if (lastIndex === undefined) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is undefined`)
      } else if (site[site.length - 2] !== '/') {
        // The trailing index is only one character when the character before it is '/'.
        originAbortError = new OriginAbortError(site, `lastIndex >= 10`)
      } else if (topIndex && topIndex + lastIndex >= 10) {
        originAbortError = new OriginAbortError(site, `topIndex + lastIndex >= 10`)
      }

      if (originAbortError !== undefined) {
        // The log is started but not awaited.
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }

      return Effect.succeed(site)
    }
  }
}

/**
 * Successful result of a group's top-level sleep: the site it was created for, the
 * wait of the underlying `OkData`, and the group's top index.
 */
class TopOkData {
  /** True when `test` is an instance of this class. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Reads the third character from the end of the site as a number. */
  static getTopIndex(site: string) {
    return Number(site.slice(-3, -2))
  }

  /** Log message keyed by the group's top index. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  site: string
  wait: number
  index: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
    this.index = TopOkData.getTopIndex(s)
  }

  /**
   * Result message for the group: the site without its last two characters, followed
   * by the own wait plus `opWait`.
   */
  getTopMessage(opWait: number) {
    const groupUrl = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${groupUrl} - ok - ${total}`
  }
}

/**
 * Successful result of `sleep`, carrying the wait that was requested.
 */
class OkData {
  /** True when `test` is an instance of this class. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the last character of the site as a number. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /** Log message for the given site, including the wait. */
  getMessage(site: string) {
    return `${site} - ok - ${this.wait}`
  }
}

/**
 * Stand-in for the API under test, driven by `wait / 1000`:
 * - a multiple of 3 rejects at once with `SleepNGBeforeError`,
 * - otherwise a multiple of 2 rejects with `SleepNGAfterError` after `wait` ms,
 * - anything else resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    () =>
      new Promise((resolve, reject) => {
        const seconds = wait / 1000

        if (seconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }

        // The outcome is decided now, but its value is only created when the timer fires.
        const settle =
          seconds % 2 === 0
            ? () => reject(new SleepNGAfterError(wait))
            : () => resolve(new OkData(wait))
        setTimeout(settle, wait)
      })
  )

/** Effect that prints `text` prefixed with the ISO timestamp taken when the effect runs. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const stamp = new Date().toISOString()
    console.log(`time : ${stamp} - text: ${text}`)
  })

// Options passed as the second argument of `test`; the timeout is 100 * 1000
// (presumably milliseconds, per vitest's `timeout` option).
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Creates a reducer that folds wait values into at most `chunkSize` running totals.
 *
 * While the accumulator `a` has not reached `chunkSize` entries, each value of `b`
 * is appended as a new entry; once it has, the value is added to the entry that
 * currently holds the smallest total.
 *
 * The accumulator is mutated in place and the same array is returned.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  b.forEach((value) => {
    if (a.length !== chunkSize) {
      // Still room for another entry.
      a.push(value)
      return
    }

    // Every entry is taken: pile the value onto the least loaded one.
    const smallestAt = a.indexOf(Math.min(...a))
    a[smallestAt] += value
  })

  return a
}

/** Converts a site index into the wait handed to `sleep`: 1000 per index step. */
function getWaitNumber(index: number) {
  return 1000 * index
}

/**
 * Replaces the `UnknownException` wrapper with the wrapped sleep error when it is one
 * of the known ones (`SleepNGBeforeError` / `SleepNGAfterError`).
 *
 * Any other failure is logged (fire-and-forget) and passed through unchanged.
 *
 * @param site not used by this function
 * @param effectAndThen the effect whose error channel should be unwrapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (exception) => {
    const wrapped = exception.error
    if (SleepNGBeforeError.isClass(wrapped) || SleepNGAfterError.isClass(wrapped)) {
      return wrapped
    }

    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${exception}`))
    return exception
  })
}

/**
 * Reduces one worker result to the wait it accounts for.
 *
 * - `OkData` on either side contributes its `wait`.
 * - A left `SleepNGAfterError` contributes its `wait`.
 * - Everything else (including a left `SleepNGBeforeError`) contributes 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  return Either.match(data, {
    onLeft: (ng) => {
      if (OkData.isClass(ng)) return ng.wait
      if (SleepNGBeforeError.isClass(ng)) return 0
      return SleepNGAfterError.isClass(ng) ? ng.wait : 0
    },
    onRight: (ok) => (OkData.isClass(ok) ? ok.wait : 0)
  })
}

/**
 * Builds the sleep effect for one site with its known errors unwrapped.
 *
 * The construction itself is wrapped in `Effect.try`, so the result is an effect that
 * yields the actual sleep effect; callers have to run both layers.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Group-level variant of the sleep: a successful `OkData` is turned into a `TopOkData`
 * for `site` before the known errors are unwrapped.
 *
 * As the construction is wrapped in `Effect.try`, the result is an effect that yields
 * the actual sleep effect.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = (okData: OkData) => new TopOkData(site, okData)
    return mapError_sleepNg(site, Effect.andThen(sleep(wait), toTopOkData))
  })

/**
 * Builds the worker that processes a single site of a group.
 *
 * The returned Effect:
 * 1. returns `Either.left(AlreadyAbortError)` straight away when the shared controller is already aborted,
 * 2. registers an `abort` listener that only logs,
 * 3. validates the site with `OriginAbortError.siteCheck`, which may abort the controller,
 * 4. feeds the site's last index through `getWaitNumber` into `do_sleep1`, and
 * 5. logs the outcome and returns it as an `Either`.
 *
 * @param abortController controller shared by all workers
 * @param topIndex index of the enclosing group, forwarded to `OriginAbortError.siteCheck`
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
  Effect.gen(function* () {
    // Another worker has already aborted: report it and do no work.
    if (abortController.signal.aborted) {
      const alreadyAbortError = new AlreadyAbortError()
      // The log is started but not awaited.
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
      return Either.left(alreadyAbortError)
    }

    // Logs when the controller is aborted while this worker is still registered.
    const abortEvent = () => {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
    }
    abortController.signal.addEventListener('abort', abortEvent)
    function removeEvent() {
      abortController.signal.removeEventListener('abort', abortEvent)
    }
    // Used by the site check: this worker unregisters its own listener before aborting,
    // so it does not log "aborted - from event" for the abort it caused itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }

    // A rejected site makes `siteCheck` return a failing effect, which ends this generator here.
    const abortCheck = yield* pipe(site, OriginAbortError.siteCheck(abortAction, topIndex))
    // `do_sleep1` wraps the sleep effect in `Effect.try`, so this first `yield*` only unwraps it...
    const effect = yield* pipe(abortCheck, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    // ...and this one actually runs the sleep, capturing success or failure as an Either.
    const either = yield* Effect.either(effect)

    if (Either.isRight(either)) {
      const okData = either.right

      const message = okData.getMessage(site)
      Effect.runPromise(checkLog(message))
    } else if (Either.isLeft(either)) {
      const ngData = either.left

      // Only the two known sleep errors are logged here; other failures were logged by `mapError_sleepNg`.
      if (SleepNGBeforeError.isClass(ngData)) {
        const message = ngData.getMessage(site)
        Effect.runPromise(checkLog(message))
      } else if (SleepNGAfterError.isClass(ngData)) {
        const message = ngData.getMessage(site)
        Effect.runPromise(checkLog(message))
      }
    }

    // Only reached when the generator runs to completion.
    removeEvent()
    return either
  })

/**
 * Builds the worker that processes one group of sites.
 *
 * The returned Effect first sleeps according to the group's top index (read from the
 * first site). Only when that succeeds does it run `mapper1_gen` over every site of the
 * group, using `options` for `Effect.all`.
 *
 * On normal completion the result is an `Either` of messages: `Left` for a failed
 * top-level sleep, `Right` for a successful group. When the controller is already
 * aborted on entry, `Either.left(TopAlreadyAbortError)` is returned instead.
 *
 * @param abortController controller shared with every `mapper1_gen` worker
 * @param options passed to `Effect.all` for the inner sites; `concurrency` is also used
 *   when combining the inner wait values
 */
const mapper2_gen =
  (abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
    Effect.gen(function* () {
      // The first site represents the group: it supplies the top index and the text for the messages.
      const site = sites[0]

      // Another worker has already aborted: report it and do no work.
      if (abortController.signal.aborted) {
        const topAlreadyAbortError = new TopAlreadyAbortError()
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
        return Either.left(topAlreadyAbortError)
      }

      // Logs when the controller is aborted while this group is still registered.
      const abortEvent = () => {
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
      }
      abortController.signal.addEventListener('abort', abortEvent)
      function removeEvent() {
        abortController.signal.removeEventListener('abort', abortEvent)
      }

      // `do_sleep2` wraps the sleep effect in `Effect.try`: the first `yield*` unwraps it,
      // the second runs it and captures the outcome as an Either.
      const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
      const either2 = yield* Effect.either(effect2)

      const map = Either.mapBoth(either2, {
        // Top-level sleep failed: log it and keep only the group message.
        onLeft: (ngData) => {
          const topNgData = new TopSleepNGError(ngData)

          const message = topNgData.getMessage(site)
          Effect.runPromise(checkLog(message))

          const topMessage = topNgData.getTopMessage(site)
          return topMessage
        },
        // Top-level sleep succeeded: describe (but do not run yet) the work for the inner sites.
        onRight: (topOkData) =>
          Effect.gen(function* () {
            const message = TopOkData.getMessage(site)
            Effect.runPromise(checkLog(message))

            const either1 = yield* Effect.all(
              sites.map(mapper1_gen(abortController, topOkData.index)),
              options
            )
            // Turn every inner result into a wait value, then fold the values into at most
            // `concurrency` running totals; the largest total is reported with the group.
            const arrayEither1 = either1.map(mapNumber_fromData)
            const concurrency = options['concurrency']
            const chunkArray = Array.chunksOf(arrayEither1, concurrency)
            const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))

            const topMessage = topOkData.getTopMessage(Math.max(...waitNumber))
            return topMessage
          })
      })

      // On the right side `map.right` is the Effect built above, so it is run here with `yield*`
      // and its message re-wrapped in `Either.right`.
      const result = Either.isLeft(map) ? map : Either.right(yield* map.right)

      // Only reached when the generator runs to completion.
      removeEvent()
      return result
    })

describe('effect-ts 非同期処理2つの配列', () => {
  // All groups are started at once, while the sites inside each group run one at a time.
  test('使い方4ー全部同時実行からの直列実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    // One controller is shared by every worker so that any site can abort the whole run.
    const abortController = new AbortController()

    Effect.runPromise(checkLog('[start]'))

    // `Effect.all` only describes the work and returns an Effect, not a Promise, so it is
    // not awaited; nothing runs until `runPromiseExit` below.
    const effect = Effect.all(sites.map(mapper2_gen(abortController, { concurrency: 1 })), {
      concurrency: Number.MAX_VALUE
    })
    // The controller's signal is handed to the runtime so that an abort interrupts the run.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })

    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        // On success the value is the array of per-group Either results.
        onSuccess: (either) => either
      })
    )
  })
})

実行結果

time : 2025-02-26T13:57:47.543Z - text: [start]
time : 2025-02-26T13:57:47.558Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T13:57:48.564Z - text: mapper2-index = 1
time : 2025-02-26T13:57:49.562Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:57:49.577Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T13:57:51.564Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:57:52.563Z - text: mapper2-index = 5
time : 2025-02-26T13:57:53.578Z - text: https://test/5/1 - ok - 1000
time : 2025-02-26T13:57:54.563Z - text: mapper2-index = 7
time : 2025-02-26T13:57:55.567Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:57:55.581Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:57:55.583Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-26T13:57:57.582Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:57:57.582Z - text: https://test/7/3 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T13:57:57.584Z - text: mapper2-index = 5 - aborted - from event
time : 2025-02-26T13:57:57.584Z - text: mapper2-index = 7 - aborted - from event
time : 2025-02-26T13:57:57.584Z - text: https://test/5/4 - aborted - from event
中断しました: All fibers interrupted without errors.

実行結果(※中断データを除外した場合)

time : 2025-02-26T13:59:52.548Z - text: [start]
time : 2025-02-26T13:59:52.563Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T13:59:53.572Z - text: mapper2-index = 1
time : 2025-02-26T13:59:54.562Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:59:54.582Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T13:59:56.572Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:59:57.577Z - text: mapper2-index = 5
time : 2025-02-26T13:59:58.596Z - text: https://test/5/1 - ok - 1000
time : 2025-02-26T13:59:59.563Z - text: mapper2-index = 7
time : 2025-02-26T14:00:00.580Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T14:00:00.602Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:00:00.604Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-26T14:00:02.594Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:00:04.609Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
[
  { _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/3 - ng - Sleep NG before wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
  },
  {
    _id: 'Either',
    _tag: 'Right',
    right: 'https://test/5 - ok - 12000'
  },
  {
    _id: 'Either',
    _tag: 'Right',
    right: 'https://test/7 - ok - 10000'
  }
]

全部同時実行からの最大2個まで同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Failure that `sleep` rejects with immediately, before any waiting takes place.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` value.
   *
   * Safe for any input: `null` and `undefined` (which can arrive as the payload of an
   * `UnknownException`) yield `false` instead of throwing.
   */
  static isClass(test: any): test is SleepNGBeforeError {
    return test?._tag === SleepNGBeforeError.tagName
  }

  /** Log/report message for the given site. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Failure that `sleep` rejects with once the wait has elapsed.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  /** The wait that was passed to `sleep` for the failed call. */
  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` value.
   *
   * Safe for any input: `null` and `undefined` (which can arrive as the payload of an
   * `UnknownException`) yield `false` instead of throwing.
   */
  static isClass(test: any): test is SleepNGAfterError {
    return test?._tag === SleepNGAfterError.tagName
  }

  /** Log/report message for the given site, including the wait. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

// The failures a sleep effect can carry once `mapError_sleepNg` has unwrapped the known errors.
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps the failure of a group's top-level sleep so that it can be reported per group.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  /** The underlying failure of the top-level sleep. */
  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard based on this class's own `_tag` value.
   *
   * Previously this compared against `SleepNGAfterError.tagName` (and narrowed to
   * `SleepNGAfterError`), so it never recognised a `TopSleepNGError`.
   * `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: any): test is TopSleepNGError {
    return test?._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix describing the wrapped failure. For the known sleep errors this is
   * their own message built with an empty site, so it starts with " - ng - ".
   */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('')
    return lastMessage
  }

  /** Log message keyed by the group's top index. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result message for the group: the site without its last two characters, plus the suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/**
 * Marker returned by a site worker that finds the shared controller already aborted.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` value; `null` and `undefined` yield `false`
   * instead of throwing.
   */
  static isClass(test: any): test is AlreadyAbortError {
    return test?._tag === AlreadyAbortError.tagName
  }

  /** Log message for a site that was stopped by an abort. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}

/**
 * Marker returned by a group worker that finds the shared controller already aborted.
 */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` value; `null` and `undefined` yield `false`
   * instead of throwing.
   */
  static isClass(test: any): test is TopAlreadyAbortError {
    return test?._tag === TopAlreadyAbortError.tagName
  }

  /** Log message for a group that was stopped by an abort, keyed by the group's top index. */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}

/**
 * Failure produced by the site that causes the whole run to be aborted.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName

  /** The site that was rejected. */
  site: string
  /** Why the site was rejected. */
  reason: string
  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard based on the `_tag` value; `null` and `undefined` yield `false`
   * instead of throwing.
   */
  static isClass(test: any): test is OriginAbortError {
    return test?._tag === OriginAbortError.tagName
  }

  /** Log message naming the site and the reason. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Builds the validation step run for every site before it sleeps.
   *
   * When the site is rejected, the reason is logged, `abortAction` is invoked and a
   * failing effect carrying the `OriginAbortError` is returned; otherwise the site is
   * passed through in a succeeding effect.
   *
   * @param abortAction called once when a site is rejected
   * @param topIndex index of the enclosing group; the combined-index rule is skipped
   *   when it is missing or 0
   */
  static siteCheck(abortAction: Function, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      let originAbortError = undefined
      if (Number.isNaN(lastIndex)) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is NaN`)
      } else if (lastIndex === undefined) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is undefined`)
      } else if (site[site.length - 2] !== '/') {
        // The trailing index is only one character when the character before it is '/'.
        originAbortError = new OriginAbortError(site, `lastIndex >= 10`)
      } else if (topIndex && topIndex + lastIndex >= 10) {
        originAbortError = new OriginAbortError(site, `topIndex + lastIndex >= 10`)
      }

      if (originAbortError !== undefined) {
        // The log is started but not awaited.
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }

      return Effect.succeed(site)
    }
  }
}

/**
 * Successful result of a group's top-level sleep: the site it was created for, the
 * wait of the underlying `OkData`, and the group's top index.
 */
class TopOkData {
  /** True when `test` is an instance of this class. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Reads the third character from the end of the site as a number. */
  static getTopIndex(site: string) {
    return Number(site.slice(-3, -2))
  }

  /** Log message keyed by the group's top index. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  site: string
  wait: number
  index: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
    this.index = TopOkData.getTopIndex(s)
  }

  /**
   * Result message for the group: the site without its last two characters, followed
   * by the own wait plus `opWait`.
   */
  getTopMessage(opWait: number) {
    const groupUrl = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${groupUrl} - ok - ${total}`
  }
}

/**
 * Successful result of `sleep`, carrying the wait that was requested.
 */
class OkData {
  /** True when `test` is an instance of this class. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the last character of the site as a number. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /** Log message for the given site, including the wait. */
  getMessage(site: string) {
    return `${site} - ok - ${this.wait}`
  }
}

/**
 * Stand-in for the API under test, driven by `wait / 1000`:
 * - a multiple of 3 rejects at once with `SleepNGBeforeError`,
 * - otherwise a multiple of 2 rejects with `SleepNGAfterError` after `wait` ms,
 * - anything else resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    () =>
      new Promise((resolve, reject) => {
        const seconds = wait / 1000

        if (seconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }

        // The outcome is decided now, but its value is only created when the timer fires.
        const settle =
          seconds % 2 === 0
            ? () => reject(new SleepNGAfterError(wait))
            : () => resolve(new OkData(wait))
        setTimeout(settle, wait)
      })
  )

/** Effect that prints `text` prefixed with the ISO timestamp taken when the effect runs. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const stamp = new Date().toISOString()
    console.log(`time : ${stamp} - text: ${text}`)
  })

// Options passed as the second argument of `test`; the timeout is 100 * 1000
// (presumably milliseconds, per vitest's `timeout` option).
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Creates a reducer that folds wait values into at most `chunkSize` running totals.
 *
 * While the accumulator `a` has not reached `chunkSize` entries, each value of `b`
 * is appended as a new entry; once it has, the value is added to the entry that
 * currently holds the smallest total.
 *
 * The accumulator is mutated in place and the same array is returned.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  b.forEach((value) => {
    if (a.length !== chunkSize) {
      // Still room for another entry.
      a.push(value)
      return
    }

    // Every entry is taken: pile the value onto the least loaded one.
    const smallestAt = a.indexOf(Math.min(...a))
    a[smallestAt] += value
  })

  return a
}

/** Converts a site index into the wait handed to `sleep`: 1000 per index step. */
function getWaitNumber(index: number) {
  return 1000 * index
}

/**
 * Replaces the `UnknownException` wrapper with the wrapped sleep error when it is one
 * of the known ones (`SleepNGBeforeError` / `SleepNGAfterError`).
 *
 * Any other failure is logged (fire-and-forget) and passed through unchanged.
 *
 * @param site not used by this function
 * @param effectAndThen the effect whose error channel should be unwrapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (exception) => {
    const wrapped = exception.error
    if (SleepNGBeforeError.isClass(wrapped) || SleepNGAfterError.isClass(wrapped)) {
      return wrapped
    }

    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${exception}`))
    return exception
  })
}

/**
 * Reduces one worker result to the wait it accounts for.
 *
 * - `OkData` on either side contributes its `wait`.
 * - A left `SleepNGAfterError` contributes its `wait`.
 * - Everything else (including a left `SleepNGBeforeError`) contributes 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  return Either.match(data, {
    onLeft: (ng) => {
      if (OkData.isClass(ng)) return ng.wait
      if (SleepNGBeforeError.isClass(ng)) return 0
      return SleepNGAfterError.isClass(ng) ? ng.wait : 0
    },
    onRight: (ok) => (OkData.isClass(ok) ? ok.wait : 0)
  })
}

/**
 * Builds the sleep effect for one site with its known errors unwrapped.
 *
 * The construction itself is wrapped in `Effect.try`, so the result is an effect that
 * yields the actual sleep effect; callers have to run both layers.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Group-level variant of the sleep: a successful `OkData` is turned into a `TopOkData`
 * for `site` before the known errors are unwrapped.
 *
 * As the construction is wrapped in `Effect.try`, the result is an effect that yields
 * the actual sleep effect.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = (okData: OkData) => new TopOkData(site, okData)
    return mapError_sleepNg(site, Effect.andThen(sleep(wait), toTopOkData))
  })

/**
 * Builds the worker that processes a single site of a group.
 *
 * The returned Effect:
 * 1. returns `Either.left(AlreadyAbortError)` straight away when the shared controller is already aborted,
 * 2. registers an `abort` listener that only logs,
 * 3. validates the site with `OriginAbortError.siteCheck`, which may abort the controller,
 * 4. feeds the site's last index through `getWaitNumber` into `do_sleep1`, and
 * 5. logs the outcome and returns it as an `Either`.
 *
 * @param abortController controller shared by all workers
 * @param topIndex index of the enclosing group, forwarded to `OriginAbortError.siteCheck`
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
  Effect.gen(function* () {
    // Another worker has already aborted: report it and do no work.
    if (abortController.signal.aborted) {
      const alreadyAbortError = new AlreadyAbortError()
      // The log is started but not awaited.
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
      return Either.left(alreadyAbortError)
    }

    // Logs when the controller is aborted while this worker is still registered.
    const abortEvent = () => {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
    }
    abortController.signal.addEventListener('abort', abortEvent)
    function removeEvent() {
      abortController.signal.removeEventListener('abort', abortEvent)
    }
    // Used by the site check: this worker unregisters its own listener before aborting,
    // so it does not log "aborted - from event" for the abort it caused itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }

    // A rejected site makes `siteCheck` return a failing effect, which ends this generator here.
    const abortCheck = yield* pipe(site, OriginAbortError.siteCheck(abortAction, topIndex))
    // `do_sleep1` wraps the sleep effect in `Effect.try`, so this first `yield*` only unwraps it...
    const effect = yield* pipe(abortCheck, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    // ...and this one actually runs the sleep, capturing success or failure as an Either.
    const either = yield* Effect.either(effect)

    if (Either.isRight(either)) {
      const okData = either.right

      const message = okData.getMessage(site)
      Effect.runPromise(checkLog(message))
    } else if (Either.isLeft(either)) {
      const ngData = either.left

      // Only the two known sleep errors are logged here; other failures were logged by `mapError_sleepNg`.
      if (SleepNGBeforeError.isClass(ngData)) {
        const message = ngData.getMessage(site)
        Effect.runPromise(checkLog(message))
      } else if (SleepNGAfterError.isClass(ngData)) {
        const message = ngData.getMessage(site)
        Effect.runPromise(checkLog(message))
      }
    }

    // Only reached when the generator runs to completion.
    removeEvent()
    return either
  })

/**
 * Creates the worker for one inner array of sites (one element of the outer array).
 *
 * Curried: `(abortController, options) => (sites) => Effect`.
 * The first entry of `sites` is used for the top-level sleep and for logging.
 * The resulting effect succeeds with an `Either`:
 * - `Left(TopAlreadyAbortError)` when the controller was already aborted on entry;
 * - `Left(message)` when the top-level sleep failed;
 * - `Right(message)` after every site has been processed by `mapper1_gen`
 *   (run through `Effect.all` with the given `options`).
 *
 * While the work is in flight an `abort` listener logs the top-level "aborted"
 * message; the listener is detached once the work has finished.
 */
const mapper2_gen =
  (abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
    Effect.gen(function* () {
      const site = sites[0]
      const { signal } = abortController

      // Guard: skip the whole group when an abort was raised before it started.
      if (signal.aborted) {
        const topAlreadyAbortError = new TopAlreadyAbortError()
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
        return Either.left(topAlreadyAbortError)
      }

      // Log when an abort is raised while this group is still being processed.
      const onAbort = () => {
        Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
      }
      const detachAbortListener = () => {
        signal.removeEventListener('abort', onAbort)
      }
      signal.addEventListener('abort', onAbort)

      const topSleepEffect = yield* pipe(
        site,
        TopOkData.getTopIndex,
        getWaitNumber,
        do_sleep2(site)
      )
      const topOutcome = yield* Effect.either(topSleepEffect)

      // Top-level sleep failed: log it and report the failure message.
      if (Either.isLeft(topOutcome)) {
        const topNgData = new TopSleepNGError(topOutcome.left)
        Effect.runPromise(checkLog(topNgData.getMessage(site)))

        const failed = Either.left(topNgData.getTopMessage(site))
        detachAbortListener()
        return failed
      }

      // Top-level sleep succeeded: process every site of the group.
      const topOkData = topOutcome.right
      Effect.runPromise(checkLog(TopOkData.getMessage(site)))

      const innerResults = yield* Effect.all(
        sites.map(mapper1_gen(abortController, topOkData.index)),
        options
      )

      // Fold the per-site numbers in chunks of `concurrency` and report the
      // largest accumulated value.
      const { concurrency } = options
      const chunkedWaits = Array.chunksOf(innerResults.map(mapNumber_fromData), concurrency)
      const waitNumber = Array.reduce(chunkedWaits, [0], sumNumber(concurrency))

      const succeeded = Either.right(topOkData.getTopMessage(Math.max(...waitNumber)))
      detachAbortListener()
      return succeeded
    })

// Scenario: an array of arrays of sites. The outer groups all run at once,
// the sites inside each group run at most two at a time, and a shared
// AbortController lets any site abort the whole run.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方6ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    const abortController = new AbortController()

    Effect.runPromise(checkLog('[start]'))

    // Effect.all only *describes* the work; it returns an Effect, not a Promise,
    // so there is nothing to await here. Execution starts in runPromiseExit below.
    const effect = Effect.all(sites.map(mapper2_gen(abortController, { concurrency: 2 })), {
      concurrency: Number.MAX_VALUE
    })

    // Passing the signal lets abortController.abort() interrupt the running fibers.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })

    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (either) => either
      })
    )
  })
})

実行結果

time : 2025-02-26T14:01:36.468Z - text: [start]
time : 2025-02-26T14:01:36.482Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T14:01:37.484Z - text: mapper2-index = 1
time : 2025-02-26T14:01:38.485Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:01:38.499Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T14:01:40.484Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T14:01:41.495Z - text: mapper2-index = 5
time : 2025-02-26T14:01:42.501Z - text: https://test/5/1 - ok - 1000
time : 2025-02-26T14:01:42.504Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-26T14:01:43.495Z - text: mapper2-index = 7
time : 2025-02-26T14:01:43.503Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:01:43.503Z - text: https://test/5/5 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T14:01:43.505Z - text: mapper2-index = 5 - aborted - from event
time : 2025-02-26T14:01:43.506Z - text: mapper2-index = 7 - aborted - from event
time : 2025-02-26T14:01:43.506Z - text: https://test/5/4 - aborted - from event
time : 2025-02-26T14:01:43.507Z - text: https://test/7/1 - aborted - from event
time : 2025-02-26T14:01:43.507Z - text: https://test/7/2 - aborted - from event
中断しました: All fibers interrupted without errors.

実行結果(※中断データを除外した場合)

time : 2025-02-26T14:03:02.231Z - text: [start]
time : 2025-02-26T14:03:02.251Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T14:03:03.249Z - text: mapper2-index = 1
time : 2025-02-26T14:03:04.262Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:03:04.265Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T14:03:06.265Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T14:03:07.253Z - text: mapper2-index = 5
time : 2025-02-26T14:03:08.260Z - text: https://test/5/1 - ok - 1000
time : 2025-02-26T14:03:08.265Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-26T14:03:09.251Z - text: mapper2-index = 7
time : 2025-02-26T14:03:09.269Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:03:10.255Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T14:03:11.264Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:03:12.275Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
[
  { _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/3 - ng - Sleep NG before wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
  },
  {
    _id: 'Either',
    _tag: 'Right',
    right: 'https://test/5 - ok - 10000'
  },
  { _id: 'Either', _tag: 'Right', right: 'https://test/7 - ok - 9000' }
]
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do by signing up
0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?