0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

effect-tsで配列の配列の非同期処理のPromiseのreject対応

Posted at

配列の配列(ネストした配列)を非同期処理する際に、対象API(以下ではsleep)がPromiseをrejectする場合の対応方法のメモ。
書き方については、どの方法がよいのかまだ判断がついていない部分がある。

前回

配列が1つの場合

全部同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Either } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Error that `sleep` rejects with immediately, before any waiting starts.
 *
 * Instances are recognised by their `_tag` value, not by `instanceof`.
 */
class SleepNGBeforeError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard: true when `test` carries this class's `_tag`.
   *
   * Accepts any value; `null` and `undefined` (e.g. a promise rejected without a
   * reason) yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the failure log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Error that `sleep` rejects with after the wait has elapsed.
 *
 * Instances are recognised by their `_tag` value, not by `instanceof`.
 */
class SleepNGAfterError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  /** The wait value the failing `sleep` call was given. */
  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard: true when `test` carries this class's `_tag`.
   *
   * Accepts any value; `null` and `undefined` (e.g. a promise rejected without a
   * reason) yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the failure log line for `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

/** Successful result of `sleep`, carrying the wait value it was called with. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the final character of `site` as a number (an empty string gives 0). */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}

/**
 * Simulated API call used by the examples.
 *
 * With `waitSeconds = wait / 1000`:
 * - a multiple of 3 rejects at once with `SleepNGBeforeError`;
 * - otherwise an even value rejects with `SleepNGAfterError` after `wait` ms;
 * - anything else resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000

    // Immediate failure: no timer is scheduled for this case.
    if (waitSeconds % 3 === 0) {
      return Promise.reject(new SleepNGBeforeError())
    }

    return new Promise<OkData>((resolve, reject) => {
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })

/** Effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options passed as the second argument of each `test(...)` call in this file. */
const testOptions = {
  timeout: 100_000
}

/** Turns a site index into the wait value handed to `sleep`: `index * 1000`. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Unwraps the tagged sleep errors carried inside an `UnknownException`.
 *
 * When `err.error` is a `SleepNGBeforeError` or `SleepNGAfterError`, that value
 * becomes the effect's error. Any other failure is logged through `checkLog` and
 * the original `UnknownException` is kept.
 *
 * @param site Not used by this function; kept so existing callers keep compiling.
 * @param effectAndThen Effect whose failures should be unwrapped.
 * @returns The same effect with its error channel narrowed to the tagged errors
 *          or the original `UnknownException`.
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  const isSleepNg = (cause: unknown): cause is SleepNGBeforeError | SleepNGAfterError =>
    SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)

  return pipe(
    effectAndThen,
    // Log unexpected failures as part of the effect itself instead of launching a
    // detached Effect.runPromise from inside the pure error mapper.
    Effect.tapError((err) =>
      isSleepNg(err.error)
        ? Effect.void
        : checkLog(`mapError_sleepNg : UnknownException : ${err}`)
    ),
    Effect.mapError((err) => (isSleepNg(err.error) ? err.error : err))
  )
}

/**
 * Curried helper: given `site` and then `wait`, returns an effect whose success
 * value is itself the `sleep(wait)` effect with its errors unwrapped by
 * `mapError_sleepNg`. Callers run the outer effect first to obtain the inner one.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Runs `sleep` for one site and reports the outcome.
 *
 * The wait value is derived from the last character of `site`. The result is
 * captured as an `Either`, so a failing site does not fail the whole effect.
 * `OkData`, `SleepNGBeforeError` and `SleepNGAfterError` outcomes are logged via
 * `checkLog`; other failures are returned without a log line here.
 *
 * @returns The `Either` holding either the `OkData` or the error.
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)

    // Logging is yielded inside the generator so it runs as part of this effect,
    // instead of being started as a detached, un-awaited Effect.runPromise.
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left

      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }

    return either
  })

// Single array of sites, all of them processed concurrently.
describe('effect-ts 非同期処理1つの配列', () => {
  test('使い方1ー全部同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5'
    ]

    // Awaited so the start marker is not left as a floating promise.
    await Effect.runPromise(checkLog('[start]'))

    // Effect.all returns an Effect, not a Promise, so it is not awaited;
    // the work is started by Effect.runPromise below.
    const effect = Effect.all(sites.map(mapper1_gen), {
      concurrency: sites.length
    })
    const eithers = await Effect.runPromise(effect)

    console.log(eithers)
  })
})

実行結果

time : 2025-02-24T11:24:36.510Z - text: [start]
time : 2025-02-24T11:24:36.523Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-02-24T11:24:37.526Z - text: https://test/1 - ok - 1000
time : 2025-02-24T11:24:38.538Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:24:40.525Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:24:41.525Z - text: https://test/5 - ok - 5000
[
  { _id: 'Either', _tag: 'Right', right: OkData { wait: 1000 } },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 2000 }
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGBeforeError { _tag: 'Sleep.Ng.Before' }
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 4000 }
  },
  { _id: 'Either', _tag: 'Right', right: OkData { wait: 5000 } }
]

最大2個まで同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Either } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Error that `sleep` rejects with immediately, before any waiting starts.
 *
 * Instances are recognised by their `_tag` value, not by `instanceof`.
 */
class SleepNGBeforeError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard: true when `test` carries this class's `_tag`.
   *
   * Accepts any value; `null` and `undefined` (e.g. a promise rejected without a
   * reason) yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the failure log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Error that `sleep` rejects with after the wait has elapsed.
 *
 * Instances are recognised by their `_tag` value, not by `instanceof`.
 */
class SleepNGAfterError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  /** The wait value the failing `sleep` call was given. */
  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard: true when `test` carries this class's `_tag`.
   *
   * Accepts any value; `null` and `undefined` (e.g. a promise rejected without a
   * reason) yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the failure log line for `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

/** Successful result of `sleep`, carrying the wait value it was called with. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the final character of `site` as a number (an empty string gives 0). */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}

/**
 * Simulated API call used by the examples.
 *
 * With `waitSeconds = wait / 1000`:
 * - a multiple of 3 rejects at once with `SleepNGBeforeError`;
 * - otherwise an even value rejects with `SleepNGAfterError` after `wait` ms;
 * - anything else resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000

    // Immediate failure: no timer is scheduled for this case.
    if (waitSeconds % 3 === 0) {
      return Promise.reject(new SleepNGBeforeError())
    }

    return new Promise<OkData>((resolve, reject) => {
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })

/** Effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options passed as the second argument of each `test(...)` call in this file. */
const testOptions = {
  timeout: 100_000
}

/** Turns a site index into the wait value handed to `sleep`: `index * 1000`. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Unwraps the tagged sleep errors carried inside an `UnknownException`.
 *
 * When `err.error` is a `SleepNGBeforeError` or `SleepNGAfterError`, that value
 * becomes the effect's error. Any other failure is logged through `checkLog` and
 * the original `UnknownException` is kept.
 *
 * @param site Not used by this function; kept so existing callers keep compiling.
 * @param effectAndThen Effect whose failures should be unwrapped.
 * @returns The same effect with its error channel narrowed to the tagged errors
 *          or the original `UnknownException`.
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  const isSleepNg = (cause: unknown): cause is SleepNGBeforeError | SleepNGAfterError =>
    SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)

  return pipe(
    effectAndThen,
    // Log unexpected failures as part of the effect itself instead of launching a
    // detached Effect.runPromise from inside the pure error mapper.
    Effect.tapError((err) =>
      isSleepNg(err.error)
        ? Effect.void
        : checkLog(`mapError_sleepNg : UnknownException : ${err}`)
    ),
    Effect.mapError((err) => (isSleepNg(err.error) ? err.error : err))
  )
}

/**
 * Curried helper: given `site` and then `wait`, returns an effect whose success
 * value is itself the `sleep(wait)` effect with its errors unwrapped by
 * `mapError_sleepNg`. Callers run the outer effect first to obtain the inner one.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Runs `sleep` for one site and reports the outcome.
 *
 * The wait value is derived from the last character of `site`. The result is
 * captured as an `Either`, so a failing site does not fail the whole effect.
 * `OkData`, `SleepNGBeforeError` and `SleepNGAfterError` outcomes are logged via
 * `checkLog`; other failures are returned without a log line here.
 *
 * @returns The `Either` holding either the `OkData` or the error.
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)

    // Logging is yielded inside the generator so it runs as part of this effect,
    // instead of being started as a detached, un-awaited Effect.runPromise.
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left

      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }

    return either
  })

// Single array of sites, at most two of them processed at the same time.
describe('effect-ts 非同期処理1つの配列', () => {
  test('使い方2ー最大2個まで同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5'
    ]

    // Awaited so the start marker is not left as a floating promise.
    await Effect.runPromise(checkLog('[start]'))

    // Effect.all returns an Effect, not a Promise, so it is not awaited;
    // the work is started by Effect.runPromise below.
    const effect = Effect.all(sites.map(mapper1_gen), {
      concurrency: 2
    })
    const eithers = await Effect.runPromise(effect)

    console.log(eithers)
  })
})

実行結果

time : 2025-02-24T11:30:04.377Z - text: [start]
time : 2025-02-24T11:30:05.391Z - text: https://test/1 - ok - 1000
time : 2025-02-24T11:30:05.400Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-02-24T11:30:06.401Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:30:09.406Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:30:11.407Z - text: https://test/5 - ok - 5000
[
  { _id: 'Either', _tag: 'Right', right: OkData { wait: 1000 } },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 2000 }
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGBeforeError { _tag: 'Sleep.Ng.Before' }
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 4000 }
  },
  { _id: 'Either', _tag: 'Right', right: OkData { wait: 5000 } }
]

配列が2つの場合(シンプル)

全部同時実行からの全部同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Error that `sleep` rejects with immediately, before any waiting starts.
 *
 * Instances are recognised by their `_tag` value, not by `instanceof`.
 */
class SleepNGBeforeError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard: true when `test` carries this class's `_tag`.
   *
   * Accepts any value; `null` and `undefined` (e.g. a promise rejected without a
   * reason) yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the failure log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Error that `sleep` rejects with after the wait has elapsed.
 *
 * Instances are recognised by their `_tag` value, not by `instanceof`.
 */
class SleepNGAfterError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  /** The wait value the failing `sleep` call was given. */
  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard: true when `test` carries this class's `_tag`.
   *
   * Accepts any value; `null` and `undefined` (e.g. a promise rejected without a
   * reason) yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the failure log line for `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps the error produced by the top-level `sleep` call of a site group and
 * formats it for logging and for the group's final result string.
 */
class TopSleepNGError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  /** The underlying error being wrapped. */
  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard: true when `test` carries this class's own `_tag`.
   *
   * Previously this compared against `SleepNGAfterError.tagName` and was typed
   * as a `SleepNGAfterError` guard, so it never matched a `TopSleepNGError`.
   * `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return test != null && (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix describing the wrapped error. Tagged errors are asked for
   * their message with an empty site, so the result starts with ` - ng - `.
   */
  getLastMessage() {
    return this.ngData instanceof UnknownException
      ? ' - ng - UnknownException'
      : this.ngData.getMessage('')
  }

  /** Log line naming the top-level index parsed from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result string: `site` without its last two characters, plus the error suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/**
 * Successful result of the top-level `sleep` call of a site group: remembers
 * the site string and the wait value of the `OkData` it was built from.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    const { wait } = ok
    this.site = s
    this.wait = wait
  }

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Reads the third character from the end of `site` as a number. */
  static getTopIndex(site: string) {
    return Number(site.charAt(site.length - 3))
  }

  /** Log line naming the top-level index parsed from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * Result string: the stored site without its last two characters, followed by
   * the stored wait plus `opWait`.
   */
  getTopMessage(opWait: number) {
    const topSite = this.site.slice(0, -2)
    const totalWait = this.wait + opWait
    return `${topSite} - ok - ${totalWait}`
  }
}

/** Successful result of `sleep`, carrying the wait value it was called with. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the final character of `site` as a number (an empty string gives 0). */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}

/**
 * Simulated API call used by the examples.
 *
 * With `waitSeconds = wait / 1000`:
 * - a multiple of 3 rejects at once with `SleepNGBeforeError`;
 * - otherwise an even value rejects with `SleepNGAfterError` after `wait` ms;
 * - anything else resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000

    // Immediate failure: no timer is scheduled for this case.
    if (waitSeconds % 3 === 0) {
      return Promise.reject(new SleepNGBeforeError())
    }

    return new Promise<OkData>((resolve, reject) => {
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })

/** Effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options passed as the second argument of each `test(...)` call in this file. */
const testOptions = {
  timeout: 100_000
}

/**
 * Builds a reducer that distributes the numbers of `b` over the slots in `a`.
 *
 * While `a` holds a number of slots different from `chunkSize`, each number is
 * appended as a new slot. Once the slot count equals `chunkSize`, each number is
 * added to the slot with the smallest current value (the first one on ties).
 *
 * The accumulator is copied first, so the array passed as `a` is never mutated.
 *
 * @param chunkSize Slot count at which numbers start being added to existing slots.
 * @returns Reducer `(a, b) => number[]` returning the updated slots as a new array.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const slots = [...a]

  for (const num of b) {
    if (slots.length === chunkSize) {
      const minIndex = slots.indexOf(Math.min(...slots))
      slots[minIndex] += num
    } else {
      slots.push(num)
    }
  }

  return slots
}

/** Turns a site index into the wait value handed to `sleep`: `index * 1000`. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Unwraps the tagged sleep errors carried inside an `UnknownException`.
 *
 * When `err.error` is a `SleepNGBeforeError` or `SleepNGAfterError`, that value
 * becomes the effect's error. Any other failure is logged through `checkLog` and
 * the original `UnknownException` is kept.
 *
 * @param site Not used by this function; kept so existing callers keep compiling.
 * @param effectAndThen Effect whose failures should be unwrapped.
 * @returns The same effect with its error channel narrowed to the tagged errors
 *          or the original `UnknownException`.
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  const isSleepNg = (cause: unknown): cause is SleepNGBeforeError | SleepNGAfterError =>
    SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)

  return pipe(
    effectAndThen,
    // Log unexpected failures as part of the effect itself instead of launching a
    // detached Effect.runPromise from inside the pure error mapper.
    Effect.tapError((err) =>
      isSleepNg(err.error)
        ? Effect.void
        : checkLog(`mapError_sleepNg : UnknownException : ${err}`)
    ),
    Effect.mapError((err) => (isSleepNg(err.error) ? err.error : err))
  )
}

/**
 * Collapses an `Either` into the wait value it represents.
 *
 * - Left holding an `OkData` or a `SleepNGAfterError`: its `wait`.
 * - Left holding a `SleepNGBeforeError` or anything else: 0.
 * - Right holding an `OkData`: its `wait`; any other right value: 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const left = data.left
    if (OkData.isClass(left) || SleepNGAfterError.isClass(left)) {
      return left.wait
    }
    // SleepNGBeforeError and unrecognised errors count as no waiting at all.
    return 0
  }

  const right = data.right
  return OkData.isClass(right) ? right.wait : 0
}

/**
 * Curried helper: given `site` and then `wait`, returns an effect whose success
 * value is itself the `sleep(wait)` effect with its errors unwrapped by
 * `mapError_sleepNg`. Callers run the outer effect first to obtain the inner one.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Like `do_sleep1`, but the inner effect's success value is converted from
 * `OkData` into a `TopOkData` bound to `site` before the errors are unwrapped
 * by `mapError_sleepNg`.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = (okData: OkData) => new TopOkData(site, okData)
    return mapError_sleepNg(site, Effect.andThen(sleep(wait), toTopOkData))
  })

/**
 * Runs `sleep` for one site and reports the outcome.
 *
 * The wait value is derived from the last character of `site`. The result is
 * captured as an `Either`, so a failing site does not fail the whole effect.
 * `OkData`, `SleepNGBeforeError` and `SleepNGAfterError` outcomes are logged via
 * `checkLog`; other failures are returned without a log line here.
 *
 * @returns The `Either` holding either the `OkData` or the error.
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)

    // Logging is yielded inside the generator so it runs as part of this effect,
    // instead of being started as a detached, un-awaited Effect.runPromise.
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left

      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }

    return either
  })

/**
 * Processes one group of sites: a top-level `sleep` first, then every site of
 * the group through `mapper1_gen`.
 *
 * The top-level wait is derived from the third character from the end of
 * `sites[0]`, so `sites` is expected to be non-empty. If that call fails, the
 * failure is logged and returned as a `Left` message without running the group.
 * Otherwise the group runs under `options`, the per-site wait values are folded
 * with `sumNumber`, and the largest slot is added to the top-level wait in the
 * `Right` message.
 *
 * @param options Concurrency used for the inner `Effect.all` over the group.
 * @returns Effect yielding `Either.left(message)` or `Either.right(message)`.
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    const site = sites[0]
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)

    // Log lines are yielded inside the generator so they run as part of this
    // effect rather than as detached, un-awaited Effect.runPromise calls.
    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))

      const failed: Either.Either<string, string> = Either.left(topNgData.getTopMessage(site))
      return failed
    }

    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))

    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)
    const { concurrency } = options
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))

    const succeeded: Either.Either<string, string> = Either.right(
      topOkData.getTopMessage(Math.max(...waitNumber))
    )
    return succeeded
  })

// Array of site groups: groups run concurrently, and so do the sites inside each group.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方1ー全部同時実行からの全部同時実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]

    // Awaited so the start marker is not left as a floating promise.
    await Effect.runPromise(checkLog('[start]'))

    // Effect.all returns an Effect, not a Promise, so it is not awaited;
    // the work is started by Effect.runPromise below.
    const effect = Effect.all(sites.map(mapper2_gen({ concurrency: Number.MAX_VALUE })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(effect)

    console.log(result)
  })
})

実行結果

time : 2025-02-24T11:32:12.775Z - text: [start]
time : 2025-02-24T11:32:17.820Z - text: mapper2-index = 5
time : 2025-02-24T11:32:17.833Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:32:18.839Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:32:19.837Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:32:21.829Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:32:22.837Z - text: https://test/5/5 - ok - 5000
[
  {
    _id: 'Either',
    _tag: 'Right',
    right: 'https://test/5 - ok - 10000'
  }
]

全部同時実行からの直列実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Error that `sleep` rejects with immediately, before any waiting starts.
 *
 * Instances are recognised by their `_tag` value, not by `instanceof`.
 */
class SleepNGBeforeError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard: true when `test` carries this class's `_tag`.
   *
   * Accepts any value; `null` and `undefined` (e.g. a promise rejected without a
   * reason) yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the failure log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Error that `sleep` rejects with after the wait has elapsed.
 *
 * Instances are recognised by their `_tag` value, not by `instanceof`.
 */
class SleepNGAfterError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  /** The wait value the failing `sleep` call was given. */
  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard: true when `test` carries this class's `_tag`.
   *
   * Accepts any value; `null` and `undefined` (e.g. a promise rejected without a
   * reason) yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the failure log line for `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps the error produced by the top-level `sleep` call of a site group and
 * formats it for logging and for the group's final result string.
 */
class TopSleepNGError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  /** The underlying error being wrapped. */
  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard: true when `test` carries this class's own `_tag`.
   *
   * Previously this compared against `SleepNGAfterError.tagName` and was typed
   * as a `SleepNGAfterError` guard, so it never matched a `TopSleepNGError`.
   * `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return test != null && (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix describing the wrapped error. Tagged errors are asked for
   * their message with an empty site, so the result starts with ` - ng - `.
   */
  getLastMessage() {
    return this.ngData instanceof UnknownException
      ? ' - ng - UnknownException'
      : this.ngData.getMessage('')
  }

  /** Log line naming the top-level index parsed from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result string: `site` without its last two characters, plus the error suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/**
 * Successful result of the top-level `sleep` call of a site group: remembers
 * the site string and the wait value of the `OkData` it was built from.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    const { wait } = ok
    this.site = s
    this.wait = wait
  }

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Reads the third character from the end of `site` as a number. */
  static getTopIndex(site: string) {
    return Number(site.charAt(site.length - 3))
  }

  /** Log line naming the top-level index parsed from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * Result string: the stored site without its last two characters, followed by
   * the stored wait plus `opWait`.
   */
  getTopMessage(opWait: number) {
    const topSite = this.site.slice(0, -2)
    const totalWait = this.wait + opWait
    return `${topSite} - ok - ${totalWait}`
  }
}

/** Successful result of `sleep`, carrying the wait value it was called with. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the final character of `site` as a number (an empty string gives 0). */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}

/**
 * Simulated API call used by the examples.
 *
 * With `waitSeconds = wait / 1000`:
 * - a multiple of 3 rejects at once with `SleepNGBeforeError`;
 * - otherwise an even value rejects with `SleepNGAfterError` after `wait` ms;
 * - anything else resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000

    // Immediate failure: no timer is scheduled for this case.
    if (waitSeconds % 3 === 0) {
      return Promise.reject(new SleepNGBeforeError())
    }

    return new Promise<OkData>((resolve, reject) => {
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })

/** Effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options passed as the second argument of each `test(...)` call in this file. */
const testOptions = {
  timeout: 100_000
}

/**
 * Builds a reducer that distributes the numbers of `b` over the slots in `a`.
 *
 * While `a` holds a number of slots different from `chunkSize`, each number is
 * appended as a new slot. Once the slot count equals `chunkSize`, each number is
 * added to the slot with the smallest current value (the first one on ties).
 *
 * The accumulator is copied first, so the array passed as `a` is never mutated.
 *
 * @param chunkSize Slot count at which numbers start being added to existing slots.
 * @returns Reducer `(a, b) => number[]` returning the updated slots as a new array.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const slots = [...a]

  for (const num of b) {
    if (slots.length === chunkSize) {
      const minIndex = slots.indexOf(Math.min(...slots))
      slots[minIndex] += num
    } else {
      slots.push(num)
    }
  }

  return slots
}

/** Turns a site index into the wait value handed to `sleep`: `index * 1000`. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Unwraps the tagged sleep errors carried inside an `UnknownException`.
 *
 * When `err.error` is a `SleepNGBeforeError` or `SleepNGAfterError`, that value
 * becomes the effect's error. Any other failure is logged through `checkLog` and
 * the original `UnknownException` is kept.
 *
 * @param site Not used by this function; kept so existing callers keep compiling.
 * @param effectAndThen Effect whose failures should be unwrapped.
 * @returns The same effect with its error channel narrowed to the tagged errors
 *          or the original `UnknownException`.
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  const isSleepNg = (cause: unknown): cause is SleepNGBeforeError | SleepNGAfterError =>
    SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)

  return pipe(
    effectAndThen,
    // Log unexpected failures as part of the effect itself instead of launching a
    // detached Effect.runPromise from inside the pure error mapper.
    Effect.tapError((err) =>
      isSleepNg(err.error)
        ? Effect.void
        : checkLog(`mapError_sleepNg : UnknownException : ${err}`)
    ),
    Effect.mapError((err) => (isSleepNg(err.error) ? err.error : err))
  )
}

/**
 * Collapses an `Either` into the wait value it represents.
 *
 * - Left holding an `OkData` or a `SleepNGAfterError`: its `wait`.
 * - Left holding a `SleepNGBeforeError` or anything else: 0.
 * - Right holding an `OkData`: its `wait`; any other right value: 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const left = data.left
    if (OkData.isClass(left) || SleepNGAfterError.isClass(left)) {
      return left.wait
    }
    // SleepNGBeforeError and unrecognised errors count as no waiting at all.
    return 0
  }

  const right = data.right
  return OkData.isClass(right) ? right.wait : 0
}

/**
 * Curried helper: given `site` and then `wait`, returns an effect whose success
 * value is itself the `sleep(wait)` effect with its errors unwrapped by
 * `mapError_sleepNg`. Callers run the outer effect first to obtain the inner one.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Like `do_sleep1`, but the inner effect's success value is converted from
 * `OkData` into a `TopOkData` bound to `site` before the errors are unwrapped
 * by `mapError_sleepNg`.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = (okData: OkData) => new TopOkData(site, okData)
    return mapError_sleepNg(site, Effect.andThen(sleep(wait), toTopOkData))
  })

/**
 * Runs `sleep` for one site and reports the outcome.
 *
 * The wait value is derived from the last character of `site`. The result is
 * captured as an `Either`, so a failing site does not fail the whole effect.
 * `OkData`, `SleepNGBeforeError` and `SleepNGAfterError` outcomes are logged via
 * `checkLog`; other failures are returned without a log line here.
 *
 * @returns The `Either` holding either the `OkData` or the error.
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)

    // Logging is yielded inside the generator so it runs as part of this effect,
    // instead of being started as a detached, un-awaited Effect.runPromise.
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left

      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }

    return either
  })

/**
 * Processes one group of sites: a top-level `sleep` first, then every site of
 * the group through `mapper1_gen`.
 *
 * The top-level wait is derived from the third character from the end of
 * `sites[0]`, so `sites` is expected to be non-empty. If that call fails, the
 * failure is logged and returned as a `Left` message without running the group.
 * Otherwise the group runs under `options`, the per-site wait values are folded
 * with `sumNumber`, and the largest slot is added to the top-level wait in the
 * `Right` message.
 *
 * @param options Concurrency used for the inner `Effect.all` over the group.
 * @returns Effect yielding `Either.left(message)` or `Either.right(message)`.
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    const site = sites[0]
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)

    // Log lines are yielded inside the generator so they run as part of this
    // effect rather than as detached, un-awaited Effect.runPromise calls.
    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))

      const failed: Either.Either<string, string> = Either.left(topNgData.getTopMessage(site))
      return failed
    }

    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))

    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)
    const { concurrency } = options
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))

    const succeeded: Either.Either<string, string> = Either.right(
      topOkData.getTopMessage(Math.max(...waitNumber))
    )
    return succeeded
  })

// Array of site groups: groups run concurrently, the sites inside each group one at a time.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方3ー全部同時実行からの直列実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]

    // Awaited so the start marker is not left as a floating promise.
    await Effect.runPromise(checkLog('[start]'))

    // Effect.all returns an Effect, not a Promise, so it is not awaited;
    // the work is started by Effect.runPromise below.
    const effect = Effect.all(sites.map(mapper2_gen({ concurrency: 1 })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(effect)

    console.log(result)
  })
})

実行結果

time : 2025-02-24T11:33:38.612Z - text: [start]
time : 2025-02-24T11:33:43.637Z - text: mapper2-index = 5
time : 2025-02-24T11:33:44.649Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:33:46.670Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:33:46.671Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:33:50.678Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:33:55.683Z - text: https://test/5/5 - ok - 5000
[
  {
    _id: 'Either',
    _tag: 'Right',
    right: 'https://test/5 - ok - 17000'
  }
]

全部同時実行からの最大2個まで同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Error that `sleep` rejects with immediately, before any waiting starts.
 *
 * Instances are recognised by their `_tag` value, not by `instanceof`.
 */
class SleepNGBeforeError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard: true when `test` carries this class's `_tag`.
   *
   * Accepts any value; `null` and `undefined` (e.g. a promise rejected without a
   * reason) yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the failure log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Error that `sleep` rejects with after the wait has elapsed.
 *
 * Instances are recognised by their `_tag` value, not by `instanceof`.
 */
class SleepNGAfterError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  /** The wait value the failing `sleep` call was given. */
  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard: true when `test` carries this class's `_tag`.
   *
   * Accepts any value; `null` and `undefined` (e.g. a promise rejected without a
   * reason) yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the failure log line for `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps the error produced by the top-level `sleep` call of a site group and
 * formats it for logging and for the group's final result string.
 */
class TopSleepNGError {
  /** Discriminator stored in `_tag` on every instance. */
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  /** The underlying error being wrapped. */
  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard: true when `test` carries this class's own `_tag`.
   *
   * Previously this compared against `SleepNGAfterError.tagName` and was typed
   * as a `SleepNGAfterError` guard, so it never matched a `TopSleepNGError`.
   * `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return test != null && (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix describing the wrapped error. Tagged errors are asked for
   * their message with an empty site, so the result starts with ` - ng - `.
   */
  getLastMessage() {
    return this.ngData instanceof UnknownException
      ? ' - ng - UnknownException'
      : this.ngData.getMessage('')
  }

  /** Log line naming the top-level index parsed from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result string: `site` without its last two characters, plus the error suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/**
 * Successful result of the top-level `sleep` call of a site group: remembers
 * the site string and the wait value of the `OkData` it was built from.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    const { wait } = ok
    this.site = s
    this.wait = wait
  }

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Reads the third character from the end of `site` as a number. */
  static getTopIndex(site: string) {
    return Number(site.charAt(site.length - 3))
  }

  /** Log line naming the top-level index parsed from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * Result string: the stored site without its last two characters, followed by
   * the stored wait plus `opWait`.
   */
  getTopMessage(opWait: number) {
    const topSite = this.site.slice(0, -2)
    const totalWait = this.wait + opWait
    return `${topSite} - ok - ${totalWait}`
  }
}

/** Successful result of `sleep`, carrying the wait value it was called with. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the final character of `site` as a number (an empty string gives 0). */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}

/**
 * Simulated API call used by the examples.
 *
 * With `waitSeconds = wait / 1000`:
 * - a multiple of 3 rejects at once with `SleepNGBeforeError`;
 * - otherwise an even value rejects with `SleepNGAfterError` after `wait` ms;
 * - anything else resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000

    // Immediate failure: no timer is scheduled for this case.
    if (waitSeconds % 3 === 0) {
      return Promise.reject(new SleepNGBeforeError())
    }

    return new Promise<OkData>((resolve, reject) => {
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })

/** Effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options passed as the second argument of each `test(...)` call in this file. */
const testOptions = {
  timeout: 100_000
}

/**
 * Builds a reducer that distributes the numbers of `b` over the slots in `a`.
 *
 * While `a` holds a number of slots different from `chunkSize`, each number is
 * appended as a new slot. Once the slot count equals `chunkSize`, each number is
 * added to the slot with the smallest current value (the first one on ties).
 *
 * The accumulator is copied first, so the array passed as `a` is never mutated.
 *
 * @param chunkSize Slot count at which numbers start being added to existing slots.
 * @returns Reducer `(a, b) => number[]` returning the updated slots as a new array.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const slots = [...a]

  for (const num of b) {
    if (slots.length === chunkSize) {
      const minIndex = slots.indexOf(Math.min(...slots))
      slots[minIndex] += num
    } else {
      slots.push(num)
    }
  }

  return slots
}

/** Turns a site index into the wait value handed to `sleep`: `index * 1000`. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Unwraps the tagged sleep errors carried inside an `UnknownException`.
 *
 * When `err.error` is a `SleepNGBeforeError` or `SleepNGAfterError`, that value
 * becomes the effect's error. Any other failure is logged through `checkLog` and
 * the original `UnknownException` is kept.
 *
 * @param site Not used by this function; kept so existing callers keep compiling.
 * @param effectAndThen Effect whose failures should be unwrapped.
 * @returns The same effect with its error channel narrowed to the tagged errors
 *          or the original `UnknownException`.
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  const isSleepNg = (cause: unknown): cause is SleepNGBeforeError | SleepNGAfterError =>
    SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)

  return pipe(
    effectAndThen,
    // Log unexpected failures as part of the effect itself instead of launching a
    // detached Effect.runPromise from inside the pure error mapper.
    Effect.tapError((err) =>
      isSleepNg(err.error)
        ? Effect.void
        : checkLog(`mapError_sleepNg : UnknownException : ${err}`)
    ),
    Effect.mapError((err) => (isSleepNg(err.error) ? err.error : err))
  )
}

/**
 * Collapses an `Either` into the wait value it represents.
 *
 * - Left holding an `OkData` or a `SleepNGAfterError`: its `wait`.
 * - Left holding a `SleepNGBeforeError` or anything else: 0.
 * - Right holding an `OkData`: its `wait`; any other right value: 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const left = data.left
    if (OkData.isClass(left) || SleepNGAfterError.isClass(left)) {
      return left.wait
    }
    // SleepNGBeforeError and unrecognised errors count as no waiting at all.
    return 0
  }

  const right = data.right
  return OkData.isClass(right) ? right.wait : 0
}

/**
 * Builds the sleep effect for one site, with its errors remapped by
 * `mapError_sleepNg`.
 *
 * The construction itself is wrapped in `Effect.try`, so the returned effect
 * succeeds with the inner sleep effect rather than with the sleep result.
 *
 * @param site - site string forwarded to `mapError_sleepNg`
 * @returns a function from a wait value to the wrapped effect
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Builds the top-level sleep effect for a group of sites: the `OkData`
 * produced by `sleep` is turned into a `TopOkData` bound to `site`, and
 * errors are remapped by `mapError_sleepNg`.
 *
 * The construction itself is wrapped in `Effect.try`, so the returned effect
 * succeeds with the inner effect rather than with the `TopOkData`.
 *
 * @param site - site string stored in the resulting `TopOkData`
 * @returns a function from a wait value to the wrapped effect
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = Effect.andThen(sleep(wait), (okData) => new TopOkData(site, okData))
    return mapError_sleepNg(site, toTopOkData)
  })

/**
 * Runs the sleep for a single site and reports the outcome.
 *
 * The wait is derived from the last character of `site`
 * (`OkData.getLastIndex` then `getWaitNumber`). The sleep result is captured
 * as an `Either`, so a failed sleep does not fail this effect.
 *
 * A success, a `SleepNGBeforeError` and a `SleepNGAfterError` are each
 * logged; any other failure is returned without a log line.
 *
 * Logging is done with `yield*` so that it runs inside this effect instead
 * of being started as a separate, un-awaited `Effect.runPromise`.
 *
 * @param site - site string whose last character selects the wait
 * @returns an effect producing the `Either` of the sleep
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)

    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left

      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }

    return either
  })

/**
 * Processes one group of sites.
 *
 * 1. Runs the top-level sleep, whose wait comes from the first site of the
 *    group (`TopOkData.getTopIndex` then `getWaitNumber`).
 * 2. If that sleep fails, logs the failure and returns a `Left` holding the
 *    message from `TopSleepNGError.getTopMessage`.
 * 3. Otherwise logs the success, runs `mapper1_gen` for every site with the
 *    given `options`, converts each result to a number with
 *    `mapNumber_fromData`, folds those numbers with `sumNumber`, and returns
 *    a `Right` holding `topOkData.getTopMessage` of the largest total.
 *
 * All logging and the inner work are yielded inside this generator, so no
 * effect is started with a detached `Effect.runPromise`.
 *
 * NOTE(review): `sites` is assumed to be non-empty; `sites[0]` is used
 * without a check.
 *
 * @param options - concurrency used for the per-site effects and as the
 *   chunk size of the fold
 * @returns a function from a group of sites to an effect producing an
 *   `Either` of messages
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    const site = sites[0]
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)

    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))

      return Either.left(topNgData.getTopMessage(site))
    }

    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))

    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)
    const concurrency = options.concurrency
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))

    return Either.right(topOkData.getTopMessage(Math.max(...waitNumber)))
  })

describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方5ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]

    // Await the start marker so it is not left as a floating promise.
    await Effect.runPromise(checkLog('[start]'))

    // Effect.all returns an Effect, not a Promise, so it is not awaited;
    // the work only starts in Effect.runPromise below.
    const effect = Effect.all(sites.map(mapper2_gen({ concurrency: 2 })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(effect)

    console.log(result)
  })
})

実行結果

time : 2025-02-24T11:35:37.523Z - text: [start]
time : 2025-02-24T11:35:42.542Z - text: mapper2-index = 5
time : 2025-02-24T11:35:43.562Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:35:43.576Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:35:44.561Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:35:47.587Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:35:49.566Z - text: https://test/5/5 - ok - 5000
[
  {
    _id: 'Either',
    _tag: 'Right',
    right: 'https://test/5 - ok - 12000'
  }
]

配列が2つの場合(複雑)

全部同時実行からの全部同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Failure value used when `sleep` rejects without waiting.
 * Instances are recognised by their `_tag`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` value.
   * Returns false (instead of throwing) for `null` and `undefined`, which is
   * what a promise rejected without a reason would supply.
   */
  static isClass(test: any): test is SleepNGBeforeError {
    return test?._tag === SleepNGBeforeError.tagName
  }

  /** Formats the log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Failure value used when `sleep` rejects after its timer has elapsed.
 * Carries the wait that was used. Instances are recognised by their `_tag`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` value.
   * Returns false (instead of throwing) for `null` and `undefined`, which is
   * what a promise rejected without a reason would supply.
   */
  static isClass(test: any): test is SleepNGAfterError {
    return test?._tag === SleepNGAfterError.tagName
  }

  /** Formats the log line for `site`, including the wait. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

/** Union of the failure values that a `TopSleepNGError` can wrap. */
type AllNgData =
  | SleepNGBeforeError
  | SleepNGAfterError
  | UnknownException

/**
 * Wraps the failure of a top-level sleep and formats messages for it.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard for this class, based on the `_tag` value.
   * Compares against this class's own tag and returns false for `null` and
   * `undefined`.
   */
  static isClass(test: any): test is TopSleepNGError {
    return test?._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix describing the wrapped failure: a fixed text for an
   * `UnknownException`, otherwise the failure's own message built with an
   * empty site.
   */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('')
    return lastMessage
  }

  /** Log line: the top index taken from `site`, followed by the failure suffix. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result message: `site` without its last two characters, followed by the failure suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/**
 * Successful result of a top-level sleep: the site it belongs to and the
 * wait copied from the underlying `OkData`.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    this.wait = ok.wait
    this.site = s
  }

  /** Type guard: true only for real `TopOkData` instances. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Reads the third character from the end of `site` as a number. */
  static getTopIndex(site: string) {
    const digit = site.charAt(site.length - 3)
    return Number(digit)
  }

  /** Log line naming the top index of `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * Result message: the site without its last two characters, followed by
   * this wait plus `opWait`.
   */
  getTopMessage(opWait: number) {
    const prefix = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${prefix} - ok - ${total}`
  }
}

/** Successful result of `sleep`, carrying the wait that was used. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard: true only for real `OkData` instances. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the last character of `site` as a number. */
  static getLastIndex(site: string) {
    return Number(site.charAt(site.length - 1))
  }

  /** Formats the log line for `site`, including the wait. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}

/**
 * Stand-in for an asynchronous API, wrapped with `Effect.tryPromise`.
 *
 * With `seconds = wait / 1000`:
 * - `seconds` divisible by 3: the promise is rejected immediately with a
 *   `SleepNGBeforeError`, and no timer is started;
 * - otherwise `seconds` divisible by 2: rejected with a `SleepNGAfterError`
 *   once a `setTimeout` of `wait` fires;
 * - otherwise: resolved with `OkData` once a `setTimeout` of `wait` fires.
 *
 * @param wait - delay passed to `setTimeout`, also used to pick the outcome
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const seconds = wait / 1000

    if (seconds % 3 === 0) {
      return Promise.reject(new SleepNGBeforeError())
    }

    return new Promise<OkData>((resolve, reject) => {
      const settle =
        seconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })

/**
 * Builds an effect that prints `text` to the console, prefixed with the
 * current time rendered as an ISO string.
 *
 * @param text - message appended after the timestamp
 */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options object passed as the second argument of each `test` call. */
const testOptions = {
  timeout: 100_000
}

/**
 * Creates a reducer that distributes the numbers of `b` over at most
 * `chunkSize` running totals ("lanes").
 *
 * While fewer than `chunkSize` lanes exist, each number opens a new lane.
 * Once exactly `chunkSize` lanes exist, each number is added to the lane
 * that currently holds the smallest total (the first one on ties).
 *
 * The accumulator `a` is copied before any work is done, so the array the
 * caller passed in is never modified.
 *
 * @param chunkSize - number of lanes at which numbers stop opening new lanes
 * @returns a reducer `(a, b) => lanes` returning the updated lane totals
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  // Work on a copy: mutating the caller's accumulator leaks state between calls.
  const lanes = [...a]

  for (const num of b) {
    if (lanes.length === chunkSize) {
      const minIndex = lanes.indexOf(Math.min(...lanes))
      lanes[minIndex] += num
    } else {
      lanes.push(num)
    }
  }

  return lanes
}

/**
 * Scales an index by 1000. Callers feed the result to the sleep helpers as
 * the wait value.
 *
 * @param index - numeric index extracted from a site string
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Replaces the `UnknownException` error channel of an effect with the
 * rejection reason it carries, when that reason is a known sleep failure.
 *
 * A `SleepNGBeforeError` or `SleepNGAfterError` found in `error` becomes the
 * new failure value. Anything else is logged through `checkLog` and the
 * `UnknownException` itself is kept.
 *
 * @param site - not read by this function
 * @param effectAndThen - effect whose failures are `UnknownException`s
 * @returns the same effect with its error channel remapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (wrapped) => {
    const reason = wrapped.error

    if (SleepNGBeforeError.isClass(reason)) {
      return reason
    }
    if (SleepNGAfterError.isClass(reason)) {
      return reason
    }

    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${wrapped}`))
    return wrapped
  })
}

/**
 * Collapses an `Either` into a single wait number.
 *
 * - left side: `OkData` or `SleepNGAfterError` yield their `wait`;
 *   `SleepNGBeforeError` and anything else yield 0.
 * - right side: `OkData` yields its `wait`; anything else yields 0.
 *
 * @param data - result to inspect
 * @returns the extracted wait, or 0
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  return Either.match(data, {
    onLeft: (failure) => {
      if (OkData.isClass(failure)) {
        return failure.wait
      }
      if (SleepNGBeforeError.isClass(failure)) {
        return 0
      }
      return SleepNGAfterError.isClass(failure) ? failure.wait : 0
    },
    onRight: (success) => (OkData.isClass(success) ? success.wait : 0)
  })
}

/**
 * Builds the sleep effect for one site, with its errors remapped by
 * `mapError_sleepNg`.
 *
 * The construction itself is wrapped in `Effect.try`, so the returned effect
 * succeeds with the inner sleep effect rather than with the sleep result.
 *
 * @param site - site string forwarded to `mapError_sleepNg`
 * @returns a function from a wait value to the wrapped effect
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Builds the top-level sleep effect for a group of sites: the `OkData`
 * produced by `sleep` is turned into a `TopOkData` bound to `site`, and
 * errors are remapped by `mapError_sleepNg`.
 *
 * The construction itself is wrapped in `Effect.try`, so the returned effect
 * succeeds with the inner effect rather than with the `TopOkData`.
 *
 * @param site - site string stored in the resulting `TopOkData`
 * @returns a function from a wait value to the wrapped effect
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = Effect.andThen(sleep(wait), (okData) => new TopOkData(site, okData))
    return mapError_sleepNg(site, toTopOkData)
  })

/**
 * Runs the sleep for a single site and reports the outcome.
 *
 * The wait is derived from the last character of `site`
 * (`OkData.getLastIndex` then `getWaitNumber`). The sleep result is captured
 * as an `Either`, so a failed sleep does not fail this effect.
 *
 * A success, a `SleepNGBeforeError` and a `SleepNGAfterError` are each
 * logged; any other failure is returned without a log line.
 *
 * Logging is done with `yield*` so that it runs inside this effect instead
 * of being started as a separate, un-awaited `Effect.runPromise`.
 *
 * @param site - site string whose last character selects the wait
 * @returns an effect producing the `Either` of the sleep
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)

    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left

      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }

    return either
  })

/**
 * Processes one group of sites.
 *
 * 1. Runs the top-level sleep, whose wait comes from the first site of the
 *    group (`TopOkData.getTopIndex` then `getWaitNumber`).
 * 2. If that sleep fails, logs the failure and returns a `Left` holding the
 *    message from `TopSleepNGError.getTopMessage`.
 * 3. Otherwise logs the success, runs `mapper1_gen` for every site with the
 *    given `options`, converts each result to a number with
 *    `mapNumber_fromData`, folds those numbers with `sumNumber`, and returns
 *    a `Right` holding `topOkData.getTopMessage` of the largest total.
 *
 * All logging and the inner work are yielded inside this generator, so no
 * effect is started with a detached `Effect.runPromise`.
 *
 * NOTE(review): `sites` is assumed to be non-empty; `sites[0]` is used
 * without a check.
 *
 * @param options - concurrency used for the per-site effects and as the
 *   chunk size of the fold
 * @returns a function from a group of sites to an effect producing an
 *   `Either` of messages
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    const site = sites[0]
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)

    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))

      return Either.left(topNgData.getTopMessage(site))
    }

    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))

    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)
    const concurrency = options.concurrency
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))

    return Either.right(topOkData.getTopMessage(Math.max(...waitNumber)))
  })

describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方2ー全部同時実行からの全部同時実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]

    // Await the start marker so it is not left as a floating promise.
    await Effect.runPromise(checkLog('[start]'))

    // Effect.all returns an Effect, not a Promise, so it is not awaited;
    // the work only starts in Effect.runPromise below.
    const effect = Effect.all(sites.map(mapper2_gen({ concurrency: Number.MAX_VALUE })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(effect)

    console.log(result)
  })
})

実行結果

time : 2025-02-24T11:37:05.712Z - text: [start]
time : 2025-02-24T11:37:05.725Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-24T11:37:06.736Z - text: mapper2-index = 1
time : 2025-02-24T11:37:07.739Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:37:07.744Z - text: https://test/1/1 - ok - 1000
time : 2025-02-24T11:37:09.729Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:37:10.730Z - text: mapper2-index = 5
time : 2025-02-24T11:37:10.741Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:37:11.747Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:37:12.750Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:37:14.753Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:37:15.755Z - text: https://test/5/5 - ok - 5000
[
  { _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/3 - ng - Sleep NG before wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
  },
  {
    _id: 'Either',
    _tag: 'Right',
    right: 'https://test/5 - ok - 10000'
  }
]

全部同時実行からの直列実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Failure value used when `sleep` rejects without waiting.
 * Instances are recognised by their `_tag`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` value.
   * Returns false (instead of throwing) for `null` and `undefined`, which is
   * what a promise rejected without a reason would supply.
   */
  static isClass(test: any): test is SleepNGBeforeError {
    return test?._tag === SleepNGBeforeError.tagName
  }

  /** Formats the log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Failure value used when `sleep` rejects after its timer has elapsed.
 * Carries the wait that was used. Instances are recognised by their `_tag`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` value.
   * Returns false (instead of throwing) for `null` and `undefined`, which is
   * what a promise rejected without a reason would supply.
   */
  static isClass(test: any): test is SleepNGAfterError {
    return test?._tag === SleepNGAfterError.tagName
  }

  /** Formats the log line for `site`, including the wait. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

/** Union of the failure values that a `TopSleepNGError` can wrap. */
type AllNgData =
  | SleepNGBeforeError
  | SleepNGAfterError
  | UnknownException

/**
 * Wraps the failure of a top-level sleep and formats messages for it.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard for this class, based on the `_tag` value.
   * Compares against this class's own tag and returns false for `null` and
   * `undefined`.
   */
  static isClass(test: any): test is TopSleepNGError {
    return test?._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix describing the wrapped failure: a fixed text for an
   * `UnknownException`, otherwise the failure's own message built with an
   * empty site.
   */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('')
    return lastMessage
  }

  /** Log line: the top index taken from `site`, followed by the failure suffix. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result message: `site` without its last two characters, followed by the failure suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/**
 * Successful result of a top-level sleep: the site it belongs to and the
 * wait copied from the underlying `OkData`.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    this.wait = ok.wait
    this.site = s
  }

  /** Type guard: true only for real `TopOkData` instances. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Reads the third character from the end of `site` as a number. */
  static getTopIndex(site: string) {
    const digit = site.charAt(site.length - 3)
    return Number(digit)
  }

  /** Log line naming the top index of `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * Result message: the site without its last two characters, followed by
   * this wait plus `opWait`.
   */
  getTopMessage(opWait: number) {
    const prefix = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${prefix} - ok - ${total}`
  }
}

/** Successful result of `sleep`, carrying the wait that was used. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard: true only for real `OkData` instances. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the last character of `site` as a number. */
  static getLastIndex(site: string) {
    return Number(site.charAt(site.length - 1))
  }

  /** Formats the log line for `site`, including the wait. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}

/**
 * Stand-in for an asynchronous API, wrapped with `Effect.tryPromise`.
 *
 * With `seconds = wait / 1000`:
 * - `seconds` divisible by 3: the promise is rejected immediately with a
 *   `SleepNGBeforeError`, and no timer is started;
 * - otherwise `seconds` divisible by 2: rejected with a `SleepNGAfterError`
 *   once a `setTimeout` of `wait` fires;
 * - otherwise: resolved with `OkData` once a `setTimeout` of `wait` fires.
 *
 * @param wait - delay passed to `setTimeout`, also used to pick the outcome
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const seconds = wait / 1000

    if (seconds % 3 === 0) {
      return Promise.reject(new SleepNGBeforeError())
    }

    return new Promise<OkData>((resolve, reject) => {
      const settle =
        seconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })

/**
 * Builds an effect that prints `text` to the console, prefixed with the
 * current time rendered as an ISO string.
 *
 * @param text - message appended after the timestamp
 */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options object passed as the second argument of each `test` call. */
const testOptions = {
  timeout: 100_000
}

/**
 * Creates a reducer that distributes the numbers of `b` over at most
 * `chunkSize` running totals ("lanes").
 *
 * While fewer than `chunkSize` lanes exist, each number opens a new lane.
 * Once exactly `chunkSize` lanes exist, each number is added to the lane
 * that currently holds the smallest total (the first one on ties).
 *
 * The accumulator `a` is copied before any work is done, so the array the
 * caller passed in is never modified.
 *
 * @param chunkSize - number of lanes at which numbers stop opening new lanes
 * @returns a reducer `(a, b) => lanes` returning the updated lane totals
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  // Work on a copy: mutating the caller's accumulator leaks state between calls.
  const lanes = [...a]

  for (const num of b) {
    if (lanes.length === chunkSize) {
      const minIndex = lanes.indexOf(Math.min(...lanes))
      lanes[minIndex] += num
    } else {
      lanes.push(num)
    }
  }

  return lanes
}

/**
 * Scales an index by 1000. Callers feed the result to the sleep helpers as
 * the wait value.
 *
 * @param index - numeric index extracted from a site string
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Replaces the `UnknownException` error channel of an effect with the
 * rejection reason it carries, when that reason is a known sleep failure.
 *
 * A `SleepNGBeforeError` or `SleepNGAfterError` found in `error` becomes the
 * new failure value. Anything else is logged through `checkLog` and the
 * `UnknownException` itself is kept.
 *
 * @param site - not read by this function
 * @param effectAndThen - effect whose failures are `UnknownException`s
 * @returns the same effect with its error channel remapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (wrapped) => {
    const reason = wrapped.error

    if (SleepNGBeforeError.isClass(reason)) {
      return reason
    }
    if (SleepNGAfterError.isClass(reason)) {
      return reason
    }

    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${wrapped}`))
    return wrapped
  })
}

/**
 * Collapses an `Either` into a single wait number.
 *
 * - left side: `OkData` or `SleepNGAfterError` yield their `wait`;
 *   `SleepNGBeforeError` and anything else yield 0.
 * - right side: `OkData` yields its `wait`; anything else yields 0.
 *
 * @param data - result to inspect
 * @returns the extracted wait, or 0
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  return Either.match(data, {
    onLeft: (failure) => {
      if (OkData.isClass(failure)) {
        return failure.wait
      }
      if (SleepNGBeforeError.isClass(failure)) {
        return 0
      }
      return SleepNGAfterError.isClass(failure) ? failure.wait : 0
    },
    onRight: (success) => (OkData.isClass(success) ? success.wait : 0)
  })
}

/**
 * Builds the sleep effect for one site, with its errors remapped by
 * `mapError_sleepNg`.
 *
 * The construction itself is wrapped in `Effect.try`, so the returned effect
 * succeeds with the inner sleep effect rather than with the sleep result.
 *
 * @param site - site string forwarded to `mapError_sleepNg`
 * @returns a function from a wait value to the wrapped effect
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Builds the top-level sleep effect for a group of sites: the `OkData`
 * produced by `sleep` is turned into a `TopOkData` bound to `site`, and
 * errors are remapped by `mapError_sleepNg`.
 *
 * The construction itself is wrapped in `Effect.try`, so the returned effect
 * succeeds with the inner effect rather than with the `TopOkData`.
 *
 * @param site - site string stored in the resulting `TopOkData`
 * @returns a function from a wait value to the wrapped effect
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = Effect.andThen(sleep(wait), (okData) => new TopOkData(site, okData))
    return mapError_sleepNg(site, toTopOkData)
  })

/**
 * Runs the sleep for a single site and reports the outcome.
 *
 * The wait is derived from the last character of `site`
 * (`OkData.getLastIndex` then `getWaitNumber`). The sleep result is captured
 * as an `Either`, so a failed sleep does not fail this effect.
 *
 * A success, a `SleepNGBeforeError` and a `SleepNGAfterError` are each
 * logged; any other failure is returned without a log line.
 *
 * Logging is done with `yield*` so that it runs inside this effect instead
 * of being started as a separate, un-awaited `Effect.runPromise`.
 *
 * @param site - site string whose last character selects the wait
 * @returns an effect producing the `Either` of the sleep
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)

    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left

      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }

    return either
  })

/**
 * Processes one group of sites.
 *
 * 1. Runs the top-level sleep, whose wait comes from the first site of the
 *    group (`TopOkData.getTopIndex` then `getWaitNumber`).
 * 2. If that sleep fails, logs the failure and returns a `Left` holding the
 *    message from `TopSleepNGError.getTopMessage`.
 * 3. Otherwise logs the success, runs `mapper1_gen` for every site with the
 *    given `options`, converts each result to a number with
 *    `mapNumber_fromData`, folds those numbers with `sumNumber`, and returns
 *    a `Right` holding `topOkData.getTopMessage` of the largest total.
 *
 * All logging and the inner work are yielded inside this generator, so no
 * effect is started with a detached `Effect.runPromise`.
 *
 * NOTE(review): `sites` is assumed to be non-empty; `sites[0]` is used
 * without a check.
 *
 * @param options - concurrency used for the per-site effects and as the
 *   chunk size of the fold
 * @returns a function from a group of sites to an effect producing an
 *   `Either` of messages
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    const site = sites[0]
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)

    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))

      return Either.left(topNgData.getTopMessage(site))
    }

    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))

    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)
    const concurrency = options.concurrency
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))

    return Either.right(topOkData.getTopMessage(Math.max(...waitNumber)))
  })

describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方4ー全部同時実行からの直列実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]

    // Await the start marker so it is not left as a floating promise.
    await Effect.runPromise(checkLog('[start]'))

    // Effect.all returns an Effect, not a Promise, so it is not awaited;
    // the work only starts in Effect.runPromise below.
    const effect = Effect.all(sites.map(mapper2_gen({ concurrency: 1 })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(effect)

    console.log(result)
  })
})

実行結果

time : 2025-02-24T11:38:48.551Z - text: [start]
time : 2025-02-24T11:38:48.566Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-24T11:38:49.565Z - text: mapper2-index = 1
time : 2025-02-24T11:38:50.565Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:38:50.565Z - text: https://test/1/1 - ok - 1000
time : 2025-02-24T11:38:52.576Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:38:53.569Z - text: mapper2-index = 5
time : 2025-02-24T11:38:54.587Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:38:56.595Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:38:56.596Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:39:00.600Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:39:05.606Z - text: https://test/5/5 - ok - 5000
[
  { _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/3 - ng - Sleep NG before wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
  },
  {
    _id: 'Either',
    _tag: 'Right',
    right: 'https://test/5 - ok - 17000'
  }
]

全部同時実行からの最大2個まで同時実行

import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'

/**
 * Failure value used when `sleep` rejects without waiting.
 * Instances are recognised by their `_tag`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` value.
   * Returns false (instead of throwing) for `null` and `undefined`, which is
   * what a promise rejected without a reason would supply.
   */
  static isClass(test: any): test is SleepNGBeforeError {
    return test?._tag === SleepNGBeforeError.tagName
  }

  /** Formats the log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}

/**
 * Failure value used when `sleep` rejects after its timer has elapsed.
 * Carries the wait that was used. Instances are recognised by their `_tag`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName

  wait: number
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` value.
   * Returns false (instead of throwing) for `null` and `undefined`, which is
   * what a promise rejected without a reason would supply.
   */
  static isClass(test: any): test is SleepNGAfterError {
    return test?._tag === SleepNGAfterError.tagName
  }

  /** Formats the log line for `site`, including the wait. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}

/** Union of the failure values that a `TopSleepNGError` can wrap. */
type AllNgData =
  | SleepNGBeforeError
  | SleepNGAfterError
  | UnknownException

/**
 * Wraps the failure of a top-level sleep and formats messages for it.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName

  ngData: AllNgData
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard for this class, based on the `_tag` value.
   * Compares against this class's own tag and returns false for `null` and
   * `undefined`.
   */
  static isClass(test: any): test is TopSleepNGError {
    return test?._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix describing the wrapped failure: a fixed text for an
   * `UnknownException`, otherwise the failure's own message built with an
   * empty site.
   */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('')
    return lastMessage
  }

  /** Log line: the top index taken from `site`, followed by the failure suffix. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result message: `site` without its last two characters, followed by the failure suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}

/**
 * Successful result of a top-level sleep: the site it belongs to and the
 * wait copied from the underlying `OkData`.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    this.wait = ok.wait
    this.site = s
  }

  /** Type guard: true only for real `TopOkData` instances. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Reads the third character from the end of `site` as a number. */
  static getTopIndex(site: string) {
    const digit = site.charAt(site.length - 3)
    return Number(digit)
  }

  /** Log line naming the top index of `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * Result message: the site without its last two characters, followed by
   * this wait plus `opWait`.
   */
  getTopMessage(opWait: number) {
    const prefix = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${prefix} - ok - ${total}`
  }
}

/** Successful result of `sleep`, carrying the wait that was used. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard: true only for real `OkData` instances. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the last character of `site` as a number. */
  static getLastIndex(site: string) {
    return Number(site.charAt(site.length - 1))
  }

  /** Formats the log line for `site`, including the wait. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}

/**
 * Stand-in for an asynchronous API, wrapped with `Effect.tryPromise`.
 *
 * With `seconds = wait / 1000`:
 * - `seconds` divisible by 3: the promise is rejected immediately with a
 *   `SleepNGBeforeError`, and no timer is started;
 * - otherwise `seconds` divisible by 2: rejected with a `SleepNGAfterError`
 *   once a `setTimeout` of `wait` fires;
 * - otherwise: resolved with `OkData` once a `setTimeout` of `wait` fires.
 *
 * @param wait - delay passed to `setTimeout`, also used to pick the outcome
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const seconds = wait / 1000

    if (seconds % 3 === 0) {
      return Promise.reject(new SleepNGBeforeError())
    }

    return new Promise<OkData>((resolve, reject) => {
      const settle =
        seconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })

/**
 * Builds an effect that prints `text` to the console, prefixed with the
 * current time rendered as an ISO string.
 *
 * @param text - message appended after the timestamp
 */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })

/** Options object passed as the second argument of each `test` call. */
const testOptions = {
  timeout: 100_000
}

/**
 * Creates a reducer that distributes the numbers of `b` over at most
 * `chunkSize` running totals ("lanes").
 *
 * While fewer than `chunkSize` lanes exist, each number opens a new lane.
 * Once exactly `chunkSize` lanes exist, each number is added to the lane
 * that currently holds the smallest total (the first one on ties).
 *
 * The accumulator `a` is copied before any work is done, so the array the
 * caller passed in is never modified.
 *
 * @param chunkSize - number of lanes at which numbers stop opening new lanes
 * @returns a reducer `(a, b) => lanes` returning the updated lane totals
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  // Work on a copy: mutating the caller's accumulator leaks state between calls.
  const lanes = [...a]

  for (const num of b) {
    if (lanes.length === chunkSize) {
      const minIndex = lanes.indexOf(Math.min(...lanes))
      lanes[minIndex] += num
    } else {
      lanes.push(num)
    }
  }

  return lanes
}

/**
 * Scales an index by 1000. Callers feed the result to the sleep helpers as
 * the wait value.
 *
 * @param index - numeric index extracted from a site string
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Replaces the `UnknownException` error channel of an effect with the
 * rejection reason it carries, when that reason is a known sleep failure.
 *
 * A `SleepNGBeforeError` or `SleepNGAfterError` found in `error` becomes the
 * new failure value. Anything else is logged through `checkLog` and the
 * `UnknownException` itself is kept.
 *
 * @param site - not read by this function
 * @param effectAndThen - effect whose failures are `UnknownException`s
 * @returns the same effect with its error channel remapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (wrapped) => {
    const reason = wrapped.error

    if (SleepNGBeforeError.isClass(reason)) {
      return reason
    }
    if (SleepNGAfterError.isClass(reason)) {
      return reason
    }

    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${wrapped}`))
    return wrapped
  })
}

/**
 * Collapses an `Either` into a single wait number.
 *
 * - left side: `OkData` or `SleepNGAfterError` yield their `wait`;
 *   `SleepNGBeforeError` and anything else yield 0.
 * - right side: `OkData` yields its `wait`; anything else yields 0.
 *
 * @param data - result to inspect
 * @returns the extracted wait, or 0
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  return Either.match(data, {
    onLeft: (failure) => {
      if (OkData.isClass(failure)) {
        return failure.wait
      }
      if (SleepNGBeforeError.isClass(failure)) {
        return 0
      }
      return SleepNGAfterError.isClass(failure) ? failure.wait : 0
    },
    onRight: (success) => (OkData.isClass(success) ? success.wait : 0)
  })
}

/**
 * Builds the sleep effect for one site, with its errors remapped by
 * `mapError_sleepNg`.
 *
 * The construction itself is wrapped in `Effect.try`, so the returned effect
 * succeeds with the inner sleep effect rather than with the sleep result.
 *
 * @param site - site string forwarded to `mapError_sleepNg`
 * @returns a function from a wait value to the wrapped effect
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))

/**
 * Builds the top-level sleep effect for a group of sites: the `OkData`
 * produced by `sleep` is turned into a `TopOkData` bound to `site`, and
 * errors are remapped by `mapError_sleepNg`.
 *
 * The construction itself is wrapped in `Effect.try`, so the returned effect
 * succeeds with the inner effect rather than with the `TopOkData`.
 *
 * @param site - site string stored in the resulting `TopOkData`
 * @returns a function from a wait value to the wrapped effect
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = Effect.andThen(sleep(wait), (okData) => new TopOkData(site, okData))
    return mapError_sleepNg(site, toTopOkData)
  })

/**
 * Runs the sleep for a single site and reports the outcome.
 *
 * The wait is derived from the last character of `site`
 * (`OkData.getLastIndex` then `getWaitNumber`). The sleep result is captured
 * as an `Either`, so a failed sleep does not fail this effect.
 *
 * A success, a `SleepNGBeforeError` and a `SleepNGAfterError` are each
 * logged; any other failure is returned without a log line.
 *
 * Logging is done with `yield*` so that it runs inside this effect instead
 * of being started as a separate, un-awaited `Effect.runPromise`.
 *
 * @param site - site string whose last character selects the wait
 * @returns an effect producing the `Either` of the sleep
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)

    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left

      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }

    return either
  })

/**
 * Processes one inner array of sites.
 *
 * First a "top level" sleep is executed, with its wait derived from the first
 * site via `TopOkData.getTopIndex` and `getWaitNumber`.
 * - If it fails, the failure is logged and `Either.left(topMessage)` is returned
 *   without touching the individual sites.
 * - If it succeeds, every site is run through `mapper1_gen` with the given
 *   concurrency options, the per-site waits are aggregated, and
 *   `Either.right(topMessage)` is returned.
 *
 * All logging is sequenced with `yield*` instead of a nested
 * `Effect.runPromise`, and both outcomes are handled by plain branching inside
 * the generator rather than by building an `Either` that holds an un-run
 * effect.
 *
 * @param options - options forwarded to `Effect.all`; `concurrency` is also the chunk size
 * @returns a function from the site list to an effect producing `Either<string, string>`
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    // NOTE(review): assumes `sites` is non-empty — confirm against callers.
    const site = sites[0]
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)

    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))

      return Either.left(topNgData.getTopMessage(site))
    }

    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))

    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)

    // The waits are split into chunks of `concurrency` items and folded with
    // `sumNumber`; the largest accumulated value is reported.
    // NOTE(review): presumably this estimates the elapsed time of the slowest
    // concurrency lane — confirm against `sumNumber`.
    const concurrency = options['concurrency']
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))

    return Either.right(topOkData.getTopMessage(Math.max(...waitNumber)))
  })

// Scenario: the outer array is processed with all groups in parallel, while
// the sites inside each group are processed at most two at a time.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方6ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]

    // Await the start marker so no promise is left floating.
    await Effect.runPromise(checkLog('[start]'))

    // `Effect.all` only describes the work; it is not a promise, so there is
    // nothing to await until `Effect.runPromise` executes it below.
    const effect = Effect.all(sites.map(mapper2_gen({ concurrency: 2 })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(effect)

    console.log(result)
  })
})

実行結果

time : 2025-02-24T11:40:45.713Z - text: [start]
time : 2025-02-24T11:40:45.726Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-24T11:40:46.726Z - text: mapper2-index = 1
time : 2025-02-24T11:40:47.729Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:40:47.732Z - text: https://test/1/1 - ok - 1000
time : 2025-02-24T11:40:49.734Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:40:50.739Z - text: mapper2-index = 5
time : 2025-02-24T11:40:51.754Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:40:51.758Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:40:52.757Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:40:55.763Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:40:57.767Z - text: https://test/5/5 - ok - 5000
[
  { _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/3 - ng - Sleep NG before wait'
  },
  {
    _id: 'Either',
    _tag: 'Left',
    left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
  },
  {
    _id: 'Either',
    _tag: 'Right',
    right: 'https://test/5 - ok - 12000'
  }
]
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do by signing up
0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?