0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

fp-tsで配列の配列の非同期処理

Posted at

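前回 p-map で実施したものを fp-ts に置き換えたメモ。
一部効率が落ちたものがある(同時実行数を制限するパターンは RA.chunksOf でチャンクに分割して直列に処理しており、各チャンクで最も遅い処理の完了を待ってから次のチャンクに進むため。該当箇所は見出しに「※一部性能劣化」と記載)。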

前回

配列が1つの場合

全部同時実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'

/**
 * Resolves with `wait` after `wait` milliseconds have elapsed.
 *
 * The promise is typed through its generic parameter rather than a trailing
 * `as` assertion, so `resolve` is checked against `number`.
 *
 * @param wait delay passed to setTimeout; also the resolved value
 * @returns a promise resolving to `wait`
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve) => {
    setTimeout(() => resolve(wait), wait)
  })

/**
 * Prints `text` to the console, prefixed with the current ISO-8601 timestamp.
 *
 * @param text label to print after the timestamp
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log(`time : ${timestamp} - text: ${text}`)
}

// Options object passed as the second argument to `test`.
// The timeout evaluates to 100000 (presumably milliseconds — confirm against the vitest docs).
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Extracts the numeric index at the end of a site string.
 *
 * The whole run of trailing digits is parsed, so multi-digit indices such as
 * `.../10` yield 10 instead of only the last character. When the string does
 * not end in a digit, the last character alone is converted (legacy behaviour).
 *
 * @param site site string, e.g. `https://test/5`
 * @returns the trailing index as a number (may be NaN for non-numeric endings)
 */
function getLastIndex(site: string) {
  const trailingDigits = /\d+$/.exec(site)
  return Number(trailingDigits ? trailingDigits[0] : site.slice(-1))
}

/**
 * Converts an index into a wait value by multiplying it by 1000.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs `site`, and then yields
 * the message `"<site> - <wait>"`. Any rejection is mapped to `'Error 1'`.
 *
 * @param site label that is logged and embedded in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep1 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      checkLog(site)
      return `${site} - ${elapsed}`
    },
    () => 'Error 1'
  )

/**
 * Builds the sleep task for one site: the wait is derived from the site
 * string (getLastIndex, then getWaitNumber) and handed to do_sleep1.
 */
const mapper1 = (site: string) => pipe(site, getLastIndex, getWaitNumber, do_sleep1(site))

describe('fp-ts 非同期処理1つの配列', () => {
  // Traverses the list with the parallel TaskEither applicative and prints the result.
  test('使い方1ー全部同時実行', testOptions, async () => {
    const urls = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5'
    ]

    checkLog('[start]')

    const runAll = pipe(urls, RA.traverse(TE.ApplicativePar)(mapper1))
    const outcome = await runAll()

    console.log(outcome)
  })
})

実行結果

time : 2025-01-18T12:02:45.343Z - text: [start]
time : 2025-01-18T12:02:46.355Z - text: https://test/1
time : 2025-01-18T12:02:47.352Z - text: https://test/2
time : 2025-01-18T12:02:48.349Z - text: https://test/3
time : 2025-01-18T12:02:49.348Z - text: https://test/4
time : 2025-01-18T12:02:50.361Z - text: https://test/5
{
  _tag: 'Right',
  right: [
    'https://test/1 - 1000',
    'https://test/2 - 2000',
    'https://test/3 - 3000',
    'https://test/4 - 4000',
    'https://test/5 - 5000'
  ]
}

最大2個まで同時実行 ※一部性能劣化

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'

/**
 * Resolves with `wait` after `wait` milliseconds have elapsed.
 *
 * The promise is typed through its generic parameter rather than a trailing
 * `as` assertion, so `resolve` is checked against `number`.
 *
 * @param wait delay passed to setTimeout; also the resolved value
 * @returns a promise resolving to `wait`
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve) => {
    setTimeout(() => resolve(wait), wait)
  })

/**
 * Prints `text` to the console, prefixed with the current ISO-8601 timestamp.
 *
 * @param text label to print after the timestamp
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log(`time : ${timestamp} - text: ${text}`)
}

// Options object passed as the second argument to `test`.
// The timeout evaluates to 100000 (presumably milliseconds — confirm against the vitest docs).
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Extracts the numeric index at the end of a site string.
 *
 * The whole run of trailing digits is parsed, so multi-digit indices such as
 * `.../10` yield 10 instead of only the last character. When the string does
 * not end in a digit, the last character alone is converted (legacy behaviour).
 *
 * @param site site string, e.g. `https://test/5`
 * @returns the trailing index as a number (may be NaN for non-numeric endings)
 */
function getLastIndex(site: string) {
  const trailingDigits = /\d+$/.exec(site)
  return Number(trailingDigits ? trailingDigits[0] : site.slice(-1))
}

/**
 * Converts an index into a wait value by multiplying it by 1000.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs `site`, and then yields
 * the message `"<site> - <wait>"`. Any rejection is mapped to `'Error 1'`.
 *
 * @param site label that is logged and embedded in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep1 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      checkLog(site)
      return `${site} - ${elapsed}`
    },
    () => 'Error 1'
  )

/**
 * Builds the sleep task for one site: the wait is derived from the site
 * string (getLastIndex, then getWaitNumber) and handed to do_sleep1.
 */
const mapper1 = (site: string) => pipe(site, getLastIndex, getWaitNumber, do_sleep1(site))

describe('fp-ts 非同期処理1つの配列', () => {
  // Splits the list into chunks of `maxConcurrent`, traverses each chunk with the
  // parallel applicative, sequences the chunks with the sequential one, then flattens.
  test('使い方2ー最大2個まで同時実行', testOptions, async () => {
    const urls = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5'
    ]

    checkLog('[start]')

    const maxConcurrent = 2
    const runInChunks = pipe(
      urls,
      RA.chunksOf(maxConcurrent),
      RA.map(RA.traverse(TE.ApplicativePar)(mapper1)),
      RA.sequence(TE.ApplicativeSeq),
      TE.map(RA.flatten)
    )
    const outcome = await runInChunks()

    console.log(outcome)
  })
})

実行結果

time : 2025-01-18T12:04:28.452Z - text: [start]
time : 2025-01-18T12:04:29.456Z - text: https://test/1
time : 2025-01-18T12:04:30.463Z - text: https://test/2
time : 2025-01-18T12:04:33.470Z - text: https://test/3
time : 2025-01-18T12:04:34.473Z - text: https://test/4
time : 2025-01-18T12:04:39.480Z - text: https://test/5
{
  _tag: 'Right',
  right: [
    'https://test/1 - 1000',
    'https://test/2 - 2000',
    'https://test/3 - 3000',
    'https://test/4 - 4000',
    'https://test/5 - 5000'
  ]
}

配列が2つの場合(シンプル)

全部同時実行からの全部同時実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import { sequenceT } from 'fp-ts/Apply'

/**
 * Resolves with `wait` after `wait` milliseconds have elapsed.
 *
 * The promise is typed through its generic parameter rather than a trailing
 * `as` assertion, so `resolve` is checked against `number`.
 *
 * @param wait delay passed to setTimeout; also the resolved value
 * @returns a promise resolving to `wait`
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve) => {
    setTimeout(() => resolve(wait), wait)
  })

/**
 * Prints `text` to the console, prefixed with the current ISO-8601 timestamp.
 *
 * @param text label to print after the timestamp
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log(`time : ${timestamp} - text: ${text}`)
}

// Options object passed as the second argument to `test`.
// The timeout evaluates to 100000 (presumably milliseconds — confirm against the vitest docs).
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Extracts the numeric index at the end of a site string.
 *
 * The whole run of trailing digits is parsed, so multi-digit indices such as
 * `.../10` yield 10 instead of only the last character. When the string does
 * not end in a digit, the last character alone is converted (legacy behaviour).
 *
 * @param site site string, e.g. `https://test/3/1`
 * @returns the trailing index as a number (may be NaN for non-numeric endings)
 */
function getLastIndex(site: string) {
  const trailingDigits = /\d+$/.exec(site)
  return Number(trailingDigits ? trailingDigits[0] : site.slice(-1))
}
/**
 * Extracts the numeric path segment that precedes the final segment of a site
 * string (the `3` in `https://test/3/1`).
 *
 * The segment is located by its `/` delimiter, so multi-digit values on
 * either side are handled. When no `<digits>/<tail>` suffix is present, the
 * single character three positions from the end is converted instead
 * (legacy behaviour).
 *
 * @param site site string shaped like `<base>/<top>/<last>`
 * @returns the top-level index as a number (may be NaN)
 */
function getTopIndex(site: string) {
  const parentSegment = /(\d+)\/[^/]*$/.exec(site)
  if (parentSegment) {
    return Number(parentSegment[1])
  }
  return Number(site.substring(site.length - 3, site.length - 2))
}

/**
 * Converts an index into a wait value by multiplying it by 1000.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs `site`, and then yields
 * the message `"<site> - <wait>"`. Any rejection is mapped to `'Error 1'`.
 *
 * @param site label that is logged and embedded in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep1 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      checkLog(site)
      return `${site} - ${elapsed}`
    },
    () => 'Error 1'
  )
/**
 * Parses the wait value out of a `"<site> - <wait>"` message.
 *
 * The split happens at the LAST `" - "` separator, so a hyphen inside the
 * site part (e.g. `https://my-site/1 - 1000`) no longer corrupts the result.
 * When the separator is absent, everything after the first `-` is parsed
 * (legacy behaviour).
 *
 * @param mapper1Message message in the format produced by do_sleep1
 * @returns the numeric wait (NaN when the tail is not numeric)
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const separator = ' - '
  const separatorAt = mapper1Message.lastIndexOf(separator)
  const waitText =
    separatorAt === -1
      ? mapper1Message.substring(mapper1Message.indexOf('-') + 1)
      : mapper1Message.substring(separatorAt + separator.length)
  return Number(waitText)
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs the site's top index, and
 * then yields a message-builder function. Given an extra wait, that function
 * returns `"<site minus its last two characters> - <wait + extra>"`.
 * Any rejection is mapped to `'Error 2'`.
 *
 * @param site site string; its last two characters are stripped in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep2 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      const index = getTopIndex(site)
      checkLog(`mapper2-index = ${index}`)
      return (opWait: number) => `${site.substring(0, site.length - 2)} - ${elapsed + opWait}`
    },
    () => 'Error 2'
  )

/**
 * Lifts "combine child waits, then build the parent message" over a TaskEither.
 *
 * The right value is a tuple of the parent's message builder and the child
 * messages; the child waits are parsed, reduced with `op` starting from 0,
 * and fed to the builder. Left values pass through untouched.
 *
 * Implemented with `TE.map`: folding both branches only to re-wrap them in
 * `TE.left` / `TE.right` is exactly a map, and `map` keeps the caller's
 * error type instead of leaving it to be inferred from an unannotated `err`.
 *
 * @param op binary reducer applied to the child waits (seed 0)
 */
const op_TE_fold = (op: (a: number, b: number) => number) =>
  TE.map(([message2, message1Array]: [(sumWait: number) => string, readonly string[]]) => {
    const opWait = pipe(message1Array, RA.map(getWaitFromMapper1Message), RA.reduce(0, op))
    return message2(opWait)
  })

/**
 * Applies op_TE_fold(op) to a TaskEither carrying the
 * [message builder, child messages] tuple.
 *
 * @param op binary reducer forwarded to op_TE_fold
 */
const map_TE_2 =
  (op: (a: number, b: number) => number) =>
  (message: TE.TaskEither<string, [(sumWait: number) => string, readonly string[]]>) =>
    op_TE_fold(op)(message)

/**
 * Builds the sleep task for one site: the wait is derived from the site
 * string (getLastIndex, then getWaitNumber) and handed to do_sleep1.
 */
const mapper1 = (site: string) => pipe(site, getLastIndex, getWaitNumber, do_sleep1(site))

/**
 * Builds the task for one group of sites: a parent sleep derived from the
 * first site, combined (via the sequential applicative) with a parallel
 * traversal of every site in the group. The child waits are reduced with
 * Math.max before being added to the parent wait.
 *
 * Precondition: `sites` is read at index 0 without a guard, so it must be
 * non-empty.
 */
const mapper2_par = (sites: string[]) => {
  const head = sites[0]
  const parentTask = pipe(head, getTopIndex, getWaitNumber, do_sleep2(head))
  const childrenTask = pipe(sites, RA.traverse(TE.ApplicativePar)(mapper1))
  return pipe(sequenceT(TE.ApplicativeSeq)(parentTask, childrenTask), map_TE_2(Math.max))
}

describe('fp-ts 非同期処理2つの配列', () => {
  // Outer groups are traversed in parallel; each group is handled by mapper2_par.
  test('使い方1ー全部同時実行からの全部同時実行', testOptions, async () => {
    const siteGroups = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      // [
      //   'https://test/5/1',
      //   'https://test/5/2',
      //   'https://test/5/3',
      //   'https://test/5/4',
      //   'https://test/5/5'
      // ]
    ]

    checkLog('[start]')

    const runAll = pipe(siteGroups, RA.traverse(TE.ApplicativePar)(mapper2_par))
    const outcome = await runAll()

    console.log(outcome)
  })
})

実行結果

time : 2025-01-18T12:08:56.472Z - text: [start]
time : 2025-01-18T12:08:59.492Z - text: mapper2-index = 3
time : 2025-01-18T12:09:00.508Z - text: https://test/3/1
time : 2025-01-18T12:09:01.498Z - text: https://test/3/2
time : 2025-01-18T12:09:02.509Z - text: https://test/3/3
{ _tag: 'Right', right: [ 'https://test/3 - 6000' ] }

全部同時実行からの直列実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import { sequenceT } from 'fp-ts/Apply'

/**
 * Resolves with `wait` after `wait` milliseconds have elapsed.
 *
 * The promise is typed through its generic parameter rather than a trailing
 * `as` assertion, so `resolve` is checked against `number`.
 *
 * @param wait delay passed to setTimeout; also the resolved value
 * @returns a promise resolving to `wait`
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve) => {
    setTimeout(() => resolve(wait), wait)
  })

/**
 * Prints `text` to the console, prefixed with the current ISO-8601 timestamp.
 *
 * @param text label to print after the timestamp
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log(`time : ${timestamp} - text: ${text}`)
}

// Options object passed as the second argument to `test`.
// The timeout evaluates to 100000 (presumably milliseconds — confirm against the vitest docs).
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Adds two numbers; used as the reducer when child waits accumulate.
 *
 * @returns `a + b`
 */
function sum(a: number, b: number) {
  const total = a + b
  return total
}

/**
 * Extracts the numeric index at the end of a site string.
 *
 * The whole run of trailing digits is parsed, so multi-digit indices such as
 * `.../10` yield 10 instead of only the last character. When the string does
 * not end in a digit, the last character alone is converted (legacy behaviour).
 *
 * @param site site string, e.g. `https://test/3/1`
 * @returns the trailing index as a number (may be NaN for non-numeric endings)
 */
function getLastIndex(site: string) {
  const trailingDigits = /\d+$/.exec(site)
  return Number(trailingDigits ? trailingDigits[0] : site.slice(-1))
}
/**
 * Extracts the numeric path segment that precedes the final segment of a site
 * string (the `3` in `https://test/3/1`).
 *
 * The segment is located by its `/` delimiter, so multi-digit values on
 * either side are handled. When no `<digits>/<tail>` suffix is present, the
 * single character three positions from the end is converted instead
 * (legacy behaviour).
 *
 * @param site site string shaped like `<base>/<top>/<last>`
 * @returns the top-level index as a number (may be NaN)
 */
function getTopIndex(site: string) {
  const parentSegment = /(\d+)\/[^/]*$/.exec(site)
  if (parentSegment) {
    return Number(parentSegment[1])
  }
  return Number(site.substring(site.length - 3, site.length - 2))
}

/**
 * Converts an index into a wait value by multiplying it by 1000.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs `site`, and then yields
 * the message `"<site> - <wait>"`. Any rejection is mapped to `'Error 1'`.
 *
 * @param site label that is logged and embedded in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep1 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      checkLog(site)
      return `${site} - ${elapsed}`
    },
    () => 'Error 1'
  )
/**
 * Parses the wait value out of a `"<site> - <wait>"` message.
 *
 * The split happens at the LAST `" - "` separator, so a hyphen inside the
 * site part (e.g. `https://my-site/1 - 1000`) no longer corrupts the result.
 * When the separator is absent, everything after the first `-` is parsed
 * (legacy behaviour).
 *
 * @param mapper1Message message in the format produced by do_sleep1
 * @returns the numeric wait (NaN when the tail is not numeric)
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const separator = ' - '
  const separatorAt = mapper1Message.lastIndexOf(separator)
  const waitText =
    separatorAt === -1
      ? mapper1Message.substring(mapper1Message.indexOf('-') + 1)
      : mapper1Message.substring(separatorAt + separator.length)
  return Number(waitText)
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs the site's top index, and
 * then yields a message-builder function. Given an extra wait, that function
 * returns `"<site minus its last two characters> - <wait + extra>"`.
 * Any rejection is mapped to `'Error 2'`.
 *
 * @param site site string; its last two characters are stripped in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep2 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      const index = getTopIndex(site)
      checkLog(`mapper2-index = ${index}`)
      return (opWait: number) => `${site.substring(0, site.length - 2)} - ${elapsed + opWait}`
    },
    () => 'Error 2'
  )

/**
 * Lifts "combine child waits, then build the parent message" over a TaskEither.
 *
 * The right value is a tuple of the parent's message builder and the child
 * messages; the child waits are parsed, reduced with `op` starting from 0,
 * and fed to the builder. Left values pass through untouched.
 *
 * Implemented with `TE.map`: folding both branches only to re-wrap them in
 * `TE.left` / `TE.right` is exactly a map, and `map` keeps the caller's
 * error type instead of leaving it to be inferred from an unannotated `err`.
 *
 * @param op binary reducer applied to the child waits (seed 0)
 */
const op_TE_fold = (op: (a: number, b: number) => number) =>
  TE.map(([message2, message1Array]: [(sumWait: number) => string, readonly string[]]) => {
    const opWait = pipe(message1Array, RA.map(getWaitFromMapper1Message), RA.reduce(0, op))
    return message2(opWait)
  })

/**
 * Applies op_TE_fold(op) to a TaskEither carrying the
 * [message builder, child messages] tuple.
 *
 * @param op binary reducer forwarded to op_TE_fold
 */
const map_TE_2 =
  (op: (a: number, b: number) => number) =>
  (message: TE.TaskEither<string, [(sumWait: number) => string, readonly string[]]>) =>
    op_TE_fold(op)(message)

/**
 * Builds the sleep task for one site: the wait is derived from the site
 * string (getLastIndex, then getWaitNumber) and handed to do_sleep1.
 */
const mapper1 = (site: string) => pipe(site, getLastIndex, getWaitNumber, do_sleep1(site))

/**
 * Builds the task for one group of sites: a parent sleep derived from the
 * first site, combined (via the sequential applicative) with a sequential
 * traversal of every site in the group. The child waits are reduced with
 * `sum` before being added to the parent wait.
 *
 * Precondition: `sites` is read at index 0 without a guard, so it must be
 * non-empty.
 */
const mapper2_seq = (sites: string[]) => {
  const head = sites[0]
  const parentTask = pipe(head, getTopIndex, getWaitNumber, do_sleep2(head))
  const childrenTask = pipe(sites, RA.traverse(TE.ApplicativeSeq)(mapper1))
  return pipe(sequenceT(TE.ApplicativeSeq)(parentTask, childrenTask), map_TE_2(sum))
}

describe('fp-ts 非同期処理2つの配列', () => {
  // Outer groups are traversed in parallel; each group is handled by mapper2_seq.
  test('使い方3ー全部同時実行からの直列実行', testOptions, async () => {
    const siteGroups = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      // [
      //   'https://test/5/1',
      //   'https://test/5/2',
      //   'https://test/5/3',
      //   'https://test/5/4',
      //   'https://test/5/5'
      // ]
    ]

    checkLog('[start]')

    const runAll = pipe(siteGroups, RA.traverse(TE.ApplicativePar)(mapper2_seq))
    const outcome = await runAll()

    console.log(outcome)
  })
})

実行結果

time : 2025-01-18T12:11:04.539Z - text: [start]
time : 2025-01-18T12:11:07.547Z - text: mapper2-index = 3
time : 2025-01-18T12:11:08.556Z - text: https://test/3/1
time : 2025-01-18T12:11:10.560Z - text: https://test/3/2
time : 2025-01-18T12:11:13.565Z - text: https://test/3/3
{ _tag: 'Right', right: [ 'https://test/3 - 9000' ] }

全部同時実行からの最大2個まで同時実行 ※一部性能劣化

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import { sequenceT } from 'fp-ts/Apply'

/**
 * Resolves with `wait` after `wait` milliseconds have elapsed.
 *
 * The promise is typed through its generic parameter rather than a trailing
 * `as` assertion, so `resolve` is checked against `number`.
 *
 * @param wait delay passed to setTimeout; also the resolved value
 * @returns a promise resolving to `wait`
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve) => {
    setTimeout(() => resolve(wait), wait)
  })

/**
 * Prints `text` to the console, prefixed with the current ISO-8601 timestamp.
 *
 * @param text label to print after the timestamp
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log(`time : ${timestamp} - text: ${text}`)
}

// Options object passed as the second argument to `test`.
// The timeout evaluates to 100000 (presumably milliseconds — confirm against the vitest docs).
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Adds two numbers; used as the reducer when child waits accumulate.
 *
 * @returns `a + b`
 */
function sum(a: number, b: number) {
  const total = a + b
  return total
}

/**
 * Extracts the numeric index at the end of a site string.
 *
 * The whole run of trailing digits is parsed, so multi-digit indices such as
 * `.../10` yield 10 instead of only the last character. When the string does
 * not end in a digit, the last character alone is converted (legacy behaviour).
 *
 * @param site site string, e.g. `https://test/3/1`
 * @returns the trailing index as a number (may be NaN for non-numeric endings)
 */
function getLastIndex(site: string) {
  const trailingDigits = /\d+$/.exec(site)
  return Number(trailingDigits ? trailingDigits[0] : site.slice(-1))
}
/**
 * Extracts the numeric path segment that precedes the final segment of a site
 * string (the `3` in `https://test/3/1`).
 *
 * The segment is located by its `/` delimiter, so multi-digit values on
 * either side are handled. When no `<digits>/<tail>` suffix is present, the
 * single character three positions from the end is converted instead
 * (legacy behaviour).
 *
 * @param site site string shaped like `<base>/<top>/<last>`
 * @returns the top-level index as a number (may be NaN)
 */
function getTopIndex(site: string) {
  const parentSegment = /(\d+)\/[^/]*$/.exec(site)
  if (parentSegment) {
    return Number(parentSegment[1])
  }
  return Number(site.substring(site.length - 3, site.length - 2))
}

/**
 * Converts an index into a wait value by multiplying it by 1000.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs `site`, and then yields
 * the message `"<site> - <wait>"`. Any rejection is mapped to `'Error 1'`.
 *
 * @param site label that is logged and embedded in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep1 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      checkLog(site)
      return `${site} - ${elapsed}`
    },
    () => 'Error 1'
  )
/**
 * Parses the wait value out of a `"<site> - <wait>"` message.
 *
 * The split happens at the LAST `" - "` separator, so a hyphen inside the
 * site part (e.g. `https://my-site/1 - 1000`) no longer corrupts the result.
 * When the separator is absent, everything after the first `-` is parsed
 * (legacy behaviour).
 *
 * @param mapper1Message message in the format produced by do_sleep1
 * @returns the numeric wait (NaN when the tail is not numeric)
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const separator = ' - '
  const separatorAt = mapper1Message.lastIndexOf(separator)
  const waitText =
    separatorAt === -1
      ? mapper1Message.substring(mapper1Message.indexOf('-') + 1)
      : mapper1Message.substring(separatorAt + separator.length)
  return Number(waitText)
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs the site's top index, and
 * then yields a message-builder function. Given an extra wait, that function
 * returns `"<site minus its last two characters> - <wait + extra>"`.
 * Any rejection is mapped to `'Error 2'`.
 *
 * @param site site string; its last two characters are stripped in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep2 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      const index = getTopIndex(site)
      checkLog(`mapper2-index = ${index}`)
      return (opWait: number) => `${site.substring(0, site.length - 2)} - ${elapsed + opWait}`
    },
    () => 'Error 2'
  )

/**
 * Lifts "combine child waits, then build the parent message" over a TaskEither.
 *
 * The right value is a tuple of the parent's message builder and the child
 * messages; the child waits are parsed, reduced with `op` starting from 0,
 * and fed to the builder. Left values pass through untouched.
 *
 * Implemented with `TE.map`: folding both branches only to re-wrap them in
 * `TE.left` / `TE.right` is exactly a map, and `map` keeps the caller's
 * error type instead of leaving it to be inferred from an unannotated `err`.
 *
 * @param op binary reducer applied to the child waits (seed 0)
 */
const op_TE_fold = (op: (a: number, b: number) => number) =>
  TE.map(([message2, message1Array]: [(sumWait: number) => string, readonly string[]]) => {
    const opWait = pipe(message1Array, RA.map(getWaitFromMapper1Message), RA.reduce(0, op))
    return message2(opWait)
  })

/**
 * Applies op_TE_fold(op) to a TaskEither carrying the
 * [message builder, child messages] tuple.
 *
 * @param op binary reducer forwarded to op_TE_fold
 */
const map_TE_2 =
  (op: (a: number, b: number) => number) =>
  (message: TE.TaskEither<string, [(sumWait: number) => string, readonly string[]]>) =>
    op_TE_fold(op)(message)

/**
 * Reducer that appends, for one chunk of child messages, only the message
 * with the largest wait to the accumulator.
 *
 * Generalized from the previous hard-coded chunk size of 2: a chunk of any
 * size contributes its slowest entry, so the helper stays correct if the
 * concurrency limit changes. For chunks of length 0, 1 and 2 the result is
 * identical to before, including ties, where the later entry wins.
 *
 * @param a accumulator of already-selected messages
 * @param b messages of the current chunk
 * @returns a new array: `a` followed by the slowest message of `b` (if any)
 */
const opStringArray = (a: readonly string[], b: readonly string[]) => {
  if (b.length === 0) {
    return [...a]
  }
  // Keep `best` only when it is strictly slower, mirroring the original `b0 > b1` test.
  const slowest = b.reduce((best, candidate) =>
    getWaitFromMapper1Message(best) > getWaitFromMapper1Message(candidate) ? best : candidate
  )
  return [...a, slowest]
}

/**
 * Collapses the per-chunk message arrays inside a TaskEither into a single
 * array by reducing them with `op`, starting from an empty array.
 *
 * Implemented with `TE.map`: the previous `TE.fold` only re-wrapped the left
 * branch in `TE.left` and the right branch in `TE.right`, which is what
 * `map` already does.
 *
 * @param op reducer combining the accumulator with one chunk's messages
 */
const map_TE_2_array =
  (op: (a: readonly string[], b: readonly string[]) => readonly string[]) =>
  (messageArray: TE.TaskEither<string, readonly (readonly string[])[]>) => {
    // Typed seed so the reducer's accumulator is `readonly string[]`, not `never[]`.
    const seed: readonly string[] = []
    return pipe(
      messageArray,
      TE.map((message1Array) => pipe(message1Array, RA.reduce(seed, op)))
    )
  }

/**
 * Builds the sleep task for one site: the wait is derived from the site
 * string (getLastIndex, then getWaitNumber) and handed to do_sleep1.
 */
const mapper1 = (site: string) => pipe(site, getLastIndex, getWaitNumber, do_sleep1(site))

/**
 * Builds the task for one group of sites: a parent sleep derived from the
 * first site, combined (via the sequential applicative) with the child sites
 * processed in chunks of two — parallel inside a chunk, chunks in sequence.
 * opStringArray condenses each chunk's messages and `sum` adds up the
 * resulting waits.
 *
 * Precondition: `sites` is read at index 0 without a guard, so it must be
 * non-empty.
 */
const mapper2_limit2 = (sites: string[]) => {
  const maxConcurrent = 2
  const head = sites[0]
  const parentTask = pipe(head, getTopIndex, getWaitNumber, do_sleep2(head))

  const childrenTask = pipe(
    sites,
    RA.chunksOf(maxConcurrent),
    RA.map(RA.traverse(TE.ApplicativePar)(mapper1)),
    RA.sequence(TE.ApplicativeSeq),
    map_TE_2_array(opStringArray)
  )

  return pipe(sequenceT(TE.ApplicativeSeq)(parentTask, childrenTask), map_TE_2(sum))
}

describe('fp-ts 非同期処理2つの配列', () => {
  // Outer groups are traversed in parallel; each group is handled by mapper2_limit2.
  test('使い方5ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    const siteGroups = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      // [
      //   'https://test/5/1',
      //   'https://test/5/2',
      //   'https://test/5/3',
      //   'https://test/5/4',
      //   'https://test/5/5'
      // ]
    ]

    checkLog('[start]')

    const runAll = pipe(siteGroups, RA.traverse(TE.ApplicativePar)(mapper2_limit2))
    const outcome = await runAll()

    console.log(outcome)
  })
})

実行結果

time : 2025-01-18T12:12:57.869Z - text: [start]
time : 2025-01-18T12:13:00.877Z - text: mapper2-index = 3
time : 2025-01-18T12:13:01.883Z - text: https://test/3/1
time : 2025-01-18T12:13:02.891Z - text: https://test/3/2
time : 2025-01-18T12:13:05.903Z - text: https://test/3/3
{ _tag: 'Right', right: [ 'https://test/3 - 8000' ] }

配列が2つの場合(複雑)

全部同時実行からの全部同時実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import { sequenceT } from 'fp-ts/Apply'

/**
 * Resolves with `wait` after `wait` milliseconds have elapsed.
 *
 * The promise is typed through its generic parameter rather than a trailing
 * `as` assertion, so `resolve` is checked against `number`.
 *
 * @param wait delay passed to setTimeout; also the resolved value
 * @returns a promise resolving to `wait`
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve) => {
    setTimeout(() => resolve(wait), wait)
  })

/**
 * Prints `text` to the console, prefixed with the current ISO-8601 timestamp.
 *
 * @param text label to print after the timestamp
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log(`time : ${timestamp} - text: ${text}`)
}

// Options object passed as the second argument to `test`.
// The timeout evaluates to 100000 (presumably milliseconds — confirm against the vitest docs).
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Extracts the numeric index at the end of a site string.
 *
 * The whole run of trailing digits is parsed, so multi-digit indices such as
 * `.../10` yield 10 instead of only the last character. When the string does
 * not end in a digit, the last character alone is converted (legacy behaviour).
 *
 * @param site site string, e.g. `https://test/3/1`
 * @returns the trailing index as a number (may be NaN for non-numeric endings)
 */
function getLastIndex(site: string) {
  const trailingDigits = /\d+$/.exec(site)
  return Number(trailingDigits ? trailingDigits[0] : site.slice(-1))
}
/**
 * Extracts the numeric path segment that precedes the final segment of a site
 * string (the `3` in `https://test/3/1`).
 *
 * The segment is located by its `/` delimiter, so multi-digit values on
 * either side are handled. When no `<digits>/<tail>` suffix is present, the
 * single character three positions from the end is converted instead
 * (legacy behaviour).
 *
 * @param site site string shaped like `<base>/<top>/<last>`
 * @returns the top-level index as a number (may be NaN)
 */
function getTopIndex(site: string) {
  const parentSegment = /(\d+)\/[^/]*$/.exec(site)
  if (parentSegment) {
    return Number(parentSegment[1])
  }
  return Number(site.substring(site.length - 3, site.length - 2))
}

/**
 * Converts an index into a wait value by multiplying it by 1000.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs `site`, and then yields
 * the message `"<site> - <wait>"`. Any rejection is mapped to `'Error 1'`.
 *
 * @param site label that is logged and embedded in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep1 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      checkLog(site)
      return `${site} - ${elapsed}`
    },
    () => 'Error 1'
  )
/**
 * Parses the wait value out of a `"<site> - <wait>"` message.
 *
 * The split happens at the LAST `" - "` separator, so a hyphen inside the
 * site part (e.g. `https://my-site/1 - 1000`) no longer corrupts the result.
 * When the separator is absent, everything after the first `-` is parsed
 * (legacy behaviour).
 *
 * @param mapper1Message message in the format produced by do_sleep1
 * @returns the numeric wait (NaN when the tail is not numeric)
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const separator = ' - '
  const separatorAt = mapper1Message.lastIndexOf(separator)
  const waitText =
    separatorAt === -1
      ? mapper1Message.substring(mapper1Message.indexOf('-') + 1)
      : mapper1Message.substring(separatorAt + separator.length)
  return Number(waitText)
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs the site's top index, and
 * then yields a message-builder function. Given an extra wait, that function
 * returns `"<site minus its last two characters> - <wait + extra>"`.
 * Any rejection is mapped to `'Error 2'`.
 *
 * @param site site string; its last two characters are stripped in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep2 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      const index = getTopIndex(site)
      checkLog(`mapper2-index = ${index}`)
      return (opWait: number) => `${site.substring(0, site.length - 2)} - ${elapsed + opWait}`
    },
    () => 'Error 2'
  )

/**
 * Lifts "combine child waits, then build the parent message" over a TaskEither.
 *
 * The right value is a tuple of the parent's message builder and the child
 * messages; the child waits are parsed, reduced with `op` starting from 0,
 * and fed to the builder. Left values pass through untouched.
 *
 * Implemented with `TE.map`: folding both branches only to re-wrap them in
 * `TE.left` / `TE.right` is exactly a map, and `map` keeps the caller's
 * error type instead of leaving it to be inferred from an unannotated `err`.
 *
 * @param op binary reducer applied to the child waits (seed 0)
 */
const op_TE_fold = (op: (a: number, b: number) => number) =>
  TE.map(([message2, message1Array]: [(sumWait: number) => string, readonly string[]]) => {
    const opWait = pipe(message1Array, RA.map(getWaitFromMapper1Message), RA.reduce(0, op))
    return message2(opWait)
  })

/**
 * Applies op_TE_fold(op) to a TaskEither carrying the
 * [message builder, child messages] tuple.
 *
 * @param op binary reducer forwarded to op_TE_fold
 */
const map_TE_2 =
  (op: (a: number, b: number) => number) =>
  (message: TE.TaskEither<string, [(sumWait: number) => string, readonly string[]]>) =>
    op_TE_fold(op)(message)

/**
 * Builds the sleep task for one site: the wait is derived from the site
 * string (getLastIndex, then getWaitNumber) and handed to do_sleep1.
 */
const mapper1 = (site: string) => pipe(site, getLastIndex, getWaitNumber, do_sleep1(site))

/**
 * Builds the task for one group of sites: a parent sleep derived from the
 * first site, combined (via the sequential applicative) with a parallel
 * traversal of every site in the group. The child waits are reduced with
 * Math.max before being added to the parent wait.
 *
 * Precondition: `sites` is read at index 0 without a guard, so it must be
 * non-empty.
 */
const mapper2_par = (sites: string[]) => {
  const head = sites[0]
  const parentTask = pipe(head, getTopIndex, getWaitNumber, do_sleep2(head))
  const childrenTask = pipe(sites, RA.traverse(TE.ApplicativePar)(mapper1))
  return pipe(sequenceT(TE.ApplicativeSeq)(parentTask, childrenTask), map_TE_2(Math.max))
}

describe('fp-ts 非同期処理2つの配列', () => {
  // Outer groups are traversed in parallel; each group is handled by mapper2_par.
  test('使い方2ー全部同時実行からの全部同時実行', testOptions, async () => {
    const siteGroups = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]

    checkLog('[start]')

    const runAll = pipe(siteGroups, RA.traverse(TE.ApplicativePar)(mapper2_par))
    const outcome = await runAll()

    console.log(outcome)
  })
})

実行結果

time : 2025-01-18T12:15:32.505Z - text: [start]
time : 2025-01-18T12:15:33.510Z - text: mapper2-index = 1
time : 2025-01-18T12:15:34.512Z - text: mapper2-index = 2
time : 2025-01-18T12:15:34.513Z - text: https://test/1/1
time : 2025-01-18T12:15:35.508Z - text: mapper2-index = 3
time : 2025-01-18T12:15:35.524Z - text: https://test/2/1
time : 2025-01-18T12:15:36.510Z - text: mapper2-index = 4
time : 2025-01-18T12:15:36.511Z - text: https://test/3/1
time : 2025-01-18T12:15:36.514Z - text: https://test/2/2
time : 2025-01-18T12:15:37.514Z - text: mapper2-index = 5
time : 2025-01-18T12:15:37.515Z - text: https://test/3/2
time : 2025-01-18T12:15:37.515Z - text: https://test/4/1
time : 2025-01-18T12:15:38.518Z - text: https://test/3/3
time : 2025-01-18T12:15:38.518Z - text: https://test/4/2
time : 2025-01-18T12:15:38.518Z - text: https://test/5/1
time : 2025-01-18T12:15:39.526Z - text: https://test/4/3
time : 2025-01-18T12:15:39.526Z - text: https://test/5/2
time : 2025-01-18T12:15:40.511Z - text: https://test/4/4
time : 2025-01-18T12:15:40.523Z - text: https://test/5/3
time : 2025-01-18T12:15:41.526Z - text: https://test/5/4
time : 2025-01-18T12:15:42.527Z - text: https://test/5/5
{
  _tag: 'Right',
  right: [
    'https://test/1 - 2000',
    'https://test/2 - 4000',
    'https://test/3 - 6000',
    'https://test/4 - 8000',
    'https://test/5 - 10000'
  ]
}

全部同時実行からの直列実行

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import { sequenceT } from 'fp-ts/Apply'

/**
 * Resolves with `wait` after `wait` milliseconds have elapsed.
 *
 * The promise is typed through its generic parameter rather than a trailing
 * `as` assertion, so `resolve` is checked against `number`.
 *
 * @param wait delay passed to setTimeout; also the resolved value
 * @returns a promise resolving to `wait`
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve) => {
    setTimeout(() => resolve(wait), wait)
  })

/**
 * Prints `text` to the console, prefixed with the current ISO-8601 timestamp.
 *
 * @param text label to print after the timestamp
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log(`time : ${timestamp} - text: ${text}`)
}

// Options object passed as the second argument to `test`.
// The timeout evaluates to 100000 (presumably milliseconds — confirm against the vitest docs).
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Adds two numbers; used as the reducer when child waits accumulate.
 *
 * @returns `a + b`
 */
function sum(a: number, b: number) {
  const total = a + b
  return total
}

/**
 * Extracts the numeric index at the end of a site string.
 *
 * The whole run of trailing digits is parsed, so multi-digit indices such as
 * `.../10` yield 10 instead of only the last character. When the string does
 * not end in a digit, the last character alone is converted (legacy behaviour).
 *
 * @param site site string, e.g. `https://test/3/1`
 * @returns the trailing index as a number (may be NaN for non-numeric endings)
 */
function getLastIndex(site: string) {
  const trailingDigits = /\d+$/.exec(site)
  return Number(trailingDigits ? trailingDigits[0] : site.slice(-1))
}
/**
 * Extracts the numeric path segment that precedes the final segment of a site
 * string (the `3` in `https://test/3/1`).
 *
 * The segment is located by its `/` delimiter, so multi-digit values on
 * either side are handled. When no `<digits>/<tail>` suffix is present, the
 * single character three positions from the end is converted instead
 * (legacy behaviour).
 *
 * @param site site string shaped like `<base>/<top>/<last>`
 * @returns the top-level index as a number (may be NaN)
 */
function getTopIndex(site: string) {
  const parentSegment = /(\d+)\/[^/]*$/.exec(site)
  if (parentSegment) {
    return Number(parentSegment[1])
  }
  return Number(site.substring(site.length - 3, site.length - 2))
}

/**
 * Converts an index into a wait value by multiplying it by 1000.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs `site`, and then yields
 * the message `"<site> - <wait>"`. Any rejection is mapped to `'Error 1'`.
 *
 * @param site label that is logged and embedded in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep1 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      checkLog(site)
      return `${site} - ${elapsed}`
    },
    () => 'Error 1'
  )
/**
 * Parses the wait value out of a `"<site> - <wait>"` message.
 *
 * The split happens at the LAST `" - "` separator, so a hyphen inside the
 * site part (e.g. `https://my-site/1 - 1000`) no longer corrupts the result.
 * When the separator is absent, everything after the first `-` is parsed
 * (legacy behaviour).
 *
 * @param mapper1Message message in the format produced by do_sleep1
 * @returns the numeric wait (NaN when the tail is not numeric)
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const separator = ' - '
  const separatorAt = mapper1Message.lastIndexOf(separator)
  const waitText =
    separatorAt === -1
      ? mapper1Message.substring(mapper1Message.indexOf('-') + 1)
      : mapper1Message.substring(separatorAt + separator.length)
  return Number(waitText)
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs the site's top index, and
 * then yields a message-builder function. Given an extra wait, that function
 * returns `"<site minus its last two characters> - <wait + extra>"`.
 * Any rejection is mapped to `'Error 2'`.
 *
 * @param site site string; its last two characters are stripped in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep2 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      const index = getTopIndex(site)
      checkLog(`mapper2-index = ${index}`)
      return (opWait: number) => `${site.substring(0, site.length - 2)} - ${elapsed + opWait}`
    },
    () => 'Error 2'
  )

/**
 * Lifts "combine child waits, then build the parent message" over a TaskEither.
 *
 * The right value is a tuple of the parent's message builder and the child
 * messages; the child waits are parsed, reduced with `op` starting from 0,
 * and fed to the builder. Left values pass through untouched.
 *
 * Implemented with `TE.map`: folding both branches only to re-wrap them in
 * `TE.left` / `TE.right` is exactly a map, and `map` keeps the caller's
 * error type instead of leaving it to be inferred from an unannotated `err`.
 *
 * @param op binary reducer applied to the child waits (seed 0)
 */
const op_TE_fold = (op: (a: number, b: number) => number) =>
  TE.map(([message2, message1Array]: [(sumWait: number) => string, readonly string[]]) => {
    const opWait = pipe(message1Array, RA.map(getWaitFromMapper1Message), RA.reduce(0, op))
    return message2(opWait)
  })

/**
 * Applies op_TE_fold(op) to a TaskEither carrying the
 * [message builder, child messages] tuple.
 *
 * @param op binary reducer forwarded to op_TE_fold
 */
const map_TE_2 =
  (op: (a: number, b: number) => number) =>
  (message: TE.TaskEither<string, [(sumWait: number) => string, readonly string[]]>) =>
    op_TE_fold(op)(message)

/**
 * Builds the sleep task for one site: the wait is derived from the site
 * string (getLastIndex, then getWaitNumber) and handed to do_sleep1.
 */
const mapper1 = (site: string) => pipe(site, getLastIndex, getWaitNumber, do_sleep1(site))

/**
 * Builds the task for one group of sites: a parent sleep derived from the
 * first site, combined (via the sequential applicative) with a sequential
 * traversal of every site in the group. The child waits are reduced with
 * `sum` before being added to the parent wait.
 *
 * Precondition: `sites` is read at index 0 without a guard, so it must be
 * non-empty.
 */
const mapper2_seq = (sites: string[]) => {
  const head = sites[0]
  const parentTask = pipe(head, getTopIndex, getWaitNumber, do_sleep2(head))
  const childrenTask = pipe(sites, RA.traverse(TE.ApplicativeSeq)(mapper1))
  return pipe(sequenceT(TE.ApplicativeSeq)(parentTask, childrenTask), map_TE_2(sum))
}

describe('fp-ts 非同期処理2つの配列', () => {
  // Outer groups are traversed in parallel; each group is handled by mapper2_seq.
  test('使い方4ー全部同時実行からの直列実行', testOptions, async () => {
    const siteGroups = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]

    checkLog('[start]')

    const runAll = pipe(siteGroups, RA.traverse(TE.ApplicativePar)(mapper2_seq))
    const outcome = await runAll()

    console.log(outcome)
  })
})

実行結果

time : 2025-01-18T12:18:01.178Z - text: [start]
time : 2025-01-18T12:18:02.184Z - text: mapper2-index = 1
time : 2025-01-18T12:18:03.183Z - text: mapper2-index = 2
time : 2025-01-18T12:18:03.200Z - text: https://test/1/1
time : 2025-01-18T12:18:04.197Z - text: mapper2-index = 3
time : 2025-01-18T12:18:04.198Z - text: https://test/2/1
time : 2025-01-18T12:18:05.183Z - text: mapper2-index = 4
time : 2025-01-18T12:18:05.202Z - text: https://test/3/1
time : 2025-01-18T12:18:06.195Z - text: mapper2-index = 5
time : 2025-01-18T12:18:06.196Z - text: https://test/4/1
time : 2025-01-18T12:18:06.201Z - text: https://test/2/2
time : 2025-01-18T12:18:07.201Z - text: https://test/5/1
time : 2025-01-18T12:18:07.211Z - text: https://test/3/2
time : 2025-01-18T12:18:08.198Z - text: https://test/4/2
time : 2025-01-18T12:18:09.202Z - text: https://test/5/2
time : 2025-01-18T12:18:10.211Z - text: https://test/3/3
time : 2025-01-18T12:18:11.205Z - text: https://test/4/3
time : 2025-01-18T12:18:12.212Z - text: https://test/5/3
time : 2025-01-18T12:18:15.221Z - text: https://test/4/4
time : 2025-01-18T12:18:16.221Z - text: https://test/5/4
time : 2025-01-18T12:18:21.226Z - text: https://test/5/5
{
  _tag: 'Right',
  right: [
    'https://test/1 - 2000',
    'https://test/2 - 5000',
    'https://test/3 - 9000',
    'https://test/4 - 14000',
    'https://test/5 - 20000'
  ]
}

全部同時実行からの最大2個まで同時実行 ※一部性能劣化

import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import { sequenceT } from 'fp-ts/Apply'

/**
 * Resolves with `wait` after `wait` milliseconds have elapsed.
 *
 * The promise is typed through its generic parameter rather than a trailing
 * `as` assertion, so `resolve` is checked against `number`.
 *
 * @param wait delay passed to setTimeout; also the resolved value
 * @returns a promise resolving to `wait`
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve) => {
    setTimeout(() => resolve(wait), wait)
  })

/**
 * Prints `text` to the console, prefixed with the current ISO-8601 timestamp.
 *
 * @param text label to print after the timestamp
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log(`time : ${timestamp} - text: ${text}`)
}

// Options object passed as the second argument to `test`.
// The timeout evaluates to 100000 (presumably milliseconds — confirm against the vitest docs).
const testOptions = {
  timeout: 100 * 1000
}

/**
 * Adds two numbers; used as the reducer when child waits accumulate.
 *
 * @returns `a + b`
 */
function sum(a: number, b: number) {
  const total = a + b
  return total
}

/**
 * Extracts the numeric index at the end of a site string.
 *
 * The whole run of trailing digits is parsed, so multi-digit indices such as
 * `.../10` yield 10 instead of only the last character. When the string does
 * not end in a digit, the last character alone is converted (legacy behaviour).
 *
 * @param site site string, e.g. `https://test/3/1`
 * @returns the trailing index as a number (may be NaN for non-numeric endings)
 */
function getLastIndex(site: string) {
  const trailingDigits = /\d+$/.exec(site)
  return Number(trailingDigits ? trailingDigits[0] : site.slice(-1))
}
/**
 * Extracts the numeric path segment that precedes the final segment of a site
 * string (the `3` in `https://test/3/1`).
 *
 * The segment is located by its `/` delimiter, so multi-digit values on
 * either side are handled. When no `<digits>/<tail>` suffix is present, the
 * single character three positions from the end is converted instead
 * (legacy behaviour).
 *
 * @param site site string shaped like `<base>/<top>/<last>`
 * @returns the top-level index as a number (may be NaN)
 */
function getTopIndex(site: string) {
  const parentSegment = /(\d+)\/[^/]*$/.exec(site)
  if (parentSegment) {
    return Number(parentSegment[1])
  }
  return Number(site.substring(site.length - 3, site.length - 2))
}

/**
 * Converts an index into a wait value by multiplying it by 1000.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  return index * 1000
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs `site`, and then yields
 * the message `"<site> - <wait>"`. Any rejection is mapped to `'Error 1'`.
 *
 * @param site label that is logged and embedded in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep1 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      checkLog(site)
      return `${site} - ${elapsed}`
    },
    () => 'Error 1'
  )
/**
 * Parses the wait value out of a `"<site> - <wait>"` message.
 *
 * The split happens at the LAST `" - "` separator, so a hyphen inside the
 * site part (e.g. `https://my-site/1 - 1000`) no longer corrupts the result.
 * When the separator is absent, everything after the first `-` is parsed
 * (legacy behaviour).
 *
 * @param mapper1Message message in the format produced by do_sleep1
 * @returns the numeric wait (NaN when the tail is not numeric)
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const separator = ' - '
  const separatorAt = mapper1Message.lastIndexOf(separator)
  const waitText =
    separatorAt === -1
      ? mapper1Message.substring(mapper1Message.indexOf('-') + 1)
      : mapper1Message.substring(separatorAt + separator.length)
  return Number(waitText)
}

/**
 * Builds a TaskEither that sleeps for `wait`, logs the site's top index, and
 * then yields a message-builder function. Given an extra wait, that function
 * returns `"<site minus its last two characters> - <wait + extra>"`.
 * Any rejection is mapped to `'Error 2'`.
 *
 * @param site site string; its last two characters are stripped in the message
 * @returns a function taking the wait value and returning the TaskEither
 */
const do_sleep2 = (site: string) => (wait: number) =>
  TE.tryCatch(
    async () => {
      const elapsed = await sleep(wait)
      const index = getTopIndex(site)
      checkLog(`mapper2-index = ${index}`)
      return (opWait: number) => `${site.substring(0, site.length - 2)} - ${elapsed + opWait}`
    },
    () => 'Error 2'
  )

/**
 * Lifts "combine child waits, then build the parent message" over a TaskEither.
 *
 * The right value is a tuple of the parent's message builder and the child
 * messages; the child waits are parsed, reduced with `op` starting from 0,
 * and fed to the builder. Left values pass through untouched.
 *
 * Implemented with `TE.map`: folding both branches only to re-wrap them in
 * `TE.left` / `TE.right` is exactly a map, and `map` keeps the caller's
 * error type instead of leaving it to be inferred from an unannotated `err`.
 *
 * @param op binary reducer applied to the child waits (seed 0)
 */
const op_TE_fold = (op: (a: number, b: number) => number) =>
  TE.map(([message2, message1Array]: [(sumWait: number) => string, readonly string[]]) => {
    const opWait = pipe(message1Array, RA.map(getWaitFromMapper1Message), RA.reduce(0, op))
    return message2(opWait)
  })

/**
 * Applies op_TE_fold(op) to a TaskEither carrying the
 * [message builder, child messages] tuple.
 *
 * @param op binary reducer forwarded to op_TE_fold
 */
const map_TE_2 =
  (op: (a: number, b: number) => number) =>
  (message: TE.TaskEither<string, [(sumWait: number) => string, readonly string[]]>) =>
    op_TE_fold(op)(message)

/**
 * Reducer that appends, for one chunk of child messages, only the message
 * with the largest wait to the accumulator.
 *
 * Generalized from the previous hard-coded chunk size of 2: a chunk of any
 * size contributes its slowest entry, so the helper stays correct if the
 * concurrency limit changes. For chunks of length 0, 1 and 2 the result is
 * identical to before, including ties, where the later entry wins.
 *
 * @param a accumulator of already-selected messages
 * @param b messages of the current chunk
 * @returns a new array: `a` followed by the slowest message of `b` (if any)
 */
const opStringArray = (a: readonly string[], b: readonly string[]) => {
  if (b.length === 0) {
    return [...a]
  }
  // Keep `best` only when it is strictly slower, mirroring the original `b0 > b1` test.
  const slowest = b.reduce((best, candidate) =>
    getWaitFromMapper1Message(best) > getWaitFromMapper1Message(candidate) ? best : candidate
  )
  return [...a, slowest]
}

/**
 * Collapses the per-chunk message arrays inside a TaskEither into a single
 * array by reducing them with `op`, starting from an empty array.
 *
 * Implemented with `TE.map`: the previous `TE.fold` only re-wrapped the left
 * branch in `TE.left` and the right branch in `TE.right`, which is what
 * `map` already does.
 *
 * @param op reducer combining the accumulator with one chunk's messages
 */
const map_TE_2_array =
  (op: (a: readonly string[], b: readonly string[]) => readonly string[]) =>
  (messageArray: TE.TaskEither<string, readonly (readonly string[])[]>) => {
    // Typed seed so the reducer's accumulator is `readonly string[]`, not `never[]`.
    const seed: readonly string[] = []
    return pipe(
      messageArray,
      TE.map((message1Array) => pipe(message1Array, RA.reduce(seed, op)))
    )
  }

/**
 * Builds the sleep task for one site: the wait is derived from the site
 * string (getLastIndex, then getWaitNumber) and handed to do_sleep1.
 */
const mapper1 = (site: string) => pipe(site, getLastIndex, getWaitNumber, do_sleep1(site))

/**
 * Builds the task for one group of sites: a parent sleep derived from the
 * first site, combined (via the sequential applicative) with the child sites
 * processed in chunks of two — parallel inside a chunk, chunks in sequence.
 * opStringArray condenses each chunk's messages and `sum` adds up the
 * resulting waits.
 *
 * Precondition: `sites` is read at index 0 without a guard, so it must be
 * non-empty.
 */
const mapper2_limit2 = (sites: string[]) => {
  const maxConcurrent = 2
  const head = sites[0]
  const parentTask = pipe(head, getTopIndex, getWaitNumber, do_sleep2(head))

  const childrenTask = pipe(
    sites,
    RA.chunksOf(maxConcurrent),
    RA.map(RA.traverse(TE.ApplicativePar)(mapper1)),
    RA.sequence(TE.ApplicativeSeq),
    map_TE_2_array(opStringArray)
  )

  return pipe(sequenceT(TE.ApplicativeSeq)(parentTask, childrenTask), map_TE_2(sum))
}

describe('fp-ts 非同期処理2つの配列', () => {
  // Outer groups are traversed in parallel; each group is handled by mapper2_limit2.
  test('使い方6ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    const siteGroups = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]

    checkLog('[start]')

    const runAll = pipe(siteGroups, RA.traverse(TE.ApplicativePar)(mapper2_limit2))
    const outcome = await runAll()

    console.log(outcome)
  })
})

実行結果

time : 2025-01-18T12:20:17.256Z - text: [start]
time : 2025-01-18T12:20:18.268Z - text: mapper2-index = 1
time : 2025-01-18T12:20:19.268Z - text: mapper2-index = 2
time : 2025-01-18T12:20:19.269Z - text: https://test/1/1
time : 2025-01-18T12:20:20.266Z - text: mapper2-index = 3
time : 2025-01-18T12:20:20.274Z - text: https://test/2/1
time : 2025-01-18T12:20:21.266Z - text: mapper2-index = 4
time : 2025-01-18T12:20:21.266Z - text: https://test/3/1
time : 2025-01-18T12:20:21.269Z - text: https://test/2/2
time : 2025-01-18T12:20:22.267Z - text: mapper2-index = 5
time : 2025-01-18T12:20:22.268Z - text: https://test/4/1
time : 2025-01-18T12:20:22.268Z - text: https://test/3/2
time : 2025-01-18T12:20:23.269Z - text: https://test/4/2
time : 2025-01-18T12:20:23.269Z - text: https://test/5/1
time : 2025-01-18T12:20:24.270Z - text: https://test/5/2
time : 2025-01-18T12:20:25.270Z - text: https://test/3/3
time : 2025-01-18T12:20:26.271Z - text: https://test/4/3
time : 2025-01-18T12:20:27.271Z - text: https://test/4/4
time : 2025-01-18T12:20:27.272Z - text: https://test/5/3
time : 2025-01-18T12:20:28.288Z - text: https://test/5/4
time : 2025-01-18T12:20:33.290Z - text: https://test/5/5
{
  _tag: 'Right',
  right: [
    'https://test/1 - 2000',
    'https://test/2 - 4000',
    'https://test/3 - 8000',
    'https://test/4 - 10000',
    'https://test/5 - 16000'
  ]
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?