0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

fp-tsで配列の配列の非同期処理のabort対応

Posted at

配列の配列の非同期処理で対象API(以下ではsleep)がabortする場合のメモ

前回

配列が1つの場合

全部同時実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'

/**
 * Test stand-in for the target API; settles according to `wait` (milliseconds).
 *
 * - `wait / 1000` divisible by 3: rejects immediately with `'Sleep NG before wait'`.
 * - otherwise divisible by 2: rejects after `wait` ms with `` `${wait} - Sleep NG after wait` ``.
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * Rejection reasons are plain strings, not `Error` objects.
 */
const sleep = (wait: number) =>
  // Typing the constructor (instead of asserting `as Promise<number>`) lets the
  // compiler check every `resolve` call against `number`.
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
    } else if (waitSeconds % 2 === 0) {
      setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
    } else {
      setTimeout(() => resolve(wait), wait)
    }
  })

/** Writes `text` to the console, prefixed with the current time in ISO-8601 format. */
function checkLog(text: string) {
  const now = new Date().toISOString()
  console.log(`time : ${now} - text: ${text}`)
}

/** Options handed to each vitest `test` call below: a timeout of 100 000 (100 s if milliseconds). */
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Reads the last character of `site` as a number.
 *
 * Fails (logs the reason, calls `abortAction`, then throws an `Error`) when
 * - the last character is not numeric, or
 * - the character before it is not `'/'` (reported as "LastIndex >= 10").
 *
 * @param abortAction callback invoked right before throwing
 * @returns the numeric value of the last character
 */
const getLastIndex = (abortAction: Function) => (site: string) => {
  // Shared failure path: log the reason, trigger the abort, then build the error to throw.
  const fail = (reason: string) => {
    checkLog(reason)
    abortAction()
    return new Error(reason)
  }

  const lastIndex = Number(site.slice(-1))
  if (Number.isNaN(lastIndex)) {
    throw fail(`${site} - abort - LastIndex is NaN`)
  }
  if (site.charAt(site.length - 2) !== '/') {
    throw fail(`${site} - abort - LastIndex >= 10`)
  }
  return lastIndex
}
/**
 * Guards the combined index: when `topIndex + lastIndex` is 10 or more it logs the
 * reason, calls `abortAction`, and throws an `Error`; otherwise it does nothing.
 */
const checkIndex =
  (abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
    const limitReached = topIndex + lastIndex >= 10
    if (!limitReached) {
      return
    }
    const reason = `${site} - abort - (topIndex + lastIndex) >= 10`
    checkLog(reason)
    abortAction()
    throw new Error(reason)
  }

/** Converts an index into a wait value: `index` multiplied by 1000. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Wraps `sleep(wait)` for one site in a TaskEither.
 * - Right: logs the site and yields `"<site> - ok - <resolved wait>"`.
 * - Left: logs and yields `"<site> - ng - <rejection reason>"`.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    const failure = `${site} - ng - ${error}`
    checkLog(failure)
    return failure
  }
  return TE.tryCatch(() => sleep(wait).then(onResolved), onRejected)
}

/**
 * Normalises a caught value to its message: the `message` property when `error` is an
 * object that has one, otherwise `error` itself.
 *
 * If the controller's signal is already aborted, the message is thrown as-is (not wrapped
 * in an `Error`); otherwise it is returned converted with `String`.
 */
const throwError = (abortController: AbortController) => (error: unknown) => {
  let message: unknown = error
  if (error && typeof error === 'object' && 'message' in error) {
    message = error.message
  }
  if (!abortController.signal.aborted) {
    return String(message)
  }
  throw message
}

/**
 * Builds the task for a single site.
 *
 * - If the controller is already aborted when the mapper is called, logs and returns a Left
 *   `"<site> - aborted"` without registering anything.
 * - Otherwise registers an `abort` listener that only logs, validates the site
 *   (`getLastIndex`, plus `checkIndex` when `topIndex` is `Some`) and runs `do_sleep1`.
 * - Validation failures go through `throwError`, which throws once the controller is aborted.
 * - The listener is removed when the task settles as Left or Right, and by `abortAction`
 *   before it aborts.
 */
const mapper1_throwError =
  (topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
    const { signal } = abortController
    if (signal.aborted) {
      const skipped = `${site} - aborted`
      checkLog(skipped)
      return TE.left(skipped)
    }

    const onAbort = () => {
      checkLog(`${site} - aborted - from event`)
    }
    signal.addEventListener('abort', onAbort)
    const removeEvent = () => signal.removeEventListener('abort', onAbort)
    // Detach our own listener first so the aborting site does not log "from event" for itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }
    const onRejected = (error: unknown) => throwError(abortController)(error)

    // Validation runs inside the async thunk so that a thrown error reaches `onRejected`.
    const prepare = pipe(
      topIndex,
      O.match(
        () => async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
        (top: number) => async () => {
          const lastIndex = getLastIndex(abortAction)(site)
          checkIndex(abortAction)(site, top, lastIndex)
          return pipe(lastIndex, getWaitNumber, do_sleep1(site))
        }
      )
    )
    const action = pipe(TE.tryCatch(prepare, onRejected), TE.flattenW)

    return pipe(
      action,
      TE.fold(
        (error) => {
          removeEvent()
          return TE.left(error)
        },
        (result) => {
          removeEvent()
          return TE.right(result)
        }
      )
    )
  }

// Runs every site concurrently with a shared AbortController and prints the separated
// Left/Right results, or the thrown value when the run was aborted.
describe('fp-ts 非同期処理1つの配列', () => {
  test('使い方1ー全部同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5',
      'https://test/10'
    ]

    const abortController = new AbortController()
    const toTask = mapper1_throwError(O.none)(abortController)

    checkLog('[start]')

    try {
      const runAll = pipe(sites, RA.traverse(T.ApplicativePar)(toTask))
      const outcomes = await runAll()

      console.log(RA.separate(outcomes))
    } catch (thrown) {
      console.log(`error -> ${thrown}`)
    }
  })
})

実行結果

time : 2025-01-27T12:47:50.742Z - text: [start]
time : 2025-01-27T12:47:50.745Z - text: https://test/10 - abort - LastIndex >= 10
time : 2025-01-27T12:47:50.745Z - text: https://test/1 - aborted - from event
time : 2025-01-27T12:47:50.745Z - text: https://test/2 - aborted - from event
time : 2025-01-27T12:47:50.745Z - text: https://test/3 - aborted - from event
time : 2025-01-27T12:47:50.746Z - text: https://test/4 - aborted - from event
time : 2025-01-27T12:47:50.746Z - text: https://test/5 - aborted - from event
error -> https://test/10 - abort - LastIndex >= 10
time : 2025-01-27T12:47:50.746Z - text: https://test/3 - ng - Sleep NG before wait

実行結果(※中断データを除外した場合)

time : 2025-01-27T12:48:10.719Z - text: [start]
time : 2025-01-27T12:48:10.722Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-01-27T12:48:11.725Z - text: https://test/1
time : 2025-01-27T12:48:12.725Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T12:48:14.724Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T12:48:15.732Z - text: https://test/5
{
  left: [
    'https://test/2 - ng - 2000 - Sleep NG after wait',
    'https://test/3 - ng - Sleep NG before wait',
    'https://test/4 - ng - 4000 - Sleep NG after wait'
  ],
  right: [ 'https://test/1 - ok - 1000', 'https://test/5 - ok - 5000' ]
}

最大2個まで同時実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'

/**
 * Test stand-in for the target API; settles according to `wait` (milliseconds).
 *
 * - `wait / 1000` divisible by 3: rejects immediately with `'Sleep NG before wait'`.
 * - otherwise divisible by 2: rejects after `wait` ms with `` `${wait} - Sleep NG after wait` ``.
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * Rejection reasons are plain strings, not `Error` objects.
 */
const sleep = (wait: number) =>
  // Typing the constructor (instead of asserting `as Promise<number>`) lets the
  // compiler check every `resolve` call against `number`.
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
    } else if (waitSeconds % 2 === 0) {
      setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
    } else {
      setTimeout(() => resolve(wait), wait)
    }
  })

/** Writes `text` to the console, prefixed with the current time in ISO-8601 format. */
function checkLog(text: string) {
  const now = new Date().toISOString()
  console.log(`time : ${now} - text: ${text}`)
}

/** Options handed to each vitest `test` call below: a timeout of 100 000 (100 s if milliseconds). */
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Reads the last character of `site` as a number.
 *
 * Fails (logs the reason, calls `abortAction`, then throws an `Error`) when
 * - the last character is not numeric, or
 * - the character before it is not `'/'` (reported as "LastIndex >= 10").
 *
 * @param abortAction callback invoked right before throwing
 * @returns the numeric value of the last character
 */
const getLastIndex = (abortAction: Function) => (site: string) => {
  // Shared failure path: log the reason, trigger the abort, then build the error to throw.
  const fail = (reason: string) => {
    checkLog(reason)
    abortAction()
    return new Error(reason)
  }

  const lastIndex = Number(site.slice(-1))
  if (Number.isNaN(lastIndex)) {
    throw fail(`${site} - abort - LastIndex is NaN`)
  }
  if (site.charAt(site.length - 2) !== '/') {
    throw fail(`${site} - abort - LastIndex >= 10`)
  }
  return lastIndex
}
/**
 * Guards the combined index: when `topIndex + lastIndex` is 10 or more it logs the
 * reason, calls `abortAction`, and throws an `Error`; otherwise it does nothing.
 */
const checkIndex =
  (abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
    const limitReached = topIndex + lastIndex >= 10
    if (!limitReached) {
      return
    }
    const reason = `${site} - abort - (topIndex + lastIndex) >= 10`
    checkLog(reason)
    abortAction()
    throw new Error(reason)
  }

/** Converts an index into a wait value: `index` multiplied by 1000. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Wraps `sleep(wait)` for one site in a TaskEither.
 * - Right: logs the site and yields `"<site> - ok - <resolved wait>"`.
 * - Left: logs and yields `"<site> - ng - <rejection reason>"`.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    const failure = `${site} - ng - ${error}`
    checkLog(failure)
    return failure
  }
  return TE.tryCatch(() => sleep(wait).then(onResolved), onRejected)
}

/**
 * Normalises a caught value to its message: the `message` property when `error` is an
 * object that has one, otherwise `error` itself.
 *
 * If the controller's signal is already aborted, the message is thrown as-is (not wrapped
 * in an `Error`); otherwise it is returned converted with `String`.
 */
const throwError = (abortController: AbortController) => (error: unknown) => {
  let message: unknown = error
  if (error && typeof error === 'object' && 'message' in error) {
    message = error.message
  }
  if (!abortController.signal.aborted) {
    return String(message)
  }
  throw message
}

/**
 * Builds the task for a single site.
 *
 * - If the controller is already aborted when the mapper is called, logs and returns a Left
 *   `"<site> - aborted"` without registering anything.
 * - Otherwise registers an `abort` listener that only logs, validates the site
 *   (`getLastIndex`, plus `checkIndex` when `topIndex` is `Some`) and runs `do_sleep1`.
 * - Validation failures go through `throwError`, which throws once the controller is aborted.
 * - The listener is removed when the task settles as Left or Right, and by `abortAction`
 *   before it aborts.
 */
const mapper1_throwError =
  (topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
    const { signal } = abortController
    if (signal.aborted) {
      const skipped = `${site} - aborted`
      checkLog(skipped)
      return TE.left(skipped)
    }

    const onAbort = () => {
      checkLog(`${site} - aborted - from event`)
    }
    signal.addEventListener('abort', onAbort)
    const removeEvent = () => signal.removeEventListener('abort', onAbort)
    // Detach our own listener first so the aborting site does not log "from event" for itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }
    const onRejected = (error: unknown) => throwError(abortController)(error)

    // Validation runs inside the async thunk so that a thrown error reaches `onRejected`.
    const prepare = pipe(
      topIndex,
      O.match(
        () => async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
        (top: number) => async () => {
          const lastIndex = getLastIndex(abortAction)(site)
          checkIndex(abortAction)(site, top, lastIndex)
          return pipe(lastIndex, getWaitNumber, do_sleep1(site))
        }
      )
    )
    const action = pipe(TE.tryCatch(prepare, onRejected), TE.flattenW)

    return pipe(
      action,
      TE.fold(
        (error) => {
          removeEvent()
          return TE.left(error)
        },
        (result) => {
          removeEvent()
          return TE.right(result)
        }
      )
    )
  }

// Splits the sites into chunks of `limit`: chunks run one after another, while the sites
// inside a chunk run concurrently. Prints the separated results, or the thrown value on abort.
describe('fp-ts 非同期処理1つの配列', () => {
  test('使い方2ー最大2個まで同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5',
      'https://test/10'
    ]

    const abortController = new AbortController()

    checkLog('[start]')

    try {
      const limit = 2
      const runChunks = pipe(
        sites,
        RA.chunksOf(limit),
        RA.map(RA.traverse(T.ApplicativePar)(mapper1_throwError(O.none)(abortController))),
        RA.sequence(T.ApplicativeSeq),
        T.map(RA.flatten)
      )
      const outcomes = await runChunks()

      console.log(RA.separate(outcomes))
    } catch (thrown) {
      console.log(thrown)
    }
  })
})

実行結果

time : 2025-01-27T12:51:32.445Z - text: [start]
time : 2025-01-27T12:51:33.455Z - text: https://test/1
time : 2025-01-27T12:51:34.451Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T12:51:34.454Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-01-27T12:51:38.462Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T12:51:38.464Z - text: https://test/10 - abort - LastIndex >= 10
time : 2025-01-27T12:51:38.467Z - text: https://test/5 - aborted - from event
https://test/10 - abort - LastIndex >= 10

実行結果(※中断データを除外した場合)

time : 2025-01-27T12:54:12.154Z - text: [start]
time : 2025-01-27T12:54:13.161Z - text: https://test/1
time : 2025-01-27T12:54:14.165Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T12:54:14.167Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-01-27T12:54:18.168Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T12:54:23.171Z - text: https://test/5
{
  left: [
    'https://test/2 - ng - 2000 - Sleep NG after wait',
    'https://test/3 - ng - Sleep NG before wait',
    'https://test/4 - ng - 4000 - Sleep NG after wait'
  ],
  right: [ 'https://test/1 - ok - 1000', 'https://test/5 - ok - 5000' ]
}

配列が2つの場合(シンプル)

全部同時実行からの全部同時実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'

/**
 * Test stand-in for the target API; settles according to `wait` (milliseconds).
 *
 * - `wait / 1000` divisible by 3: rejects immediately with `'Sleep NG before wait'`.
 * - otherwise divisible by 2: rejects after `wait` ms with `` `${wait} - Sleep NG after wait` ``.
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * Rejection reasons are plain strings, not `Error` objects.
 */
const sleep = (wait: number) =>
  // Typing the constructor (instead of asserting `as Promise<number>`) lets the
  // compiler check every `resolve` call against `number`.
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
    } else if (waitSeconds % 2 === 0) {
      setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
    } else {
      setTimeout(() => resolve(wait), wait)
    }
  })

/** Writes `text` to the console, prefixed with the current time in ISO-8601 format. */
function checkLog(text: string) {
  const now = new Date().toISOString()
  console.log(`time : ${now} - text: ${text}`)
}

/** Options handed to each vitest `test` call below: a timeout of 100 000 (100 s if milliseconds). */
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Reads the last character of `site` as a number.
 *
 * Fails (logs the reason, calls `abortAction`, then throws an `Error`) when
 * - the last character is not numeric, or
 * - the character before it is not `'/'` (reported as "LastIndex >= 10").
 *
 * @param abortAction callback invoked right before throwing
 * @returns the numeric value of the last character
 */
const getLastIndex = (abortAction: Function) => (site: string) => {
  // Shared failure path: log the reason, trigger the abort, then build the error to throw.
  const fail = (reason: string) => {
    checkLog(reason)
    abortAction()
    return new Error(reason)
  }

  const lastIndex = Number(site.slice(-1))
  if (Number.isNaN(lastIndex)) {
    throw fail(`${site} - abort - LastIndex is NaN`)
  }
  if (site.charAt(site.length - 2) !== '/') {
    throw fail(`${site} - abort - LastIndex >= 10`)
  }
  return lastIndex
}
/**
 * Reads the "top" index of `site`: the single character third from the end
 * (the `7` in `https://test/7/1`).
 *
 * If that character is not numeric it logs the reason, calls `abortAction`,
 * and throws an `Error`.
 */
const getTopIndex = (abortAction: Function) => (site: string) => {
  const end = site.length - 2
  const topIndex = Number(site.substring(end - 1, end))
  if (!Number.isNaN(topIndex)) {
    return topIndex
  }
  const reason = `${site} - abort - TopIndex is NaN`
  checkLog(reason)
  abortAction()
  throw new Error(reason)
}
/**
 * Guards the combined index: when `topIndex + lastIndex` is 10 or more it logs the
 * reason, calls `abortAction`, and throws an `Error`; otherwise it does nothing.
 */
const checkIndex =
  (abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
    const limitReached = topIndex + lastIndex >= 10
    if (!limitReached) {
      return
    }
    const reason = `${site} - abort - (topIndex + lastIndex) >= 10`
    checkLog(reason)
    abortAction()
    throw new Error(reason)
  }

/** Converts an index into a wait value: `index` multiplied by 1000. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Wraps `sleep(wait)` for one site in a TaskEither.
 * - Right: logs the site and yields `"<site> - ok - <resolved wait>"`.
 * - Left: logs and yields `"<site> - ng - <rejection reason>"`.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    const failure = `${site} - ng - ${error}`
    checkLog(failure)
    return failure
  }
  return TE.tryCatch(() => sleep(wait).then(onResolved), onRejected)
}
/**
 * Extracts the wait value embedded in a mapper1 result message.
 *
 * - `"<site> - ok - <wait>"` yields `<wait>`.
 * - `"<site> - ng - <wait> - <reason>"` yields `<wait>`.
 * - Anything else (for example `"<site> - ng - Sleep NG before wait"` or
 *   `"<site> - aborted"`) yields `0`.
 *
 * The full ` - ok - ` / ` - ng - ` delimiters are matched rather than the bare words, so
 * a site whose URL happens to contain "ok" or "ng" (such as `https://booking/1`) is still
 * parsed correctly.
 *
 * @param mapper1Message a Left or Right message produced by mapper1
 * @returns the parsed wait, or `0` when the message carries none
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const okMarker = ' - ok - '
  const ngMarker = ' - ng - '

  const okAt = mapper1Message.indexOf(okMarker)
  if (okAt !== -1) {
    const okWait = Number(mapper1Message.substring(okAt + okMarker.length))
    if (!Number.isNaN(okWait)) {
      return okWait
    }
  }

  const ngAt = mapper1Message.indexOf(ngMarker)
  if (ngAt === -1) {
    return 0
  }
  const ngDetail = mapper1Message.substring(ngAt + ngMarker.length)
  // The wait, when present, is the text before the first '-' of the detail part.
  const dashAt = ngDetail.indexOf('-')
  if (dashAt === -1) {
    return 0
  }
  const ngWait = Number(ngDetail.substring(0, dashAt))
  return Number.isNaN(ngWait) ? 0 : ngWait
}

/**
 * Wraps `sleep(wait)` for a site group in a TaskEither.
 * - Right: logs the top index and yields a function that, given an extra wait `opWait`,
 *   formats `"<site without its last two characters> - <resolved wait + opWait>"`.
 * - Left: logs and yields `"<site without its last two characters> - ng - <reason>"`.
 *
 * `getTopIndex` is re-evaluated in both paths and may itself abort and throw.
 */
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) => {
  // Evaluated lazily so nothing is computed before the task actually settles.
  const groupSite = () => site.substring(0, site.length - 2)
  const readTopIndex = () => getTopIndex(abortAction)(site)

  return TE.tryCatch(
    () =>
      sleep(wait).then((_wait) => {
        const topIndex = readTopIndex()
        checkLog(`mapper2-index = ${topIndex}`)
        return (opWait: number) => `${groupSite()} - ${_wait + opWait}`
      }),
    (error) => {
      const topIndex = readTopIndex()
      checkLog(`mapper2-index = ${topIndex} - ng - ${error}`)
      return `${groupSite()} - ng - ${error}`
    }
  )
}

/**
 * Normalises a caught value to its message: the `message` property when `error` is an
 * object that has one, otherwise `error` itself.
 *
 * If the controller's signal is already aborted, the message is thrown as-is (not wrapped
 * in an `Error`); otherwise it is returned converted with `String`.
 */
const throwError = (abortController: AbortController) => (error: unknown) => {
  let message: unknown = error
  if (error && typeof error === 'object' && 'message' in error) {
    message = error.message
  }
  if (!abortController.signal.aborted) {
    return String(message)
  }
  throw message
}

/**
 * Builds the task for a single site.
 *
 * - If the controller is already aborted when the mapper is called, logs and returns a Left
 *   `"<site> - aborted"` without registering anything.
 * - Otherwise registers an `abort` listener that only logs, validates the site
 *   (`getLastIndex`, plus `checkIndex` when `topIndex` is `Some`) and runs `do_sleep1`.
 * - Validation failures go through `throwError`, which throws once the controller is aborted.
 * - The listener is removed when the task settles as Left or Right, and by `abortAction`
 *   before it aborts.
 */
const mapper1_throwError =
  (topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
    const { signal } = abortController
    if (signal.aborted) {
      const skipped = `${site} - aborted`
      checkLog(skipped)
      return TE.left(skipped)
    }

    const onAbort = () => {
      checkLog(`${site} - aborted - from event`)
    }
    signal.addEventListener('abort', onAbort)
    const removeEvent = () => signal.removeEventListener('abort', onAbort)
    // Detach our own listener first so the aborting site does not log "from event" for itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }
    const onRejected = (error: unknown) => throwError(abortController)(error)

    // Validation runs inside the async thunk so that a thrown error reaches `onRejected`.
    const prepare = pipe(
      topIndex,
      O.match(
        () => async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
        (top: number) => async () => {
          const lastIndex = getLastIndex(abortAction)(site)
          checkIndex(abortAction)(site, top, lastIndex)
          return pipe(lastIndex, getWaitNumber, do_sleep1(site))
        }
      )
    )
    const action = pipe(TE.tryCatch(prepare, onRejected), TE.flattenW)

    return pipe(
      action,
      TE.fold(
        (error) => {
          removeEvent()
          return TE.left(error)
        },
        (result) => {
          removeEvent()
          return TE.right(result)
        }
      )
    )
  }

/**
 * Runs the group-level task (`action2`) and then, only if it succeeded, the per-site tasks.
 *
 * - `action2` Left: calls `removeEvent`, then passes the error through `throwError`
 *   (which throws if the controller is aborted) and returns it as a Left.
 * - `action2` Right: runs `action1`, turns every per-site result (Left or Right alike)
 *   into its wait via `getWaitFromMapper1Message`, folds the waits with `op` starting
 *   from 0, calls `removeEvent`, and returns the group message for that combined wait.
 *
 * @param removeEvent detaches the caller's abort listener
 * @param action1 lazily builds the task that runs all per-site tasks
 * @param op combines two waits (for example `Math.max` or a sum)
 */
function action2_and_1(
  abortController: AbortController,
  action2: TE.TaskEither<string, (opWait: number) => string>,
  removeEvent: Function,
  action1: () => T.Task<readonly E.Either<string, string>[]>,
  op: (a: number, b: number) => number
) {
  const onAction2Failure = (error2: string) => {
    removeEvent()
    return TE.left(throwError(abortController)(error2))
  }

  const onAction2Success = (message2Function: (opWait: number) => string) => async () => {
    const resultArray1 = await action1()()
    // Failures and successes both contribute their embedded wait.
    const toWait = E.fold(getWaitFromMapper1Message, getWaitFromMapper1Message)
    const opWait1 = pipe(resultArray1, RA.map(toWait), RA.reduce(0, op))
    removeEvent()
    return E.right(message2Function(opWait1))
  }

  return pipe(action2, TE.foldW(onAction2Failure, onAction2Success))
}

/**
 * Builds the task for one group of sites sharing a top index. The group-level sleep runs
 * first, then every site of the group runs concurrently, and the waits are combined
 * with `Math.max`.
 *
 * The top index is read from the first site. If the controller is already aborted when
 * the mapper is called, a Left `"mapper2-index = <n> - aborted"` is returned and nothing
 * is registered. Otherwise an `abort` listener that only logs is registered and removed
 * again once the task settles.
 *
 * @throws Error synchronously when `sites` is empty or the top index is not numeric
 */
const mapper2_par = (abortController: AbortController) => (sites: string[]) => {
  const site = sites[0]
  if (site === undefined) {
    // An empty group has no site to derive the top index from; fail with a clear
    // message instead of a TypeError raised by `site.substring`.
    throw new Error('mapper2 - sites is empty')
  }
  // Wrap `abort` in a closure: passing `abortController.abort` detached loses its `this`
  // binding, so invoking it would raise a TypeError instead of aborting.
  const topIndex = getTopIndex(() => abortController.abort())(site)

  if (abortController.signal.aborted) {
    checkLog(`mapper2-index = ${topIndex} - aborted`)
    return TE.left(`mapper2-index = ${topIndex} - aborted`)
  }

  const abortEvent = () => {
    checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
  }
  abortController.signal.addEventListener('abort', abortEvent)
  function removeEvent() {
    abortController.signal.removeEventListener('abort', abortEvent)
  }
  // Detach our own listener first so the aborting group does not log "from event" for itself.
  const abortAction = () => {
    removeEvent()
    abortController.abort()
  }

  // Group-level sleep; built inside the async thunk so thrown errors reach `throwError`.
  const action2 = pipe(
    TE.tryCatch(
      async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
      (error) => throwError(abortController)(error)
    ),
    TE.flattenW
  )

  const action = action2_and_1(
    abortController,
    action2,
    removeEvent,
    () =>
      pipe(
        sites,
        RA.traverse(T.ApplicativePar)(mapper1_throwError(O.some(topIndex))(abortController))
      ),
    Math.max
  )
  // NOTE(review): the `as any` cast is kept from the original; the union returned by
  // `action2_and_1` presumably does not type-check against `TE.fold` as written — confirm.
  return pipe(
    action as any,
    TE.fold(
      (error) => {
        removeEvent()
        return TE.left(error)
      },
      (result) => {
        removeEvent()
        return TE.right(result)
      }
    )
  )
}

// Runs every site group concurrently (each group then runs its sites concurrently) and
// prints the separated results, or the thrown value when the run was aborted.
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方1ー全部同時実行からの全部同時実行', testOptions, async () => {
    // The commented-out groups are alternative inputs.
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      //   ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      //   [
      //     'https://test/5/1',
      //     'https://test/5/2',
      //     'https://test/5/3',
      //     'https://test/5/4'
      //     // 'https://test/5/5'
      //   ]
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    const abortController = new AbortController()

    checkLog('[start]')

    try {
      const runGroups = pipe(sites, RA.traverse(T.ApplicativePar)(mapper2_par(abortController)))
      const outcomes = await runGroups()

      console.log(RA.separate(outcomes))
    } catch (thrown) {
      console.log(thrown)
    }
  })
})

実行結果

time : 2025-01-27T13:00:00.792Z - text: [start]
time : 2025-01-27T13:00:07.806Z - text: mapper2-index = 7
time : 2025-01-27T13:00:07.812Z - text: https://test/7/7 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:00:07.816Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T13:00:07.817Z - text: https://test/7/1 - aborted - from event
time : 2025-01-27T13:00:07.818Z - text: https://test/7/2 - aborted - from event
time : 2025-01-27T13:00:07.818Z - text: https://test/7/3 - aborted - from event
time : 2025-01-27T13:00:07.818Z - text: https://test/7/4 - aborted - from event
time : 2025-01-27T13:00:07.819Z - text: https://test/7/5 - aborted - from event
time : 2025-01-27T13:00:07.820Z - text: https://test/7/6 - aborted - from event
time : 2025-01-27T13:00:07.820Z - text: https://test/7/6 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:00:07.822Z - text: https://test/7/5 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:00:07.824Z - text: https://test/7/4 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:00:07.824Z - text: https://test/7/3 - abort - (topIndex + lastIndex) >= 10
https://test/7/7 - abort - (topIndex + lastIndex) >= 10

実行結果(※中断データを除外した場合)

time : 2025-01-27T13:02:05.156Z - text: [start]
time : 2025-01-27T13:02:12.163Z - text: mapper2-index = 7
time : 2025-01-27T13:02:13.178Z - text: https://test/7/1
time : 2025-01-27T13:02:14.168Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
{ left: [], right: [ 'https://test/7 - 9000' ] }

全部同時実行からの直列実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'

/**
 * Test stand-in for the target API; settles according to `wait` (milliseconds).
 *
 * - `wait / 1000` divisible by 3: rejects immediately with `'Sleep NG before wait'`.
 * - otherwise divisible by 2: rejects after `wait` ms with `` `${wait} - Sleep NG after wait` ``.
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * Rejection reasons are plain strings, not `Error` objects.
 */
const sleep = (wait: number) =>
  // Typing the constructor (instead of asserting `as Promise<number>`) lets the
  // compiler check every `resolve` call against `number`.
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
    } else if (waitSeconds % 2 === 0) {
      setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
    } else {
      setTimeout(() => resolve(wait), wait)
    }
  })

/** Writes `text` to the console, prefixed with the current time in ISO-8601 format. */
function checkLog(text: string) {
  const now = new Date().toISOString()
  console.log(`time : ${now} - text: ${text}`)
}

/** Options handed to each vitest `test` call below: a timeout of 100 000 (100 s if milliseconds). */
const testOptions = {
  timeout: 100 * 1000
}

/** Adds two numbers; used as the reducer that accumulates waits. */
function sum(a: number, b: number) {
  const total = a + b
  return total
}

/**
 * Reads the last character of `site` as a number.
 *
 * Fails (logs the reason, calls `abortAction`, then throws an `Error`) when
 * - the last character is not numeric, or
 * - the character before it is not `'/'` (reported as "LastIndex >= 10").
 *
 * @param abortAction callback invoked right before throwing
 * @returns the numeric value of the last character
 */
const getLastIndex = (abortAction: Function) => (site: string) => {
  // Shared failure path: log the reason, trigger the abort, then build the error to throw.
  const fail = (reason: string) => {
    checkLog(reason)
    abortAction()
    return new Error(reason)
  }

  const lastIndex = Number(site.slice(-1))
  if (Number.isNaN(lastIndex)) {
    throw fail(`${site} - abort - LastIndex is NaN`)
  }
  if (site.charAt(site.length - 2) !== '/') {
    throw fail(`${site} - abort - LastIndex >= 10`)
  }
  return lastIndex
}
/**
 * Reads the "top" index of `site`: the single character third from the end
 * (the `7` in `https://test/7/1`).
 *
 * If that character is not numeric it logs the reason, calls `abortAction`,
 * and throws an `Error`.
 */
const getTopIndex = (abortAction: Function) => (site: string) => {
  const end = site.length - 2
  const topIndex = Number(site.substring(end - 1, end))
  if (!Number.isNaN(topIndex)) {
    return topIndex
  }
  const reason = `${site} - abort - TopIndex is NaN`
  checkLog(reason)
  abortAction()
  throw new Error(reason)
}
/**
 * Guards the combined index: when `topIndex + lastIndex` is 10 or more it logs the
 * reason, calls `abortAction`, and throws an `Error`; otherwise it does nothing.
 */
const checkIndex =
  (abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
    const limitReached = topIndex + lastIndex >= 10
    if (!limitReached) {
      return
    }
    const reason = `${site} - abort - (topIndex + lastIndex) >= 10`
    checkLog(reason)
    abortAction()
    throw new Error(reason)
  }

/** Converts an index into a wait value: `index` multiplied by 1000. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Wraps `sleep(wait)` for one site in a TaskEither.
 * - Right: logs the site and yields `"<site> - ok - <resolved wait>"`.
 * - Left: logs and yields `"<site> - ng - <rejection reason>"`.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    const failure = `${site} - ng - ${error}`
    checkLog(failure)
    return failure
  }
  return TE.tryCatch(() => sleep(wait).then(onResolved), onRejected)
}
/**
 * Extracts the wait value embedded in a mapper1 result message.
 *
 * - `"<site> - ok - <wait>"` yields `<wait>`.
 * - `"<site> - ng - <wait> - <reason>"` yields `<wait>`.
 * - Anything else (for example `"<site> - ng - Sleep NG before wait"` or
 *   `"<site> - aborted"`) yields `0`.
 *
 * The full ` - ok - ` / ` - ng - ` delimiters are matched rather than the bare words, so
 * a site whose URL happens to contain "ok" or "ng" (such as `https://booking/1`) is still
 * parsed correctly.
 *
 * @param mapper1Message a Left or Right message produced by mapper1
 * @returns the parsed wait, or `0` when the message carries none
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const okMarker = ' - ok - '
  const ngMarker = ' - ng - '

  const okAt = mapper1Message.indexOf(okMarker)
  if (okAt !== -1) {
    const okWait = Number(mapper1Message.substring(okAt + okMarker.length))
    if (!Number.isNaN(okWait)) {
      return okWait
    }
  }

  const ngAt = mapper1Message.indexOf(ngMarker)
  if (ngAt === -1) {
    return 0
  }
  const ngDetail = mapper1Message.substring(ngAt + ngMarker.length)
  // The wait, when present, is the text before the first '-' of the detail part.
  const dashAt = ngDetail.indexOf('-')
  if (dashAt === -1) {
    return 0
  }
  const ngWait = Number(ngDetail.substring(0, dashAt))
  return Number.isNaN(ngWait) ? 0 : ngWait
}

/**
 * Wraps `sleep(wait)` for a site group in a TaskEither.
 * - Right: logs the top index and yields a function that, given an extra wait `opWait`,
 *   formats `"<site without its last two characters> - <resolved wait + opWait>"`.
 * - Left: logs and yields `"<site without its last two characters> - ng - <reason>"`.
 *
 * `getTopIndex` is re-evaluated in both paths and may itself abort and throw.
 */
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) => {
  // Evaluated lazily so nothing is computed before the task actually settles.
  const groupSite = () => site.substring(0, site.length - 2)
  const readTopIndex = () => getTopIndex(abortAction)(site)

  return TE.tryCatch(
    () =>
      sleep(wait).then((_wait) => {
        const topIndex = readTopIndex()
        checkLog(`mapper2-index = ${topIndex}`)
        return (opWait: number) => `${groupSite()} - ${_wait + opWait}`
      }),
    (error) => {
      const topIndex = readTopIndex()
      checkLog(`mapper2-index = ${topIndex} - ng - ${error}`)
      return `${groupSite()} - ng - ${error}`
    }
  )
}

/**
 * Normalises a caught value to its message: the `message` property when `error` is an
 * object that has one, otherwise `error` itself.
 *
 * If the controller's signal is already aborted, the message is thrown as-is (not wrapped
 * in an `Error`); otherwise it is returned converted with `String`.
 */
const throwError = (abortController: AbortController) => (error: unknown) => {
  let message: unknown = error
  if (error && typeof error === 'object' && 'message' in error) {
    message = error.message
  }
  if (!abortController.signal.aborted) {
    return String(message)
  }
  throw message
}

/**
 * Builds the task for a single site.
 *
 * - If the controller is already aborted when the mapper is called, logs and returns a Left
 *   `"<site> - aborted"` without registering anything.
 * - Otherwise registers an `abort` listener that only logs, validates the site
 *   (`getLastIndex`, plus `checkIndex` when `topIndex` is `Some`) and runs `do_sleep1`.
 * - Validation failures go through `throwError`, which throws once the controller is aborted.
 * - The listener is removed when the task settles as Left or Right, and by `abortAction`
 *   before it aborts.
 */
const mapper1_throwError =
  (topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
    const { signal } = abortController
    if (signal.aborted) {
      const skipped = `${site} - aborted`
      checkLog(skipped)
      return TE.left(skipped)
    }

    const onAbort = () => {
      checkLog(`${site} - aborted - from event`)
    }
    signal.addEventListener('abort', onAbort)
    const removeEvent = () => signal.removeEventListener('abort', onAbort)
    // Detach our own listener first so the aborting site does not log "from event" for itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }
    const onRejected = (error: unknown) => throwError(abortController)(error)

    // Validation runs inside the async thunk so that a thrown error reaches `onRejected`.
    const prepare = pipe(
      topIndex,
      O.match(
        () => async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
        (top: number) => async () => {
          const lastIndex = getLastIndex(abortAction)(site)
          checkIndex(abortAction)(site, top, lastIndex)
          return pipe(lastIndex, getWaitNumber, do_sleep1(site))
        }
      )
    )
    const action = pipe(TE.tryCatch(prepare, onRejected), TE.flattenW)

    return pipe(
      action,
      TE.fold(
        (error) => {
          removeEvent()
          return TE.left(error)
        },
        (result) => {
          removeEvent()
          return TE.right(result)
        }
      )
    )
  }

/**
 * Runs the group-level task (`action2`) and then, only if it succeeded, the per-site tasks.
 *
 * - `action2` Left: calls `removeEvent`, then passes the error through `throwError`
 *   (which throws if the controller is aborted) and returns it as a Left.
 * - `action2` Right: runs `action1`, turns every per-site result (Left or Right alike)
 *   into its wait via `getWaitFromMapper1Message`, folds the waits with `op` starting
 *   from 0, calls `removeEvent`, and returns the group message for that combined wait.
 *
 * @param removeEvent detaches the caller's abort listener
 * @param action1 lazily builds the task that runs all per-site tasks
 * @param op combines two waits (for example `Math.max` or a sum)
 */
function action2_and_1(
  abortController: AbortController,
  action2: TE.TaskEither<string, (opWait: number) => string>,
  removeEvent: Function,
  action1: () => T.Task<readonly E.Either<string, string>[]>,
  op: (a: number, b: number) => number
) {
  const onAction2Failure = (error2: string) => {
    removeEvent()
    return TE.left(throwError(abortController)(error2))
  }

  const onAction2Success = (message2Function: (opWait: number) => string) => async () => {
    const resultArray1 = await action1()()
    // Failures and successes both contribute their embedded wait.
    const toWait = E.fold(getWaitFromMapper1Message, getWaitFromMapper1Message)
    const opWait1 = pipe(resultArray1, RA.map(toWait), RA.reduce(0, op))
    removeEvent()
    return E.right(message2Function(opWait1))
  }

  return pipe(action2, TE.foldW(onAction2Failure, onAction2Success))
}

/**
 * Builds the task for one group of sites sharing a top index. The group-level sleep runs
 * first, then the sites of the group run one after another, and the waits are combined
 * with `sum`.
 *
 * The top index is read from the first site. An `abort` listener that only logs is
 * registered. If the controller is already aborted when the mapper is called, the
 * listener is removed again and a Left `"mapper2-index = <n> - aborted"` is returned.
 *
 * @throws Error synchronously when `sites` is empty or the top index is not numeric
 */
const mapper2_seq = (abortController: AbortController) => (sites: string[]) => {
  const site = sites[0]
  if (site === undefined) {
    // An empty group has no site to derive the top index from; fail with a clear
    // message instead of a TypeError raised by `site.substring`.
    throw new Error('mapper2 - sites is empty')
  }
  // Wrap `abort` in a closure: passing `abortController.abort` detached loses its `this`
  // binding, so invoking it would raise a TypeError instead of aborting.
  const topIndex = getTopIndex(() => abortController.abort())(site)

  const abortEvent = () => {
    checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
  }
  abortController.signal.addEventListener('abort', abortEvent)
  function removeEvent() {
    abortController.signal.removeEventListener('abort', abortEvent)
  }
  // Detach our own listener first so the aborting group does not log "from event" for itself.
  const abortAction = () => {
    removeEvent()
    abortController.abort()
  }

  if (abortController.signal.aborted) {
    checkLog(`mapper2-index = ${topIndex} - aborted`)
    removeEvent()
    return TE.left(`mapper2-index = ${topIndex} - aborted`)
  }

  // Group-level sleep; built inside the async thunk so thrown errors reach `throwError`.
  const action2 = pipe(
    TE.tryCatch(
      async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
      (error) => throwError(abortController)(error)
    ),
    TE.flattenW
  )

  const action = action2_and_1(
    abortController,
    action2,
    removeEvent,
    () =>
      pipe(
        sites,
        RA.traverse(T.ApplicativeSeq)(mapper1_throwError(O.some(topIndex))(abortController))
      ),
    sum
  )
  return action
}

// Runs every site group concurrently (each group then runs its sites sequentially) and
// prints the separated results, or the thrown value when the run was aborted.
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方3ー全部同時実行からの直列実行', testOptions, async () => {
    // The commented-out groups are alternative inputs.
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      //   ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      // [
      //   'https://test/5/1',
      //   'https://test/5/2',
      //   'https://test/5/3',
      //   'https://test/5/4',
      //   'https://test/5/5'
      // ]
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    const abortController = new AbortController()

    checkLog('[start]')

    try {
      const runGroups = pipe(sites, RA.traverse(T.ApplicativePar)(mapper2_seq(abortController)))
      const outcomes = await runGroups()

      console.log(RA.separate(outcomes))
    } catch (thrown) {
      console.log(thrown)
    }
  })
})

実行結果

time : 2025-01-27T13:06:22.590Z - text: [start]
time : 2025-01-27T13:06:29.598Z - text: mapper2-index = 7
time : 2025-01-27T13:06:30.604Z - text: https://test/7/1
time : 2025-01-27T13:06:32.620Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T13:06:32.621Z - text: https://test/7/3 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:06:32.623Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T13:06:32.623Z - text: https://test/7/4 - aborted - from event
time : 2025-01-27T13:06:32.623Z - text: https://test/7/5 - aborted - from event
time : 2025-01-27T13:06:32.623Z - text: https://test/7/6 - aborted - from event
time : 2025-01-27T13:06:32.624Z - text: https://test/7/7 - aborted - from event
https://test/7/3 - abort - (topIndex + lastIndex) >= 10

実行結果(※中断データを除外した場合)

time : 2025-01-27T13:07:15.692Z - text: [start]
time : 2025-01-27T13:07:22.709Z - text: mapper2-index = 7
time : 2025-01-27T13:07:23.728Z - text: https://test/7/1
time : 2025-01-27T13:07:25.738Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
{ left: [], right: [ 'https://test/7 - 10000' ] }

全部同時実行からの最大2個まで同時実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'

/**
 * Demo stand-in for an asynchronous API call whose outcome depends on `wait`.
 *
 * With `s = wait / 1000`:
 * - `s` is a multiple of 3: rejects immediately with `'Sleep NG before wait'`;
 * - otherwise `s` is a multiple of 2: rejects after `wait` ms;
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * @param wait delay in milliseconds
 */
const sleep = (wait: number) =>
  new Promise<number>((resolve, reject) => {
    const seconds = wait / 1000
    if (seconds % 3 === 0) {
      reject('Sleep NG before wait')
      return
    }
    // Decide up front how the timer will settle the promise.
    const settle =
      seconds % 2 === 0 ? () => reject(`${wait} - Sleep NG after wait`) : () => resolve(wait)
    setTimeout(settle, wait)
  })

/** Writes `text` to the console, prefixed with the current ISO-8601 timestamp. */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log(`time : ${timestamp} - text: ${text}`)
}

// Options handed to the vitest test case below; the timeout is 100 seconds in milliseconds.
const testOptions = { timeout: 100 * 1000 }

/** Returns the arithmetic sum of `a` and `b`. */
function sum(a: number, b: number) {
  const total = a + b
  return total
}

/**
 * Reads the single-digit index that ends `site` (e.g. `.../7/3` -> 3).
 *
 * If the last character is not numeric, or the character before it is not `/`
 * (the trailing index has more than one digit), the reason is logged,
 * `abortAction` is invoked and an Error carrying the same text is thrown.
 */
const getLastIndex = (abortAction: Function) => (site: string) => {
  // Shared failure path: log, trigger the abort, then throw.
  const fail = (message: string): never => {
    checkLog(message)
    abortAction()
    throw new Error(message)
  }

  const lastIndex = Number(site.slice(-1))
  if (Number.isNaN(lastIndex)) {
    fail(`${site} - abort - LastIndex is NaN`)
  }
  if (site.charAt(site.length - 2) !== '/') {
    fail(`${site} - abort - LastIndex >= 10`)
  }
  return lastIndex
}
/**
 * Reads the character three positions from the end of `site`
 * (the `<top>` part of `.../<top>/<last>`) and returns it as a number.
 *
 * If that character is not numeric, the reason is logged, `abortAction`
 * is invoked and an Error carrying the same text is thrown.
 */
const getTopIndex = (abortAction: Function) => (site: string) => {
  const topIndex = Number(site.substring(site.length - 3, site.length - 2))
  if (!Number.isNaN(topIndex)) {
    return topIndex
  }
  const message = `${site} - abort - TopIndex is NaN`
  checkLog(message)
  abortAction()
  throw new Error(message)
}
/**
 * Guard for the combined index: when `topIndex + lastIndex` is 10 or more,
 * the reason is logged, `abortAction` is invoked and an Error is thrown.
 * Otherwise nothing happens.
 */
const checkIndex =
  (abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
    const overLimit = topIndex + lastIndex >= 10
    if (!overLimit) {
      return
    }
    const message = `${site} - abort - (topIndex + lastIndex) >= 10`
    checkLog(message)
    abortAction()
    throw new Error(message)
  }

/** Converts an index, read as a number of seconds, into a wait time in milliseconds. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds a TaskEither that calls `sleep(wait)` on behalf of `site`.
 *
 * - Right: `"<site> - ok - <wait>"` (the site is logged on success)
 * - Left:  `"<site> - ng - <reason>"` (the same text is logged)
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    const message = `${site} - ng - ${error}`
    checkLog(message)
    return message
  }
  return TE.tryCatch(() => sleep(wait).then(onResolved), onRejected)
}
/**
 * Extracts the wait time (in ms) embedded in a message produced for one site.
 *
 * Recognised shapes:
 * - `"<site> - ok - <wait>"`            -> `<wait>`
 * - `"<site> - ng - <wait> - <reason>"` -> `<wait>`
 * - anything else (e.g. `"<site> - ng - Sleep NG before wait"`,
 *   `"<site> - aborted"`)               -> `0`
 *
 * The complete ` - ok - ` / ` - ng - ` separators are matched rather than the
 * bare letters `ok` / `ng`, so a site whose URL happens to contain those
 * letters (e.g. `https://testing/1/2`) is parsed correctly.
 *
 * @param mapper1Message message text to parse
 * @returns the embedded wait, or 0 when none can be read
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const okSeparator = ' - ok - '
  const ngSeparator = ' - ng - '

  // The wait is the last field of an ok message, so search from the end.
  const okAt = mapper1Message.lastIndexOf(okSeparator)
  if (okAt !== -1) {
    const okWait = Number(mapper1Message.substring(okAt + okSeparator.length))
    if (!Number.isNaN(okWait)) {
      return okWait
    }
  }

  const ngAt = mapper1Message.indexOf(ngSeparator)
  if (ngAt === -1) {
    return 0
  }
  const ngDetail = mapper1Message.substring(ngAt + ngSeparator.length)
  const dashAt = ngDetail.indexOf('-')
  // Without a " - <reason>" tail there is no leading wait value to read.
  if (dashAt === -1) {
    return 0
  }
  const ngWait = Number(ngDetail.substring(0, dashAt))
  return Number.isNaN(ngWait) ? 0 : ngWait
}

/**
 * Builds the group-level TaskEither that calls `sleep(wait)` for the group `site` belongs to.
 *
 * - Right: a function `(opWait) => "<group url> - <wait + opWait>"`, so the caller
 *   can add the time spent on the group's members afterwards.
 * - Left:  `"<group url> - ng - <reason>"`.
 *
 * `<group url>` is `site` without its last two characters. The top index is
 * re-read through `getTopIndex` for logging in both outcomes.
 */
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) => {
  const readTopIndex = getTopIndex(abortAction)
  const groupUrl = site.substring(0, site.length - 2)
  return TE.tryCatch(
    () =>
      sleep(wait).then((_wait) => {
        checkLog(`mapper2-index = ${readTopIndex(site)}`)
        return (opWait: number) => `${groupUrl} - ${_wait + opWait}`
      }),
    (error) => {
      checkLog(`mapper2-index = ${readTopIndex(site)} - ng - ${error}`)
      return `${groupUrl} - ng - ${error}`
    }
  )
}

/**
 * Reducer step that appends the results of one chunk `b` to the accumulator `a`.
 *
 * When the chunk holds exactly two results, only the one with the larger
 * embedded wait is kept (the second one on a tie); any other chunk size is
 * appended unchanged.
 */
const flatternEitherStringArray = (
  a: readonly E.Either<string, string>[],
  b: readonly E.Either<string, string>[]
) => {
  if (b.length !== 2) {
    return [...a, ...b]
  }
  // Wait embedded in either side of the Either.
  const waitOf = (entry: E.Either<string, string>) =>
    getWaitFromMapper1Message(E.isRight(entry) ? entry.right : entry.left)
  const [first, second] = b
  const slower = waitOf(first) > waitOf(second) ? first : second
  return [...a, slower]
}

/**
 * Given a `flatten` reducer, returns a function that folds an array of
 * result chunks into one flat array, starting from an empty accumulator.
 */
const map_E_flatten_array =
  (
    flatten: (
      a: readonly E.Either<string, string>[],
      b: readonly E.Either<string, string>[]
    ) => readonly E.Either<string, string>[]
  ) =>
  (messageArrayArray: readonly (readonly E.Either<string, string>[])[]) =>
    pipe(messageArrayArray, RA.reduce([], flatten))

/**
 * Normalises a caught value into a message and decides how to surface it.
 *
 * The message is `error.message` when `error` is an object with a `message`
 * property, otherwise `error` itself. If the controller's signal is already
 * aborted the message is thrown as-is; otherwise it is returned as a string.
 */
const throwError = (abortController: AbortController) => (error: unknown) => {
  let message: unknown = error
  if (typeof error === 'object' && error !== null && 'message' in error) {
    message = error.message
  }
  if (!abortController.signal.aborted) {
    return String(message)
  }
  throw message
}

/**
 * Builds the TaskEither that processes one `site`.
 *
 * @param topIndex when `some`, the combined-index check (`checkIndex`) is also applied
 * @param abortController controller shared by every task of the run
 * @param site URL to process
 * @returns Left `"<site> - aborted"` right away when the signal is already aborted;
 *          otherwise a TaskEither whose result comes from `do_sleep1`
 */
const mapper1_throwError =
  (topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
    // Already aborted: do not register anything, just report it.
    if (abortController.signal.aborted) {
      checkLog(`${site} - aborted`)
      return TE.left(`${site} - aborted`)
    }

    // Log when some other task aborts the run while this one is registered.
    const abortEvent = () => {
      checkLog(`${site} - aborted - from event`)
    }
    abortController.signal.addEventListener('abort', abortEvent)
    function removeEvent() {
      abortController.signal.removeEventListener('abort', abortEvent)
    }
    // Unregister our own listener first so the site that triggers the abort
    // does not also log "aborted - from event" for itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }

    const action = pipe(
      topIndex,
      O.match(
        // No top index: validate the last index only.
        () =>
          pipe(
            TE.tryCatch(
              // The async thunk resolves to a TaskEither, hence the TE.flattenW below.
              async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
              // throwError throws (instead of returning a Left value) once the signal is aborted.
              (error) => throwError(abortController)(error)
            ),
            TE.flattenW
          ),
        // Top index known: additionally enforce the combined-index limit.
        (topIndex) =>
          pipe(
            TE.tryCatch(
              async () => {
                const lastIndex = getLastIndex(abortAction)(site)
                checkIndex(abortAction)(site, topIndex, lastIndex)

                const action = pipe(lastIndex, getWaitNumber, do_sleep1(site))
                return action
              },
              (error) => throwError(abortController)(error)
            ),
            TE.flattenW
          )
      )
    )
    // Whatever the outcome, drop the abort listener once the task has settled.
    const actionWithRemoveEvent = pipe(
      action,
      TE.fold(
        (error) => {
          removeEvent()
          return TE.left(error)
        },
        (result) => {
          removeEvent()
          return TE.right(result)
        }
      )
    )

    return actionWithRemoveEvent
  }

/**
 * Runs the group-level task `action2` and, only if it succeeds, the member tasks `action1`.
 *
 * @param abortController controller consulted by `throwError` when `action2` fails
 * @param action2 group-level task; its Right value builds the final message from a wait total
 * @param removeEvent called once the combined task finishes, on the failure and success paths
 * @param action1 thunk producing the member task; it is only invoked after `action2` succeeded
 * @param op reducer combining the member waits, starting from 0
 * @returns a task yielding Left (the `action2` error) or Right (the final message)
 */
function action2_and_1(
  abortController: AbortController,
  action2: TE.TaskEither<string, (opWait: number) => string>,
  removeEvent: Function,
  action1: () => T.Task<readonly E.Either<string, string>[]>,
  op: (a: number, b: number) => number
) {
  const action = pipe(
    action2,
    TE.foldW(
      (error2) => {
        removeEvent()
        // throwError throws here when the signal is aborted; otherwise the text becomes the Left.
        return TE.left(throwError(abortController)(error2))
      },
      (message2Function) => async () => {
        const resultArray1 = await action1()()
        // Both Left and Right member results are mapped to Right(wait), so every member counts.
        const convertedResultArray1 = pipe(
          resultArray1,
          RA.map((result1) =>
            pipe(
              result1,
              E.fold(
                (error1) => E.right(getWaitFromMapper1Message(error1)),
                (message1) => E.right(getWaitFromMapper1Message(message1))
              )
            )
          )
        )
        const separateResult = pipe(convertedResultArray1, RA.separate)
        const opWait1 = pipe(separateResult.right, RA.reduce(0, op))
        removeEvent()
        return E.right(message2Function(opWait1))
      }
    )
  )
  return action
}

/**
 * Builds the task for one group of sites: first the group-level sleep, then the
 * member sites in chunks of two (each chunk in parallel, chunks one after another).
 *
 * @param abortController controller shared by every task of the run
 * @param sites URLs of one group; the first entry supplies the group's top index
 * @returns Left `"mapper2-index = <n> - aborted"` right away when the signal is already
 *          aborted; otherwise the combined task built by `action2_and_1` with `sum`
 */
const mapper2_limit2 = (abortController: AbortController) => (sites: string[]) => {
  const site = sites[0]
  // Wrap the call: a bare `abortController.abort` reference loses its `this`
  // and would throw an invalid-receiver TypeError instead of aborting.
  const topIndex = getTopIndex(() => abortController.abort())(site)

  // Log when another task aborts the run while this group is registered.
  const abortEvent = () => {
    checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
  }
  abortController.signal.addEventListener('abort', abortEvent)
  function removeEvent() {
    abortController.signal.removeEventListener('abort', abortEvent)
  }
  // Unregister our own listener first so this group does not log its own abort.
  const abortAction = () => {
    removeEvent()
    abortController.abort()
  }

  if (abortController.signal.aborted) {
    checkLog(`mapper2-index = ${topIndex} - aborted`)
    removeEvent()
    return TE.left(`mapper2-index = ${topIndex} - aborted`)
  }

  // Group-level sleep; the async thunk resolves to a TaskEither, hence TE.flattenW.
  const action2 = pipe(
    TE.tryCatch(
      async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
      (error) => throwError(abortController)(error)
    ),
    TE.flattenW
  )

  const limit = 2
  const chunkSites = pipe(sites, RA.chunksOf(limit))
  const action = action2_and_1(
    abortController,
    action2,
    removeEvent,
    // Passed as a thunk so the member tasks are only built after action2 succeeded.
    () => {
      // Within a chunk the sites run in parallel.
      const action1 = pipe(
        chunkSites,
        RA.map(RA.traverse(T.ApplicativePar)(mapper1_throwError(O.some(topIndex))(abortController)))
      )

      // The chunks themselves run sequentially; results are then merged per chunk.
      const flatAction1 = pipe(
        action1,
        RA.sequence(T.ApplicativeSeq),
        T.map(map_E_flatten_array(flatternEitherStringArray))
      )

      return flatAction1
    },
    sum
  )
  return action
}

// vitest scenario: every group starts in parallel; inside a group at most two sites run at once.
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方5ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    // One inner array per group; only the group with top index 7 is enabled here.
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      //   ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      // [
      //   'https://test/5/1',
      //   'https://test/5/2',
      //   'https://test/5/3',
      //   'https://test/5/4',
      //   'https://test/5/5'
      // ]
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    // A single controller is shared by all groups and sites.
    const abortController = new AbortController()

    checkLog('[start]')

    try {
      // Groups are traversed in parallel.
      const action = pipe(sites, RA.traverse(T.ApplicativePar)(mapper2_limit2(abortController)))
      const result = await action()
      // Split the per-group Either results into { left, right }.
      const separateResult = pipe(result, RA.separate)

      console.log(separateResult)
    } catch (error) {
      // Anything thrown or rejected while building or running the tasks ends up here.
      console.log(error)
    }
  })
})

実行結果

time : 2025-01-27T15:37:31.223Z - text: [start]
time : 2025-01-27T15:37:38.242Z - text: mapper2-index = 7
time : 2025-01-27T15:37:39.254Z - text: https://test/7/1
time : 2025-01-27T15:37:40.253Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:37:40.253Z - text: https://test/7/4 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T15:37:40.255Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T15:37:40.255Z - text: https://test/7/3 - aborted - from event
time : 2025-01-27T15:37:40.255Z - text: https://test/7/5 - aborted - from event
time : 2025-01-27T15:37:40.255Z - text: https://test/7/6 - aborted - from event
time : 2025-01-27T15:37:40.255Z - text: https://test/7/7 - aborted - from event
time : 2025-01-27T15:37:40.256Z - text: https://test/7/3 - abort - (topIndex + lastIndex) >= 10
https://test/7/4 - abort - (topIndex + lastIndex) >= 10

実行結果(※中断データを除外した場合)

time : 2025-01-27T15:37:54.906Z - text: [start]
time : 2025-01-27T15:38:01.911Z - text: mapper2-index = 7
time : 2025-01-27T15:38:02.931Z - text: https://test/7/1
time : 2025-01-27T15:38:03.930Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
{ left: [], right: [ 'https://test/7 - 9000' ] }

配列が2つの場合(複雑)

全部同時実行からの全部同時実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'

/**
 * Demo stand-in for an asynchronous API call whose outcome depends on `wait`.
 *
 * With `s = wait / 1000`:
 * - `s` is a multiple of 3: rejects immediately with `'Sleep NG before wait'`;
 * - otherwise `s` is a multiple of 2: rejects after `wait` ms;
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * @param wait delay in milliseconds
 */
const sleep = (wait: number) =>
  new Promise<number>((resolve, reject) => {
    const seconds = wait / 1000
    if (seconds % 3 === 0) {
      reject('Sleep NG before wait')
      return
    }
    // Decide up front how the timer will settle the promise.
    const settle =
      seconds % 2 === 0 ? () => reject(`${wait} - Sleep NG after wait`) : () => resolve(wait)
    setTimeout(settle, wait)
  })

/** Writes `text` to the console, prefixed with the current ISO-8601 timestamp. */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log(`time : ${timestamp} - text: ${text}`)
}

// Options handed to the vitest test case below; the timeout is 100 seconds in milliseconds.
const testOptions = { timeout: 100 * 1000 }

/**
 * Reads the single-digit index that ends `site` (e.g. `.../7/3` -> 3).
 *
 * If the last character is not numeric, or the character before it is not `/`
 * (the trailing index has more than one digit), the reason is logged,
 * `abortAction` is invoked and an Error carrying the same text is thrown.
 */
const getLastIndex = (abortAction: Function) => (site: string) => {
  // Shared failure path: log, trigger the abort, then throw.
  const fail = (message: string): never => {
    checkLog(message)
    abortAction()
    throw new Error(message)
  }

  const lastIndex = Number(site.slice(-1))
  if (Number.isNaN(lastIndex)) {
    fail(`${site} - abort - LastIndex is NaN`)
  }
  if (site.charAt(site.length - 2) !== '/') {
    fail(`${site} - abort - LastIndex >= 10`)
  }
  return lastIndex
}
/**
 * Reads the character three positions from the end of `site`
 * (the `<top>` part of `.../<top>/<last>`) and returns it as a number.
 *
 * If that character is not numeric, the reason is logged, `abortAction`
 * is invoked and an Error carrying the same text is thrown.
 */
const getTopIndex = (abortAction: Function) => (site: string) => {
  const topIndex = Number(site.substring(site.length - 3, site.length - 2))
  if (!Number.isNaN(topIndex)) {
    return topIndex
  }
  const message = `${site} - abort - TopIndex is NaN`
  checkLog(message)
  abortAction()
  throw new Error(message)
}
/**
 * Guard for the combined index: when `topIndex + lastIndex` is 10 or more,
 * the reason is logged, `abortAction` is invoked and an Error is thrown.
 * Otherwise nothing happens.
 */
const checkIndex =
  (abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
    const overLimit = topIndex + lastIndex >= 10
    if (!overLimit) {
      return
    }
    const message = `${site} - abort - (topIndex + lastIndex) >= 10`
    checkLog(message)
    abortAction()
    throw new Error(message)
  }

/** Converts an index, read as a number of seconds, into a wait time in milliseconds. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds a TaskEither that calls `sleep(wait)` on behalf of `site`.
 *
 * - Right: `"<site> - ok - <wait>"` (the site is logged on success)
 * - Left:  `"<site> - ng - <reason>"` (the same text is logged)
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    const message = `${site} - ng - ${error}`
    checkLog(message)
    return message
  }
  return TE.tryCatch(() => sleep(wait).then(onResolved), onRejected)
}
/**
 * Extracts the wait time (in ms) embedded in a message produced for one site.
 *
 * Recognised shapes:
 * - `"<site> - ok - <wait>"`            -> `<wait>`
 * - `"<site> - ng - <wait> - <reason>"` -> `<wait>`
 * - anything else (e.g. `"<site> - ng - Sleep NG before wait"`,
 *   `"<site> - aborted"`)               -> `0`
 *
 * The complete ` - ok - ` / ` - ng - ` separators are matched rather than the
 * bare letters `ok` / `ng`, so a site whose URL happens to contain those
 * letters (e.g. `https://testing/1/2`) is parsed correctly.
 *
 * @param mapper1Message message text to parse
 * @returns the embedded wait, or 0 when none can be read
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const okSeparator = ' - ok - '
  const ngSeparator = ' - ng - '

  // The wait is the last field of an ok message, so search from the end.
  const okAt = mapper1Message.lastIndexOf(okSeparator)
  if (okAt !== -1) {
    const okWait = Number(mapper1Message.substring(okAt + okSeparator.length))
    if (!Number.isNaN(okWait)) {
      return okWait
    }
  }

  const ngAt = mapper1Message.indexOf(ngSeparator)
  if (ngAt === -1) {
    return 0
  }
  const ngDetail = mapper1Message.substring(ngAt + ngSeparator.length)
  const dashAt = ngDetail.indexOf('-')
  // Without a " - <reason>" tail there is no leading wait value to read.
  if (dashAt === -1) {
    return 0
  }
  const ngWait = Number(ngDetail.substring(0, dashAt))
  return Number.isNaN(ngWait) ? 0 : ngWait
}

/**
 * Builds the group-level TaskEither that calls `sleep(wait)` for the group `site` belongs to.
 *
 * - Right: a function `(opWait) => "<group url> - <wait + opWait>"`, so the caller
 *   can add the time spent on the group's members afterwards.
 * - Left:  `"<group url> - ng - <reason>"`.
 *
 * `<group url>` is `site` without its last two characters. The top index is
 * re-read through `getTopIndex` for logging in both outcomes.
 */
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) => {
  const readTopIndex = getTopIndex(abortAction)
  const groupUrl = site.substring(0, site.length - 2)
  return TE.tryCatch(
    () =>
      sleep(wait).then((_wait) => {
        checkLog(`mapper2-index = ${readTopIndex(site)}`)
        return (opWait: number) => `${groupUrl} - ${_wait + opWait}`
      }),
    (error) => {
      checkLog(`mapper2-index = ${readTopIndex(site)} - ng - ${error}`)
      return `${groupUrl} - ng - ${error}`
    }
  )
}

/**
 * Normalises a caught value into a message and decides how to surface it.
 *
 * The message is `error.message` when `error` is an object with a `message`
 * property, otherwise `error` itself. If the controller's signal is already
 * aborted the message is thrown as-is; otherwise it is returned as a string.
 */
const throwError = (abortController: AbortController) => (error: unknown) => {
  let message: unknown = error
  if (typeof error === 'object' && error !== null && 'message' in error) {
    message = error.message
  }
  if (!abortController.signal.aborted) {
    return String(message)
  }
  throw message
}

/**
 * Builds the TaskEither that processes one `site`.
 *
 * @param topIndex when `some`, the combined-index check (`checkIndex`) is also applied
 * @param abortController controller shared by every task of the run
 * @param site URL to process
 * @returns Left `"<site> - aborted"` right away when the signal is already aborted;
 *          otherwise a TaskEither whose result comes from `do_sleep1`
 */
const mapper1_throwError =
  (topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
    // Already aborted: do not register anything, just report it.
    if (abortController.signal.aborted) {
      checkLog(`${site} - aborted`)
      return TE.left(`${site} - aborted`)
    }

    // Log when some other task aborts the run while this one is registered.
    const abortEvent = () => {
      checkLog(`${site} - aborted - from event`)
    }
    abortController.signal.addEventListener('abort', abortEvent)
    function removeEvent() {
      abortController.signal.removeEventListener('abort', abortEvent)
    }
    // Unregister our own listener first so the site that triggers the abort
    // does not also log "aborted - from event" for itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }

    const action = pipe(
      topIndex,
      O.match(
        // No top index: validate the last index only.
        () =>
          pipe(
            TE.tryCatch(
              // The async thunk resolves to a TaskEither, hence the TE.flattenW below.
              async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
              // throwError throws (instead of returning a Left value) once the signal is aborted.
              (error) => throwError(abortController)(error)
            ),
            TE.flattenW
          ),
        // Top index known: additionally enforce the combined-index limit.
        (topIndex) =>
          pipe(
            TE.tryCatch(
              async () => {
                const lastIndex = getLastIndex(abortAction)(site)
                checkIndex(abortAction)(site, topIndex, lastIndex)

                const action = pipe(lastIndex, getWaitNumber, do_sleep1(site))
                return action
              },
              (error) => throwError(abortController)(error)
            ),
            TE.flattenW
          )
      )
    )
    // Whatever the outcome, drop the abort listener once the task has settled.
    const actionWithRemoveEvent = pipe(
      action,
      TE.fold(
        (error) => {
          removeEvent()
          return TE.left(error)
        },
        (result) => {
          removeEvent()
          return TE.right(result)
        }
      )
    )

    return actionWithRemoveEvent
  }

/**
 * Runs the group-level task `action2` and, only if it succeeds, the member tasks `action1`.
 *
 * @param abortController controller consulted by `throwError` when `action2` fails
 * @param action2 group-level task; its Right value builds the final message from a wait total
 * @param removeEvent called once the combined task finishes, on the failure and success paths
 * @param action1 thunk producing the member task; it is only invoked after `action2` succeeded
 * @param op reducer combining the member waits, starting from 0
 * @returns a task yielding Left (the `action2` error) or Right (the final message)
 */
function action2_and_1(
  abortController: AbortController,
  action2: TE.TaskEither<string, (opWait: number) => string>,
  removeEvent: Function,
  action1: () => T.Task<readonly E.Either<string, string>[]>,
  op: (a: number, b: number) => number
) {
  const action = pipe(
    action2,
    TE.foldW(
      (error2) => {
        removeEvent()
        // throwError throws here when the signal is aborted; otherwise the text becomes the Left.
        return TE.left(throwError(abortController)(error2))
      },
      (message2Function) => async () => {
        const resultArray1 = await action1()()
        // Both Left and Right member results are mapped to Right(wait), so every member counts.
        const convertedResultArray1 = pipe(
          resultArray1,
          RA.map((result1) =>
            pipe(
              result1,
              E.fold(
                (error1) => E.right(getWaitFromMapper1Message(error1)),
                (message1) => E.right(getWaitFromMapper1Message(message1))
              )
            )
          )
        )
        const separateResult = pipe(convertedResultArray1, RA.separate)
        const opWait1 = pipe(separateResult.right, RA.reduce(0, op))
        removeEvent()
        return E.right(message2Function(opWait1))
      }
    )
  )
  return action
}

/**
 * Builds the task for one group of sites: first the group-level sleep, then all
 * member sites in parallel. Member waits are combined with `Math.max`.
 *
 * @param abortController controller shared by every task of the run
 * @param sites URLs of one group; the first entry supplies the group's top index
 * @returns Left `"mapper2-index = <n> - aborted"` right away when the signal is already
 *          aborted; otherwise the combined task with the abort listener removed at the end
 */
const mapper2_par = (abortController: AbortController) => (sites: string[]) => {
  const site = sites[0]
  // Wrap the call: a bare `abortController.abort` reference loses its `this`
  // and would throw an invalid-receiver TypeError instead of aborting.
  const topIndex = getTopIndex(() => abortController.abort())(site)

  // Already aborted: report it without registering a listener.
  if (abortController.signal.aborted) {
    checkLog(`mapper2-index = ${topIndex} - aborted`)
    return TE.left(`mapper2-index = ${topIndex} - aborted`)
  }

  // Log when another task aborts the run while this group is registered.
  const abortEvent = () => {
    checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
  }
  abortController.signal.addEventListener('abort', abortEvent)
  function removeEvent() {
    abortController.signal.removeEventListener('abort', abortEvent)
  }
  // Unregister our own listener first so this group does not log its own abort.
  const abortAction = () => {
    removeEvent()
    abortController.abort()
  }

  // Group-level sleep; the async thunk resolves to a TaskEither, hence TE.flattenW.
  const action2 = pipe(
    TE.tryCatch(
      async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
      (error) => throwError(abortController)(error)
    ),
    TE.flattenW
  )

  const action = action2_and_1(
    abortController,
    action2,
    removeEvent,
    // Passed as a thunk so the member tasks are only built after action2 succeeded.
    () =>
      pipe(
        sites,
        RA.traverse(T.ApplicativePar)(mapper1_throwError(O.some(topIndex))(abortController))
      ),
    Math.max
  )
  // NOTE(review): `as any` discards the types of `action`; presumably a workaround for the
  // union returned by action2_and_1 — confirm whether a precise type can replace it.
  return pipe(
    action as any,
    TE.fold(
      (error) => {
        removeEvent()
        return TE.left(error)
      },
      (result) => {
        removeEvent()
        return TE.right(result)
      }
    )
  )
}

// vitest scenario: every group starts in parallel, and so do the sites inside each group.
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方2ー全部同時実行からの全部同時実行', testOptions, async () => {
    // One inner array per group; the group's top index is the second-to-last path segment.
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    // A single controller is shared by all groups and sites.
    const abortController = new AbortController()

    checkLog('[start]')

    try {
      // Groups are traversed in parallel.
      const action = pipe(sites, RA.traverse(T.ApplicativePar)(mapper2_par(abortController)))
      const result = await action()
      // Split the per-group Either results into { left, right }.
      const separateResult = pipe(result, RA.separate)

      console.log(separateResult)
    } catch (error) {
      // Anything thrown or rejected while building or running the tasks ends up here.
      console.log(error)
    }
  })
})

実行結果

time : 2025-01-27T13:50:16.315Z - text: [start]
time : 2025-01-27T13:50:16.318Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T13:50:17.327Z - text: mapper2-index = 1
time : 2025-01-27T13:50:18.330Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T13:50:18.331Z - text: https://test/1/1
time : 2025-01-27T13:50:20.333Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T13:50:21.325Z - text: mapper2-index = 5
time : 2025-01-27T13:50:21.326Z - text: https://test/5/5 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:50:21.327Z - text: mapper2-index = 5 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: https://test/5/1 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: https://test/5/2 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: https://test/5/3 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: https://test/5/4 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: https://test/5/3 - ng - Sleep NG before wait
https://test/5/5 - abort - (topIndex + lastIndex) >= 10

実行結果(※中断データを除外した場合)

time : 2025-01-27T13:59:36.765Z - text: [start]
time : 2025-01-27T13:59:36.770Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T13:59:37.782Z - text: mapper2-index = 1
time : 2025-01-27T13:59:38.784Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T13:59:38.787Z - text: https://test/1/1
time : 2025-01-27T13:59:40.773Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T13:59:41.779Z - text: mapper2-index = 5
time : 2025-01-27T13:59:41.781Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-27T13:59:42.784Z - text: https://test/5/1
time : 2025-01-27T13:59:43.772Z - text: mapper2-index = 7
time : 2025-01-27T13:59:43.787Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T13:59:44.774Z - text: https://test/7/1
time : 2025-01-27T13:59:45.776Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T13:59:45.791Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
{
  left: [
    'https://test/2 - ng - 2000 - Sleep NG after wait',
    'https://test/3 - ng - Sleep NG before wait',
    'https://test/4 - ng - 4000 - Sleep NG after wait'
  ],
  right: [
    'https://test/1 - 2000',
    'https://test/5 - 9000',
    'https://test/7 - 9000'
  ]
}

全部同時実行からの直列実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'

/**
 * Demo stand-in for an asynchronous API call whose outcome depends on `wait`.
 *
 * With `s = wait / 1000`:
 * - `s` is a multiple of 3: rejects immediately with `'Sleep NG before wait'`;
 * - otherwise `s` is a multiple of 2: rejects after `wait` ms;
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * @param wait delay in milliseconds
 */
const sleep = (wait: number) =>
  new Promise<number>((resolve, reject) => {
    const seconds = wait / 1000
    if (seconds % 3 === 0) {
      reject('Sleep NG before wait')
      return
    }
    // Decide up front how the timer will settle the promise.
    const settle =
      seconds % 2 === 0 ? () => reject(`${wait} - Sleep NG after wait`) : () => resolve(wait)
    setTimeout(settle, wait)
  })

/** Writes `text` to the console, prefixed with the current ISO-8601 timestamp. */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log(`time : ${timestamp} - text: ${text}`)
}

// Options handed to the vitest test case below; the timeout is 100 seconds in milliseconds.
const testOptions = { timeout: 100 * 1000 }

/** Returns the arithmetic sum of `a` and `b`. */
function sum(a: number, b: number) {
  const total = a + b
  return total
}

/**
 * Reads the single-digit index that ends `site` (e.g. `.../7/3` -> 3).
 *
 * If the last character is not numeric, or the character before it is not `/`
 * (the trailing index has more than one digit), the reason is logged,
 * `abortAction` is invoked and an Error carrying the same text is thrown.
 */
const getLastIndex = (abortAction: Function) => (site: string) => {
  // Shared failure path: log, trigger the abort, then throw.
  const fail = (message: string): never => {
    checkLog(message)
    abortAction()
    throw new Error(message)
  }

  const lastIndex = Number(site.slice(-1))
  if (Number.isNaN(lastIndex)) {
    fail(`${site} - abort - LastIndex is NaN`)
  }
  if (site.charAt(site.length - 2) !== '/') {
    fail(`${site} - abort - LastIndex >= 10`)
  }
  return lastIndex
}
/**
 * Reads the character three positions from the end of `site`
 * (the `<top>` part of `.../<top>/<last>`) and returns it as a number.
 *
 * If that character is not numeric, the reason is logged, `abortAction`
 * is invoked and an Error carrying the same text is thrown.
 */
const getTopIndex = (abortAction: Function) => (site: string) => {
  const topIndex = Number(site.substring(site.length - 3, site.length - 2))
  if (!Number.isNaN(topIndex)) {
    return topIndex
  }
  const message = `${site} - abort - TopIndex is NaN`
  checkLog(message)
  abortAction()
  throw new Error(message)
}
/**
 * Guard for the combined index: when `topIndex + lastIndex` is 10 or more,
 * the reason is logged, `abortAction` is invoked and an Error is thrown.
 * Otherwise nothing happens.
 */
const checkIndex =
  (abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
    const overLimit = topIndex + lastIndex >= 10
    if (!overLimit) {
      return
    }
    const message = `${site} - abort - (topIndex + lastIndex) >= 10`
    checkLog(message)
    abortAction()
    throw new Error(message)
  }

/** Converts an index, read as a number of seconds, into a wait time in milliseconds. */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds a TaskEither that calls `sleep(wait)` on behalf of `site`.
 *
 * - Right: `"<site> - ok - <wait>"` (the site is logged on success)
 * - Left:  `"<site> - ng - <reason>"` (the same text is logged)
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    const message = `${site} - ng - ${error}`
    checkLog(message)
    return message
  }
  return TE.tryCatch(() => sleep(wait).then(onResolved), onRejected)
}
/**
 * Extracts the wait time (in ms) embedded in a message produced for one site.
 *
 * Recognised shapes:
 * - `"<site> - ok - <wait>"`            -> `<wait>`
 * - `"<site> - ng - <wait> - <reason>"` -> `<wait>`
 * - anything else (e.g. `"<site> - ng - Sleep NG before wait"`,
 *   `"<site> - aborted"`)               -> `0`
 *
 * The complete ` - ok - ` / ` - ng - ` separators are matched rather than the
 * bare letters `ok` / `ng`, so a site whose URL happens to contain those
 * letters (e.g. `https://testing/1/2`) is parsed correctly.
 *
 * @param mapper1Message message text to parse
 * @returns the embedded wait, or 0 when none can be read
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const okSeparator = ' - ok - '
  const ngSeparator = ' - ng - '

  // The wait is the last field of an ok message, so search from the end.
  const okAt = mapper1Message.lastIndexOf(okSeparator)
  if (okAt !== -1) {
    const okWait = Number(mapper1Message.substring(okAt + okSeparator.length))
    if (!Number.isNaN(okWait)) {
      return okWait
    }
  }

  const ngAt = mapper1Message.indexOf(ngSeparator)
  if (ngAt === -1) {
    return 0
  }
  const ngDetail = mapper1Message.substring(ngAt + ngSeparator.length)
  const dashAt = ngDetail.indexOf('-')
  // Without a " - <reason>" tail there is no leading wait value to read.
  if (dashAt === -1) {
    return 0
  }
  const ngWait = Number(ngDetail.substring(0, dashAt))
  return Number.isNaN(ngWait) ? 0 : ngWait
}

/**
 * Builds the group-level TaskEither that calls `sleep(wait)` for the group `site` belongs to.
 *
 * - Right: a function `(opWait) => "<group url> - <wait + opWait>"`, so the caller
 *   can add the time spent on the group's members afterwards.
 * - Left:  `"<group url> - ng - <reason>"`.
 *
 * `<group url>` is `site` without its last two characters. The top index is
 * re-read through `getTopIndex` for logging in both outcomes.
 */
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) => {
  const readTopIndex = getTopIndex(abortAction)
  const groupUrl = site.substring(0, site.length - 2)
  return TE.tryCatch(
    () =>
      sleep(wait).then((_wait) => {
        checkLog(`mapper2-index = ${readTopIndex(site)}`)
        return (opWait: number) => `${groupUrl} - ${_wait + opWait}`
      }),
    (error) => {
      checkLog(`mapper2-index = ${readTopIndex(site)} - ng - ${error}`)
      return `${groupUrl} - ng - ${error}`
    }
  )
}

/**
 * Normalises a caught value into a message and decides how to surface it.
 *
 * The message is `error.message` when `error` is an object with a `message`
 * property, otherwise `error` itself. If the controller's signal is already
 * aborted the message is thrown as-is; otherwise it is returned as a string.
 */
const throwError = (abortController: AbortController) => (error: unknown) => {
  let message: unknown = error
  if (typeof error === 'object' && error !== null && 'message' in error) {
    message = error.message
  }
  if (!abortController.signal.aborted) {
    return String(message)
  }
  throw message
}

/**
 * Builds the TaskEither that processes one `site`.
 *
 * @param topIndex when `some`, the combined-index check (`checkIndex`) is also applied
 * @param abortController controller shared by every task of the run
 * @param site URL to process
 * @returns Left `"<site> - aborted"` right away when the signal is already aborted;
 *          otherwise a TaskEither whose result comes from `do_sleep1`
 */
const mapper1_throwError =
  (topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
    // Already aborted: do not register anything, just report it.
    if (abortController.signal.aborted) {
      checkLog(`${site} - aborted`)
      return TE.left(`${site} - aborted`)
    }

    // Log when some other task aborts the run while this one is registered.
    const abortEvent = () => {
      checkLog(`${site} - aborted - from event`)
    }
    abortController.signal.addEventListener('abort', abortEvent)
    function removeEvent() {
      abortController.signal.removeEventListener('abort', abortEvent)
    }
    // Unregister our own listener first so the site that triggers the abort
    // does not also log "aborted - from event" for itself.
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }

    const action = pipe(
      topIndex,
      O.match(
        // No top index: validate the last index only.
        () =>
          pipe(
            TE.tryCatch(
              // The async thunk resolves to a TaskEither, hence the TE.flattenW below.
              async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
              // throwError throws (instead of returning a Left value) once the signal is aborted.
              (error) => throwError(abortController)(error)
            ),
            TE.flattenW
          ),
        // Top index known: additionally enforce the combined-index limit.
        (topIndex) =>
          pipe(
            TE.tryCatch(
              async () => {
                const lastIndex = getLastIndex(abortAction)(site)
                checkIndex(abortAction)(site, topIndex, lastIndex)

                const action = pipe(lastIndex, getWaitNumber, do_sleep1(site))
                return action
              },
              (error) => throwError(abortController)(error)
            ),
            TE.flattenW
          )
      )
    )
    // Whatever the outcome, drop the abort listener once the task has settled.
    const actionWithRemoveEvent = pipe(
      action,
      TE.fold(
        (error) => {
          removeEvent()
          return TE.left(error)
        },
        (result) => {
          removeEvent()
          return TE.right(result)
        }
      )
    )

    return actionWithRemoveEvent
  }

/**
 * Runs the group-level task `action2` and, only if it succeeds, the member tasks `action1`.
 *
 * @param abortController controller consulted by `throwError` when `action2` fails
 * @param action2 group-level task; its Right value builds the final message from a wait total
 * @param removeEvent called once the combined task finishes, on the failure and success paths
 * @param action1 thunk producing the member task; it is only invoked after `action2` succeeded
 * @param op reducer combining the member waits, starting from 0
 * @returns a task yielding Left (the `action2` error) or Right (the final message)
 */
function action2_and_1(
  abortController: AbortController,
  action2: TE.TaskEither<string, (opWait: number) => string>,
  removeEvent: Function,
  action1: () => T.Task<readonly E.Either<string, string>[]>,
  op: (a: number, b: number) => number
) {
  const action = pipe(
    action2,
    TE.foldW(
      (error2) => {
        removeEvent()
        // throwError throws here when the signal is aborted; otherwise the text becomes the Left.
        return TE.left(throwError(abortController)(error2))
      },
      (message2Function) => async () => {
        const resultArray1 = await action1()()
        // Both Left and Right member results are mapped to Right(wait), so every member counts.
        const convertedResultArray1 = pipe(
          resultArray1,
          RA.map((result1) =>
            pipe(
              result1,
              E.fold(
                (error1) => E.right(getWaitFromMapper1Message(error1)),
                (message1) => E.right(getWaitFromMapper1Message(message1))
              )
            )
          )
        )
        const separateResult = pipe(convertedResultArray1, RA.separate)
        const opWait1 = pipe(separateResult.right, RA.reduce(0, op))
        removeEvent()
        return E.right(message2Function(opWait1))
      }
    )
  )
  return action
}

/**
 * Builds the task for one group of sites: first the group-level sleep, then the
 * member sites one after another. Member waits are combined with `sum`.
 *
 * @param abortController controller shared by every task of the run
 * @param sites URLs of one group; the first entry supplies the group's top index
 * @returns Left `"mapper2-index = <n> - aborted"` right away when the signal is already
 *          aborted; otherwise the combined task built by `action2_and_1`
 */
const mapper2_seq = (abortController: AbortController) => (sites: string[]) => {
  const site = sites[0]
  // Wrap the call: a bare `abortController.abort` reference loses its `this`
  // and would throw an invalid-receiver TypeError instead of aborting.
  const topIndex = getTopIndex(() => abortController.abort())(site)

  // Log when another task aborts the run while this group is registered.
  const abortEvent = () => {
    checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
  }
  abortController.signal.addEventListener('abort', abortEvent)
  function removeEvent() {
    abortController.signal.removeEventListener('abort', abortEvent)
  }
  // Unregister our own listener first so this group does not log its own abort.
  const abortAction = () => {
    removeEvent()
    abortController.abort()
  }

  if (abortController.signal.aborted) {
    checkLog(`mapper2-index = ${topIndex} - aborted`)
    removeEvent()
    return TE.left(`mapper2-index = ${topIndex} - aborted`)
  }

  // Group-level sleep; the async thunk resolves to a TaskEither, hence TE.flattenW.
  const action2 = pipe(
    TE.tryCatch(
      async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
      (error) => throwError(abortController)(error)
    ),
    TE.flattenW
  )

  const action = action2_and_1(
    abortController,
    action2,
    removeEvent,
    // Passed as a thunk so the member tasks are only built after action2 succeeded.
    () =>
      pipe(
        sites,
        RA.traverse(T.ApplicativeSeq)(mapper1_throwError(O.some(topIndex))(abortController))
      ),
    sum
  )
  return action
}

// vitest scenario: every group starts in parallel; the sites inside a group run one by one.
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方4ー全部同時実行からの直列実行', testOptions, async () => {
    // One inner array per group; the group's top index is the second-to-last path segment.
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    // A single controller is shared by all groups and sites.
    const abortController = new AbortController()

    checkLog('[start]')

    try {
      // Groups are traversed in parallel.
      const action = pipe(sites, RA.traverse(T.ApplicativePar)(mapper2_seq(abortController)))
      const result = await action()
      // Split the per-group Either results into { left, right }.
      const separateResult = pipe(result, RA.separate)

      console.log(separateResult)
    } catch (error) {
      // Anything thrown or rejected while building or running the tasks ends up here.
      console.log(error)
    }
  })
})

実行結果

time : 2025-01-27T14:02:06.786Z - text: [start]
time : 2025-01-27T14:02:06.790Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T14:02:07.792Z - text: mapper2-index = 1
time : 2025-01-27T14:02:08.794Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:02:08.795Z - text: https://test/1/1
time : 2025-01-27T14:02:10.799Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T14:02:11.795Z - text: mapper2-index = 5
time : 2025-01-27T14:02:12.812Z - text: https://test/5/1
time : 2025-01-27T14:02:13.797Z - text: mapper2-index = 7
time : 2025-01-27T14:02:14.803Z - text: https://test/7/1
time : 2025-01-27T14:02:14.817Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:02:14.818Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-27T14:02:16.810Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:02:16.811Z - text: https://test/7/3 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T14:02:16.814Z - text: mapper2-index = 5 - aborted - from event
time : 2025-01-27T14:02:16.815Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T14:02:16.815Z - text: https://test/5/4 - aborted - from event
time : 2025-01-27T14:02:16.815Z - text: https://test/5/5 - aborted - from event
time : 2025-01-27T14:02:16.815Z - text: https://test/7/4 - aborted - from event
time : 2025-01-27T14:02:16.816Z - text: https://test/7/5 - aborted - from event
time : 2025-01-27T14:02:16.816Z - text: https://test/7/6 - aborted - from event
time : 2025-01-27T14:02:16.816Z - text: https://test/7/7 - aborted - from event
https://test/7/3 - abort - (topIndex + lastIndex) >= 10

実行結果(※中断データを除外した場合)

time : 2025-01-27T14:06:20.077Z - text: [start]
time : 2025-01-27T14:06:20.081Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T14:06:21.091Z - text: mapper2-index = 1
time : 2025-01-27T14:06:22.083Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:06:22.099Z - text: https://test/1/1
time : 2025-01-27T14:06:24.091Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T14:06:25.093Z - text: mapper2-index = 5
time : 2025-01-27T14:06:26.098Z - text: https://test/5/1
time : 2025-01-27T14:06:27.086Z - text: mapper2-index = 7
time : 2025-01-27T14:06:28.095Z - text: https://test/7/1
time : 2025-01-27T14:06:28.098Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:06:28.099Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-27T14:06:30.104Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:06:32.110Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
{
  left: [
    'https://test/2 - ng - 2000 - Sleep NG after wait',
    'https://test/3 - ng - Sleep NG before wait',
    'https://test/4 - ng - 4000 - Sleep NG after wait'
  ],
  right: [
    'https://test/1 - 2000',
    'https://test/5 - 12000',
    'https://test/7 - 10000'
  ]
}

全部同時実行からの最大2個まで同時実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'

/**
 * Fake asynchronous API used by the samples.
 *
 * With `waitSeconds = wait / 1000`:
 * - a multiple of 3 rejects immediately with `'Sleep NG before wait'`;
 * - otherwise a multiple of 2 rejects after `wait` ms with `` `${wait} - Sleep NG after wait` ``;
 * - anything else resolves with `wait` after `wait` ms.
 *
 * @param wait delay handed to `setTimeout`
 * @returns a promise that settles as described above
 */
const sleep = (wait: number) =>
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    const isMultipleOf = (divisor: number) => waitSeconds % divisor === 0

    if (isMultipleOf(3)) {
      reject('Sleep NG before wait')
      return
    }

    const settle = isMultipleOf(2)
      ? () => reject(`${wait} - Sleep NG after wait`)
      : () => resolve(wait)
    setTimeout(settle, wait)
  })

/**
 * Writes `text` to the console, prefixed with the current time in ISO-8601 form.
 *
 * @param text message to log
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log('time : ' + timestamp + ' - text: ' + text)
}

/** Options passed to `test`: a timeout of 100 * 1000 (milliseconds per Vitest's test options). */
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Adds two numbers; used as the combining operator when child waits are accumulated.
 *
 * @returns `a + b`
 */
function sum(a: number, b: number): number {
  const total = a + b
  return total
}

/**
 * Extracts the single-digit index that ends `site` (e.g. `'https://test/5/4'` -> `4`).
 *
 * On any validation failure the problem is logged, `abortAction` is invoked and an
 * `Error` is thrown:
 * - the last character is not a decimal digit -> `... - abort - LastIndex is NaN`
 * - the character before it is not `'/'` (more than one digit) -> `... - abort - LastIndex >= 10`
 *
 * @param abortAction callback invoked just before throwing
 * @returns a function mapping a site string to its last index (0-9)
 * @throws Error when the site does not end in `/<digit>`
 */
const getLastIndex = (abortAction: Function) => (site: string) => {
  const index = site.slice(-1)
  // `Number('')` and `Number(' ')` are 0 rather than NaN, so an empty or blank last
  // character would be accepted as index 0; require an actual decimal digit instead.
  if (!/^[0-9]$/.test(index)) {
    checkLog(`${site} - abort - LastIndex is NaN`)
    abortAction()
    throw new Error(`${site} - abort - LastIndex is NaN`)
  }
  if (site[site.length - 2] !== '/') {
    checkLog(`${site} - abort - LastIndex >= 10`)
    abortAction()
    throw new Error(`${site} - abort - LastIndex >= 10`)
  }
  return Number(index)
}
/**
 * Extracts the single-digit "top" index of `site`, i.e. the character three positions
 * from the end (e.g. `'https://test/5/4'` -> `5`).
 *
 * When that character is not a decimal digit the problem is logged, `abortAction` is
 * invoked and an `Error` (`... - abort - TopIndex is NaN`) is thrown.
 *
 * @param abortAction callback invoked just before throwing
 * @returns a function mapping a site string to its top index (0-9)
 * @throws Error when the character at that position is not a digit
 */
const getTopIndex = (abortAction: Function) => (site: string) => {
  const index = site.substring(site.length - 3, site.length - 2)
  // `Number('')` (site shorter than 3 characters) and `Number(' ')` are 0 rather than
  // NaN, so they would be accepted as index 0; require an actual decimal digit instead.
  if (!/^[0-9]$/.test(index)) {
    checkLog(`${site} - abort - TopIndex is NaN`)
    abortAction()
    throw new Error(`${site} - abort - TopIndex is NaN`)
  }
  return Number(index)
}
/**
 * Guards against index pairs whose sum reaches 10.
 *
 * When `topIndex + lastIndex >= 10` the violation is logged, `abortAction` is invoked
 * and an `Error` carrying the same message is thrown; otherwise nothing happens.
 *
 * @param abortAction callback invoked just before throwing
 * @throws Error when `topIndex + lastIndex >= 10`
 */
const checkIndex =
  (abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
    const overLimit = topIndex + lastIndex >= 10
    if (!overLimit) {
      return
    }
    const message = `${site} - abort - (topIndex + lastIndex) >= 10`
    checkLog(message)
    abortAction()
    throw new Error(message)
  }

/**
 * Converts an index into the wait value handed to `sleep`.
 *
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds the TaskEither that performs the sleep for one child site.
 *
 * - Right: `` `${site} - ok - ${wait}` `` (the site is logged on success)
 * - Left: `` `${site} - ng - ${error}` `` (the same text is logged)
 *
 * @param site site string used in the log and result messages
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onSlept = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onFailed = (error: unknown) => {
    const message = `${site} - ng - ${error}`
    checkLog(message)
    return message
  }
  return TE.tryCatch(() => sleep(wait).then(onSlept), onFailed)
}
/**
 * Recovers the wait value embedded in a message produced for a child site.
 *
 * The text following `'ok - '` is tried first; if it is not numeric, the text between
 * `'ng - '` and the next `'-'` is tried. When neither yields a number the result is 0.
 *
 * @param mapper1Message message such as `'<site> - ok - 1000'` or
 *   `'<site> - ng - 2000 - Sleep NG after wait'`
 * @returns the parsed wait, or 0 when none can be parsed
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  // Both 'ok - ' and 'ng - ' are five characters long.
  const markerSkip = 5
  const textAfter = (marker: string) =>
    mapper1Message.substring(mapper1Message.indexOf(marker) + markerSkip)

  const okWait = Number(textAfter('ok'))
  if (!isNaN(okWait)) {
    return okWait
  }

  const ngText = textAfter('ng')
  const ngWait = Number(ngText.substring(0, ngText.indexOf('-')))
  return isNaN(ngWait) ? 0 : ngWait
}

/**
 * Builds the TaskEither that performs the sleep for a parent (top-level) entry.
 *
 * - Right: a function that, given an additional wait `opWait`, formats
 *   `<site without its last two characters> - <own wait + opWait>`.
 * - Left: `<site without its last two characters> - ng - <error>`.
 *
 * Either way the top index of `site` is looked up via `getTopIndex` and logged.
 *
 * @param site site string; its last two characters are stripped in the messages
 * @param abortAction forwarded to `getTopIndex`
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) => {
  const parentSite = () => site.substring(0, site.length - 2)

  const onSlept = (_wait: number) => {
    const index = getTopIndex(abortAction)(site)
    checkLog(`mapper2-index = ${index}`)
    return (opWait: number) => `${parentSite()} - ${_wait + opWait}`
  }
  const onFailed = (error: unknown) => {
    const index = getTopIndex(abortAction)(site)
    checkLog(`mapper2-index = ${index} - ng - ${error}`)
    return `${parentSite()} - ng - ${error}`
  }

  return TE.tryCatch(() => sleep(wait).then(onSlept), onFailed)
}

/**
 * Reducer step that folds one chunk of child results (`b`) into the accumulator (`a`).
 *
 * - A chunk of exactly two results contributes only the result whose parsed wait is
 *   larger (on a tie the second one is kept) — presumably because the two ran in
 *   parallel, so only the longer one counts; confirm against the caller.
 * - Any other chunk is appended unchanged.
 *
 * @param a results accumulated so far
 * @param b results of the current chunk
 * @returns a new array; neither input is mutated
 */
const flatternEitherStringArray = (
  a: readonly E.Either<string, string>[],
  b: readonly E.Either<string, string>[]
) => {
  if (b.length !== 2) {
    return [...a, ...b]
  }

  const waitOf = (entry: E.Either<string, string>) =>
    getWaitFromMapper1Message(E.isRight(entry) ? entry.right : entry.left)

  const firstWait = waitOf(b[0])
  const secondWait = waitOf(b[1])
  return [...a, firstWait > secondWait ? b[0] : b[1]]
}

/**
 * Turns a reducer step into a function that collapses an array of result arrays into a
 * single result array, starting from an empty array.
 *
 * @param flatten reducer step combining the accumulator with the next inner array
 * @returns a function reducing `messageArrayArray` with `flatten`
 */
const map_E_flatten_array =
  (
    flatten: (
      a: readonly E.Either<string, string>[],
      b: readonly E.Either<string, string>[]
    ) => readonly E.Either<string, string>[]
  ) =>
  (messageArrayArray: readonly (readonly E.Either<string, string>[])[]) =>
    RA.reduce([], flatten)(messageArrayArray)

/**
 * Error mapper that escalates once the controller has been aborted.
 *
 * The message is `error.message` when `error` is an object with a `message` property,
 * otherwise `error` itself.
 * - controller aborted: the message is thrown as-is (not wrapped in an `Error`);
 * - otherwise: the message is returned as a string.
 *
 * @param abortController controller whose `signal.aborted` flag selects the behaviour
 * @returns a function mapping an unknown error to its message string
 * @throws the extracted message when the controller is aborted
 */
const throwError = (abortController: AbortController) => (error: unknown) => {
  let message: unknown = error
  if (error && typeof error === 'object' && 'message' in error) {
    message = error.message
  }

  if (!abortController.signal.aborted) {
    return String(message)
  }
  throw message
}

/**
 * Builds the abort-aware TaskEither for one child site.
 *
 * - If the controller is already aborted, this is logged and a Left
 *   `` `${site} - aborted` `` is returned without registering anything.
 * - Otherwise an `'abort'` listener that logs `` `${site} - aborted - from event` `` is
 *   registered; it is removed once the task settles as Left or Right, or just before
 *   this site triggers the abort itself.
 * - When run, the task derives the last index of `site`, checks it against `topIndex`
 *   (only when `topIndex` is `Some`), and then sleeps via `do_sleep1`.
 * - Errors thrown by those checks pass through `throwError`, which rethrows once the
 *   controller is aborted.
 *
 * @param topIndex index of the parent entry, or `None` to skip the sum check
 * @param abortController controller shared by all tasks
 * @param site child site string
 */
const mapper1_throwError =
  (topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
    const { signal } = abortController
    if (signal.aborted) {
      checkLog(`${site} - aborted`)
      return TE.left(`${site} - aborted`)
    }

    const onAbort = () => {
      checkLog(`${site} - aborted - from event`)
    }
    const removeEvent = () => {
      signal.removeEventListener('abort', onAbort)
    }
    signal.addEventListener('abort', onAbort)

    // Unregister first so this site does not also log its own abort "from event".
    const abortAction = () => {
      removeEvent()
      abortController.abort()
    }

    // With no top index there is nothing to compare the last index against.
    const verifyAgainstTop = pipe(
      topIndex,
      O.match(
        () => (_lastIndex: number) => {},
        (top) => (lastIndex: number) => checkIndex(abortAction)(site, top, lastIndex)
      )
    )

    const startSleep = async () => {
      const lastIndex = getLastIndex(abortAction)(site)
      verifyAgainstTop(lastIndex)
      return pipe(lastIndex, getWaitNumber, do_sleep1(site))
    }

    const action = pipe(TE.tryCatch(startSleep, throwError(abortController)), TE.flattenW)

    return pipe(
      action,
      TE.fold(
        (error) => {
          removeEvent()
          return TE.left(error)
        },
        (result) => {
          removeEvent()
          return TE.right(result)
        }
      )
    )
  }

/**
 * Runs the parent task and, only if it succeeds, the child tasks, then combines them.
 *
 * - `action2` Left: `removeEvent` is called and the error goes through `throwError`
 *   (rethrown when the controller is aborted, otherwise returned as a Left).
 * - `action2` Right: `action1` is created and awaited, a wait is parsed out of every
 *   child result (Left and Right alike), the waits are folded with `op` starting at 0,
 *   `removeEvent` is called, and the parent's formatter is applied to the folded value.
 *
 * @param abortController controller consulted by `throwError`
 * @param action2 parent task yielding a formatter `(opWait) => string`
 * @param removeEvent cleanup invoked once the parent outcome is known
 * @param action1 lazily creates the task producing the child results
 * @param op combines the accumulated wait with the next child wait
 */
function action2_and_1(
  abortController: AbortController,
  action2: TE.TaskEither<string, (opWait: number) => string>,
  removeEvent: Function,
  action1: () => T.Task<readonly E.Either<string, string>[]>,
  op: (a: number, b: number) => number
) {
  const onParentFailed = (error2: string) => {
    removeEvent()
    return TE.left(throwError(abortController)(error2))
  }

  const onParentSucceeded = (message2Function: (opWait: number) => string) => async () => {
    const childResults = await action1()()
    // Failures and successes both contribute whatever wait their message carries.
    const childWaits = pipe(
      childResults,
      RA.map(E.fold(getWaitFromMapper1Message, getWaitFromMapper1Message))
    )
    const opWait1 = pipe(childWaits, RA.reduce(0, op))
    removeEvent()
    return E.right(message2Function(opWait1))
  }

  return pipe(action2, TE.foldW(onParentFailed, onParentSucceeded))
}

/**
 * Builds the abort-aware task for one parent entry and its child sites, running the
 * children at most two at a time.
 *
 * - The top index is taken from the first child site; if it is invalid the controller
 *   is aborted and `getTopIndex` throws synchronously.
 * - If the controller is already aborted, a Left `mapper2-index = <n> - aborted` is
 *   returned.
 * - Otherwise the parent sleep (`do_sleep2`) is handed to `action2_and_1` together with
 *   a factory for the child tasks: `sites` is split into chunks of `limit` (2), the
 *   sites in a chunk use `T.ApplicativePar`, the chunks are sequenced with
 *   `T.ApplicativeSeq`, and the chunk results are merged by `flatternEitherStringArray`.
 *
 * @param abortController controller shared by all tasks
 * @param sites child sites of one parent; assumed non-empty (`sites[0]` is read)
 */
const mapper2_limit2 = (abortController: AbortController) => (sites: string[]) => {
  const site = sites[0]
  // `abort` must be called as a method of its controller. Passing the bare
  // `abortController.abort` reference would drop `this`, so the call would throw a
  // TypeError instead of aborting.
  const topIndex = getTopIndex(() => abortController.abort())(site)

  const abortEvent = () => {
    checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
  }
  abortController.signal.addEventListener('abort', abortEvent)
  function removeEvent() {
    abortController.signal.removeEventListener('abort', abortEvent)
  }
  // Unregister first so this entry does not also log its own abort "from event".
  const abortAction = () => {
    removeEvent()
    abortController.abort()
  }

  if (abortController.signal.aborted) {
    checkLog(`mapper2-index = ${topIndex} - aborted`)
    removeEvent()
    return TE.left(`mapper2-index = ${topIndex} - aborted`)
  }

  const action2 = pipe(
    TE.tryCatch(
      async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
      (error) => throwError(abortController)(error)
    ),
    TE.flattenW
  )

  const limit = 2
  const chunkSites = pipe(sites, RA.chunksOf(limit))
  const action = action2_and_1(
    abortController,
    action2,
    removeEvent,
    () => {
      // Sites inside one chunk are traversed with the parallel applicative.
      const action1 = pipe(
        chunkSites,
        RA.map(RA.traverse(T.ApplicativePar)(mapper1_throwError(O.some(topIndex))(abortController)))
      )

      // Chunks are sequenced one after another, then merged into a single array.
      const flatAction1 = pipe(
        action1,
        RA.sequence(T.ApplicativeSeq),
        T.map(map_E_flatten_array(flatternEitherStringArray))
      )

      return flatAction1
    },
    sum
  )
  return action
}

// Scenario: every parent entry is started through the parallel applicative, while each
// parent's children are processed by `mapper2_limit2`. The separated Left/Right results
// are printed; anything thrown (e.g. after an abort) is printed instead.
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方6ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]

    const abortController = new AbortController()

    checkLog('[start]')

    try {
      // Built inside the try block: creating the tasks can already throw synchronously.
      const runAllParents = RA.traverse(T.ApplicativePar)(mapper2_limit2(abortController))(sites)
      const outcomes = await runAllParents()

      console.log(RA.separate(outcomes))
    } catch (error) {
      console.log(error)
    }
  })
})

実行結果

time : 2025-01-27T15:44:10.954Z - text: [start]
time : 2025-01-27T15:44:10.958Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T15:44:11.965Z - text: mapper2-index = 1
time : 2025-01-27T15:44:12.968Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:12.969Z - text: https://test/1/1
time : 2025-01-27T15:44:14.962Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T15:44:15.966Z - text: mapper2-index = 5
time : 2025-01-27T15:44:16.970Z - text: https://test/5/1
time : 2025-01-27T15:44:17.959Z - text: mapper2-index = 7
time : 2025-01-27T15:44:17.973Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:17.975Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-27T15:44:18.977Z - text: https://test/7/1
time : 2025-01-27T15:44:19.966Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:19.967Z - text: https://test/7/4 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T15:44:19.971Z - text: mapper2-index = 5 - aborted - from event
time : 2025-01-27T15:44:19.972Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T15:44:19.972Z - text: https://test/5/4 - aborted - from event
time : 2025-01-27T15:44:19.972Z - text: https://test/5/5 - aborted - from event
time : 2025-01-27T15:44:19.972Z - text: https://test/7/3 - aborted - from event
time : 2025-01-27T15:44:19.973Z - text: https://test/7/5 - aborted - from event
time : 2025-01-27T15:44:19.973Z - text: https://test/7/6 - aborted - from event
time : 2025-01-27T15:44:19.973Z - text: https://test/7/7 - aborted - from event
time : 2025-01-27T15:44:19.974Z - text: https://test/7/3 - abort - (topIndex + lastIndex) >= 10
https://test/7/4 - abort - (topIndex + lastIndex) >= 10

実行結果(※中断データを除外した場合)

time : 2025-01-27T15:44:46.209Z - text: [start]
time : 2025-01-27T15:44:46.213Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T15:44:47.226Z - text: mapper2-index = 1
time : 2025-01-27T15:44:48.214Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:48.234Z - text: https://test/1/1
time : 2025-01-27T15:44:50.222Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T15:44:51.222Z - text: mapper2-index = 5
time : 2025-01-27T15:44:52.238Z - text: https://test/5/1
time : 2025-01-27T15:44:53.215Z - text: mapper2-index = 7
time : 2025-01-27T15:44:53.229Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:53.232Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-27T15:44:54.226Z - text: https://test/7/1
time : 2025-01-27T15:44:55.232Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:57.233Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
{
  left: [
    'https://test/2 - ng - 2000 - Sleep NG after wait',
    'https://test/3 - ng - Sleep NG before wait',
    'https://test/4 - ng - 4000 - Sleep NG after wait'
  ],
  right: [
    'https://test/1 - 2000',
    'https://test/5 - 11000',
    'https://test/7 - 9000'
  ]
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do by signing up
0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?