配列の配列の非同期処理で対象API(以下ではsleep)がrejectする場合のメモ
前回
配列が1つの場合
全部同時実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
/**
 * Stand-in for an asynchronous API call that can fail, driven by `wait` (milliseconds).
 *
 * With `seconds = wait / 1000`:
 * - `seconds % 3 === 0` (this includes `wait === 0`): rejects at once with 'Sleep NG before wait'.
 * - otherwise, `seconds % 2 === 0`: rejects after `wait` ms with '<wait> - Sleep NG after wait'.
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * The promise is created as `Promise<number>` directly instead of being cast with `as`,
 * so the compiler checks that `resolve` really receives a number.
 *
 * @param wait delay in milliseconds
 * @returns a promise that settles as described above
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
    } else if (waitSeconds % 2 === 0) {
      setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
    } else {
      setTimeout(() => resolve(wait), wait)
    }
  })
/**
 * Writes `text` to the console, prefixed with the current time as an ISO-8601 string.
 *
 * @param text message to log
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log('time : ' + timestamp + ' - text: ' + text)
}
/** Options object handed to the `test(...)` call; `timeout` is 100000 (presumably milliseconds, i.e. 100 s). */
const testOptions = {
  timeout: 100_000
}
/**
 * Converts the final character of `site` with `Number`.
 *
 * Only one character is read, so an empty string yields 0 and a non-digit yields NaN.
 *
 * @param site string whose last character is the index
 * @returns the numeric value of the last character
 */
function getLastIndex(site: string) {
  const lastChar = site.charAt(site.length - 1)
  return Number(lastChar)
}
/**
 * Scales an index by 1000 to obtain the wait value passed on to `sleep`.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  const factor = 1000
  return factor * index
}
/**
 * Classifies a result message as failure (Left) or success (Right).
 *
 * A message counts as a failure when it contains the ' - ng - ' marker that the
 * sleep wrappers in this file put between the site and the error text. Matching the
 * whole marker (instead of the bare substring 'ng') keeps a site name that merely
 * contains the letters "ng" from being reported as a failure.
 *
 * @param aboutOkMessage message to classify
 * @returns `E.left(message)` for an ng message, `E.right(message)` otherwise
 */
function checkOkNg_FromMessage(aboutOkMessage: string) {
  const isNg = aboutOkMessage.includes(' - ng - ')
  const resultMessage: E.Either<string, string> = isNg
    ? E.left(aboutOkMessage)
    : E.right(aboutOkMessage)
  return resultMessage
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message string:
 * '<site> - ok - <wait>' when sleep resolves, '<site> - ng - <reason>' when it rejects.
 *
 * The rejection is handled inside the promise chain, so the 'Error 1' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    checkLog(`${site} - ng - ${error}`)
    return `${site} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 1'
  )
}
/** Derives the wait from the last character of `site` and returns the corresponding `do_sleep1` task. */
const mapper1 = (site: string) => do_sleep1(site)(getWaitNumber(getLastIndex(site)))
describe('fp-ts 非同期処理1つの配列', () => {
  test('使い方1ー全部同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5'
    ]
    checkLog('[start]')
    // One task per site, all started together.
    const runAll = pipe(sites, RA.traverse(TE.ApplicativePar)(mapper1))
    const settled = await runAll()
    // Split the messages into ng (left) and ok (right); a Left result is passed through.
    const summary = pipe(
      settled,
      E.foldW(
        (failure) => E.left(failure),
        (messages: readonly string[]) => pipe(messages, RA.partitionMap(checkOkNg_FromMessage))
      )
    )
    console.log(summary)
  })
})
実行結果
time : 2025-01-19T08:03:58.381Z - text: [start]
time : 2025-01-19T08:03:58.384Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-01-19T08:03:59.390Z - text: https://test/1
time : 2025-01-19T08:04:00.395Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-19T08:04:02.393Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-19T08:04:03.392Z - text: https://test/5
{
left: [
'https://test/2 - ng - 2000 - Sleep NG after wait',
'https://test/3 - ng - Sleep NG before wait',
'https://test/4 - ng - 4000 - Sleep NG after wait'
],
right: [ 'https://test/1 - ok - 1000', 'https://test/5 - ok - 5000' ]
}
最大2個まで同時実行 ※2個ずつのチャンクを直列に実行するため、各チャンクは遅い方の完了を待つことになり、全部同時実行より合計時間が長くなる(一部性能劣化)
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
/**
 * Stand-in for an asynchronous API call that can fail, driven by `wait` (milliseconds).
 *
 * With `seconds = wait / 1000`:
 * - `seconds % 3 === 0` (this includes `wait === 0`): rejects at once with 'Sleep NG before wait'.
 * - otherwise, `seconds % 2 === 0`: rejects after `wait` ms with '<wait> - Sleep NG after wait'.
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * The promise is created as `Promise<number>` directly instead of being cast with `as`,
 * so the compiler checks that `resolve` really receives a number.
 *
 * @param wait delay in milliseconds
 * @returns a promise that settles as described above
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
    } else if (waitSeconds % 2 === 0) {
      setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
    } else {
      setTimeout(() => resolve(wait), wait)
    }
  })
/**
 * Writes `text` to the console, prefixed with the current time as an ISO-8601 string.
 *
 * @param text message to log
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log('time : ' + timestamp + ' - text: ' + text)
}
/** Options object handed to the `test(...)` call; `timeout` is 100000 (presumably milliseconds, i.e. 100 s). */
const testOptions = {
  timeout: 100_000
}
/**
 * Converts the final character of `site` with `Number`.
 *
 * Only one character is read, so an empty string yields 0 and a non-digit yields NaN.
 *
 * @param site string whose last character is the index
 * @returns the numeric value of the last character
 */
function getLastIndex(site: string) {
  const lastChar = site.charAt(site.length - 1)
  return Number(lastChar)
}
/**
 * Scales an index by 1000 to obtain the wait value passed on to `sleep`.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  const factor = 1000
  return factor * index
}
/**
 * Classifies a result message as failure (Left) or success (Right).
 *
 * A message counts as a failure when it contains the ' - ng - ' marker that the
 * sleep wrappers in this file put between the site and the error text. Matching the
 * whole marker (instead of the bare substring 'ng') keeps a site name that merely
 * contains the letters "ng" from being reported as a failure.
 *
 * @param aboutOkMessage message to classify
 * @returns `E.left(message)` for an ng message, `E.right(message)` otherwise
 */
function checkOkNg_FromMessage(aboutOkMessage: string) {
  const isNg = aboutOkMessage.includes(' - ng - ')
  const resultMessage: E.Either<string, string> = isNg
    ? E.left(aboutOkMessage)
    : E.right(aboutOkMessage)
  return resultMessage
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message string:
 * '<site> - ok - <wait>' when sleep resolves, '<site> - ng - <reason>' when it rejects.
 *
 * The rejection is handled inside the promise chain, so the 'Error 1' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    checkLog(`${site} - ng - ${error}`)
    return `${site} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 1'
  )
}
/** Derives the wait from the last character of `site` and returns the corresponding `do_sleep1` task. */
const mapper1 = (site: string) => do_sleep1(site)(getWaitNumber(getLastIndex(site)))
describe('fp-ts 非同期処理1つの配列', () => {
  test('使い方2ー最大2個まで同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5'
    ]
    checkLog('[start]')
    const maxParallel = 2
    // Each chunk runs its members in parallel ...
    const chunkTasks = pipe(
      sites,
      RA.chunksOf(maxParallel),
      RA.map(RA.traverse(TE.ApplicativePar)(mapper1))
    )
    // ... while the chunks themselves run one after another; results are flattened back.
    const runChunksInOrder = pipe(chunkTasks, RA.sequence(TE.ApplicativeSeq), TE.map(RA.flatten))
    const settled = await runChunksInOrder()
    // Split the messages into ng (left) and ok (right); a Left result is passed through.
    const summary = pipe(
      settled,
      E.foldW(
        (failure) => E.left(failure),
        (messages: readonly string[]) => pipe(messages, RA.partitionMap(checkOkNg_FromMessage))
      )
    )
    console.log(summary)
  })
})
実行結果
time : 2025-01-19T08:05:17.141Z - text: [start]
time : 2025-01-19T08:05:18.154Z - text: https://test/1
time : 2025-01-19T08:05:19.148Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-19T08:05:19.149Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-01-19T08:05:23.151Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-19T08:05:28.160Z - text: https://test/5
{
left: [
'https://test/2 - ng - 2000 - Sleep NG after wait',
'https://test/3 - ng - Sleep NG before wait',
'https://test/4 - ng - 4000 - Sleep NG after wait'
],
right: [ 'https://test/1 - ok - 1000', 'https://test/5 - ok - 5000' ]
}
配列が2つの場合(シンプル)
全部同時実行からの全部同時実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
/**
 * Stand-in for an asynchronous API call that can fail, driven by `wait` (milliseconds).
 *
 * With `seconds = wait / 1000`:
 * - `seconds % 3 === 0` (this includes `wait === 0`): rejects at once with 'Sleep NG before wait'.
 * - otherwise, `seconds % 2 === 0`: rejects after `wait` ms with '<wait> - Sleep NG after wait'.
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * The promise is created as `Promise<number>` directly instead of being cast with `as`,
 * so the compiler checks that `resolve` really receives a number.
 *
 * @param wait delay in milliseconds
 * @returns a promise that settles as described above
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
    } else if (waitSeconds % 2 === 0) {
      setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
    } else {
      setTimeout(() => resolve(wait), wait)
    }
  })
/**
 * Writes `text` to the console, prefixed with the current time as an ISO-8601 string.
 *
 * @param text message to log
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log('time : ' + timestamp + ' - text: ' + text)
}
/** Options object handed to the `test(...)` call; `timeout` is 100000 (presumably milliseconds, i.e. 100 s). */
const testOptions = {
  timeout: 100_000
}
/**
 * Converts the final character of `site` with `Number`.
 *
 * Only one character is read, so an empty string yields 0 and a non-digit yields NaN.
 *
 * @param site string whose last character is the index
 * @returns the numeric value of the last character
 */
function getLastIndex(site: string) {
  const lastChar = site.charAt(site.length - 1)
  return Number(lastChar)
}
/**
 * Converts the third character from the end of `site` with `Number`
 * (for 'https://test/5/1' that is the '5').
 *
 * Strings shorter than three characters yield 0; a non-digit yields NaN.
 *
 * @param site string of the form '<...><top>/<last>' with single-character indices
 * @returns the numeric value of the third-from-last character
 */
function getTopIndex(site: string) {
  const thirdFromEnd = site.charAt(site.length - 3)
  return Number(thirdFromEnd)
}
/**
 * Scales an index by 1000 to obtain the wait value passed on to `sleep`.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  const factor = 1000
  return factor * index
}
/**
 * Classifies a result message as failure (Left) or success (Right).
 *
 * A message counts as a failure when it contains the ' - ng - ' marker that the
 * sleep wrappers in this file put between the site and the error text. Matching the
 * whole marker (instead of the bare substring 'ng') keeps a site name that merely
 * contains the letters "ng" from being reported as a failure.
 *
 * @param aboutOkMessage message to classify
 * @returns `E.left(message)` for an ng message, `E.right(message)` otherwise
 */
function checkOkNg_FromMessage(aboutOkMessage: string) {
  const isNg = aboutOkMessage.includes(' - ng - ')
  const resultMessage: E.Either<string, string> = isNg
    ? E.left(aboutOkMessage)
    : E.right(aboutOkMessage)
  return resultMessage
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message string:
 * '<site> - ok - <wait>' when sleep resolves, '<site> - ng - <reason>' when it rejects.
 *
 * The rejection is handled inside the promise chain, so the 'Error 1' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    checkLog(`${site} - ng - ${error}`)
    return `${site} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 1'
  )
}
/**
 * Extracts the wait value from a message of the form
 * '<site> - ok - <wait>' or '<site> - ng - <wait> - <error text>'.
 *
 * The ' - ok - ' / ' - ng - ' markers are matched as a whole, so a site that happens
 * to contain the letters "ok" or "ng" no longer shifts the parse position, and a
 * message without either marker yields 0 instead of parsing arbitrary text.
 *
 * @param mapper1Message message to parse
 * @returns the wait as a number, or 0 when the message carries no numeric wait
 *          (for example '<site> - ng - Sleep NG before wait')
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const okMarker = ' - ok - '
  const ngMarker = ' - ng - '

  const okAt = mapper1Message.indexOf(okMarker)
  if (okAt >= 0) {
    const okWait = Number(mapper1Message.substring(okAt + okMarker.length))
    if (!isNaN(okWait)) {
      return okWait
    }
  }

  const ngAt = mapper1Message.indexOf(ngMarker)
  if (ngAt < 0) {
    return 0
  }
  // The ng detail is '<wait> - <error text>' or just '<error text>' (no wait available).
  const ngDetail = mapper1Message.substring(ngAt + ngMarker.length)
  const dashAt = ngDetail.indexOf('-')
  const ngWait = Number(dashAt < 0 ? '' : ngDetail.substring(0, dashAt))
  return isNaN(ngWait) ? 0 : ngWait
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message *builder*:
 * - on success, a function taking `opWait` that returns '<site minus last 2 chars> - <wait + opWait>';
 * - on rejection, a function ignoring its argument that returns '<site minus last 2 chars> - ng - <reason>'.
 *
 * The rejection is handled inside the promise chain, so the 'Error 2' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep2 = (site: string) => (wait: number) => {
  // Evaluated lazily, exactly when a message is built.
  const parentSite = () => site.substring(0, site.length - 2)
  const onResolved = (_wait: number) => {
    const index = getTopIndex(site)
    checkLog(`mapper2-index = ${index}`)
    return (opWait: number) => `${parentSite()} - ${_wait + opWait}`
  }
  const onRejected = (error: unknown) => {
    const index = getTopIndex(site)
    checkLog(`mapper2-index = ${index} - ng - ${error}`)
    return () => `${parentSite()} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 2'
  )
}
/**
 * Given a numeric combiner `op`, returns a function that takes `[message2, message1Array]`,
 * folds the waits parsed from `message1Array` with `op` (starting from 0) and feeds the
 * result to the `message2` builder, wrapped in `TE.right`.
 */
const op_message1_and_2 =
  (op: (a: number, b: number) => number) =>
  (pair: [((opWait: number) => string) | (() => string), readonly string[]]) => {
    const [message2, message1Array] = pair
    let opWait = 0
    for (const message1 of message1Array) {
      opWait = op(opWait, getWaitFromMapper1Message(message1))
    }
    return TE.right(message2(opWait))
  }
/** Derives the wait from the last character of `site` and returns the corresponding `do_sleep1` task. */
const mapper1 = (site: string) => do_sleep1(site)(getWaitNumber(getLastIndex(site)))
/**
 * Chains `action2` (the outer task) with `action1` (the inner tasks) and produces one
 * result message. Every branch below wraps its value with `TE.right`, so failures are
 * carried as message text rather than as a Left.
 *
 * Flow:
 * 1. `action2` is Left: its error string becomes the result.
 * 2. `action2` is Right: the message builder is probed once with a dummy wait (123) and the
 *    probe message is classified by `checkOkNg_FromMessage`.
 *    - classified ng: the probe message itself is the result.
 *    - otherwise `action1` is run; its error string, or the message built by
 *      `op_message1_and_2(op)` from the real waits, is the result.
 *
 * `action1` is only invoked inside the success branch, i.e. after `action2` has settled.
 *
 * @param action2 task yielding a message builder
 * @param action1 task yielding the inner messages
 * @param op combiner used to aggregate the inner waits
 */
function action2_and_1(
action2: TE.TaskEither<string, ((opWait: number) => string) | (() => string)>,
action1: TE.TaskEither<string, readonly string[]>,
op: (a: number, b: number) => number
) {
return pipe(
action2,
TE.fold(
// action2 failed: surface the error text as a normal result
(error2) => TE.right(error2),
(message2) =>
pipe(
// 123 is only a placeholder wait used to obtain a message that can be classified
message2(123),
checkOkNg_FromMessage,
E.fold(
// outer call reported ng: skip action1 and return the ng message
(ngMessage2) => TE.right(ngMessage2),
() =>
pipe(
action1,
TE.foldW(
(error1) => TE.right(error1),
// build the final message from the real inner waits
(message1) => op_message1_and_2(op)([message2, message1])
)
)
)
)
)
)
}
/**
 * Builds the task for one group of sites: the outer call is derived from the first
 * site, the inner calls (one per site) run in parallel, and their waits are combined
 * with `Math.max`.
 */
const mapper2_par = (sites: string[]) => {
  const [site] = sites
  const outerTask = do_sleep2(site)(getWaitNumber(getTopIndex(site)))
  const innerTask = pipe(sites, RA.traverse(TE.ApplicativePar)(mapper1))
  return action2_and_1(outerTask, innerTask, Math.max)
}
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方1ー全部同時実行からの全部同時実行', testOptions, async () => {
    // Only the five-element group is active; the smaller groups are kept for manual experiments.
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]
    checkLog('[start]')
    // All groups are started together.
    const runGroups = pipe(sites, RA.traverse(TE.ApplicativePar)(mapper2_par))
    const settled = await runGroups()
    // Split the messages into ng (left) and ok (right); a Left result is passed through.
    const summary = pipe(
      settled,
      E.foldW(
        (failure) => E.left(failure),
        (messages: readonly string[]) => pipe(messages, RA.partitionMap(checkOkNg_FromMessage))
      )
    )
    console.log(summary)
  })
})
実行結果
time : 2025-01-19T08:09:18.172Z - text: [start]
time : 2025-01-19T08:09:23.185Z - text: mapper2-index = 5
time : 2025-01-19T08:09:23.187Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-19T08:09:24.199Z - text: https://test/5/1
time : 2025-01-19T08:09:25.201Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-19T08:09:27.192Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-19T08:09:28.194Z - text: https://test/5/5
{ left: [], right: [ 'https://test/5 - 10000' ] }
全部同時実行からの直列実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
/**
 * Stand-in for an asynchronous API call that can fail, driven by `wait` (milliseconds).
 *
 * With `seconds = wait / 1000`:
 * - `seconds % 3 === 0` (this includes `wait === 0`): rejects at once with 'Sleep NG before wait'.
 * - otherwise, `seconds % 2 === 0`: rejects after `wait` ms with '<wait> - Sleep NG after wait'.
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * The promise is created as `Promise<number>` directly instead of being cast with `as`,
 * so the compiler checks that `resolve` really receives a number.
 *
 * @param wait delay in milliseconds
 * @returns a promise that settles as described above
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
    } else if (waitSeconds % 2 === 0) {
      setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
    } else {
      setTimeout(() => resolve(wait), wait)
    }
  })
/**
 * Writes `text` to the console, prefixed with the current time as an ISO-8601 string.
 *
 * @param text message to log
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log('time : ' + timestamp + ' - text: ' + text)
}
/** Options object handed to the `test(...)` call; `timeout` is 100000 (presumably milliseconds, i.e. 100 s). */
const testOptions = {
  timeout: 100_000
}
/**
 * Adds two numbers; used as the combiner when inner waits accumulate sequentially.
 *
 * @returns `a + b`
 */
function sum(a: number, b: number) {
  const total = a + b
  return total
}
/**
 * Converts the final character of `site` with `Number`.
 *
 * Only one character is read, so an empty string yields 0 and a non-digit yields NaN.
 *
 * @param site string whose last character is the index
 * @returns the numeric value of the last character
 */
function getLastIndex(site: string) {
  const lastChar = site.charAt(site.length - 1)
  return Number(lastChar)
}
/**
 * Converts the third character from the end of `site` with `Number`
 * (for 'https://test/5/1' that is the '5').
 *
 * Strings shorter than three characters yield 0; a non-digit yields NaN.
 *
 * @param site string of the form '<...><top>/<last>' with single-character indices
 * @returns the numeric value of the third-from-last character
 */
function getTopIndex(site: string) {
  const thirdFromEnd = site.charAt(site.length - 3)
  return Number(thirdFromEnd)
}
/**
 * Scales an index by 1000 to obtain the wait value passed on to `sleep`.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  const factor = 1000
  return factor * index
}
/**
 * Classifies a result message as failure (Left) or success (Right).
 *
 * A message counts as a failure when it contains the ' - ng - ' marker that the
 * sleep wrappers in this file put between the site and the error text. Matching the
 * whole marker (instead of the bare substring 'ng') keeps a site name that merely
 * contains the letters "ng" from being reported as a failure.
 *
 * @param aboutOkMessage message to classify
 * @returns `E.left(message)` for an ng message, `E.right(message)` otherwise
 */
function checkOkNg_FromMessage(aboutOkMessage: string) {
  const isNg = aboutOkMessage.includes(' - ng - ')
  const resultMessage: E.Either<string, string> = isNg
    ? E.left(aboutOkMessage)
    : E.right(aboutOkMessage)
  return resultMessage
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message string:
 * '<site> - ok - <wait>' when sleep resolves, '<site> - ng - <reason>' when it rejects.
 *
 * The rejection is handled inside the promise chain, so the 'Error 1' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    checkLog(`${site} - ng - ${error}`)
    return `${site} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 1'
  )
}
/**
 * Extracts the wait value from a message of the form
 * '<site> - ok - <wait>' or '<site> - ng - <wait> - <error text>'.
 *
 * The ' - ok - ' / ' - ng - ' markers are matched as a whole, so a site that happens
 * to contain the letters "ok" or "ng" no longer shifts the parse position, and a
 * message without either marker yields 0 instead of parsing arbitrary text.
 *
 * @param mapper1Message message to parse
 * @returns the wait as a number, or 0 when the message carries no numeric wait
 *          (for example '<site> - ng - Sleep NG before wait')
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const okMarker = ' - ok - '
  const ngMarker = ' - ng - '

  const okAt = mapper1Message.indexOf(okMarker)
  if (okAt >= 0) {
    const okWait = Number(mapper1Message.substring(okAt + okMarker.length))
    if (!isNaN(okWait)) {
      return okWait
    }
  }

  const ngAt = mapper1Message.indexOf(ngMarker)
  if (ngAt < 0) {
    return 0
  }
  // The ng detail is '<wait> - <error text>' or just '<error text>' (no wait available).
  const ngDetail = mapper1Message.substring(ngAt + ngMarker.length)
  const dashAt = ngDetail.indexOf('-')
  const ngWait = Number(dashAt < 0 ? '' : ngDetail.substring(0, dashAt))
  return isNaN(ngWait) ? 0 : ngWait
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message *builder*:
 * - on success, a function taking `opWait` that returns '<site minus last 2 chars> - <wait + opWait>';
 * - on rejection, a function ignoring its argument that returns '<site minus last 2 chars> - ng - <reason>'.
 *
 * The rejection is handled inside the promise chain, so the 'Error 2' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep2 = (site: string) => (wait: number) => {
  // Evaluated lazily, exactly when a message is built.
  const parentSite = () => site.substring(0, site.length - 2)
  const onResolved = (_wait: number) => {
    const index = getTopIndex(site)
    checkLog(`mapper2-index = ${index}`)
    return (opWait: number) => `${parentSite()} - ${_wait + opWait}`
  }
  const onRejected = (error: unknown) => {
    const index = getTopIndex(site)
    checkLog(`mapper2-index = ${index} - ng - ${error}`)
    return () => `${parentSite()} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 2'
  )
}
/**
 * Given a numeric combiner `op`, returns a function that takes `[message2, message1Array]`,
 * folds the waits parsed from `message1Array` with `op` (starting from 0) and feeds the
 * result to the `message2` builder, wrapped in `TE.right`.
 */
const op_message1_and_2 =
  (op: (a: number, b: number) => number) =>
  (pair: [((opWait: number) => string) | (() => string), readonly string[]]) => {
    const [message2, message1Array] = pair
    let opWait = 0
    for (const message1 of message1Array) {
      opWait = op(opWait, getWaitFromMapper1Message(message1))
    }
    return TE.right(message2(opWait))
  }
/** Derives the wait from the last character of `site` and returns the corresponding `do_sleep1` task. */
const mapper1 = (site: string) => do_sleep1(site)(getWaitNumber(getLastIndex(site)))
/**
 * Chains `action2` (the outer task) with `action1` (the inner tasks) and produces one
 * result message. Every branch below wraps its value with `TE.right`, so failures are
 * carried as message text rather than as a Left.
 *
 * Flow:
 * 1. `action2` is Left: its error string becomes the result.
 * 2. `action2` is Right: the message builder is probed once with a dummy wait (123) and the
 *    probe message is classified by `checkOkNg_FromMessage`.
 *    - classified ng: the probe message itself is the result.
 *    - otherwise `action1` is run; its error string, or the message built by
 *      `op_message1_and_2(op)` from the real waits, is the result.
 *
 * `action1` is only invoked inside the success branch, i.e. after `action2` has settled.
 *
 * @param action2 task yielding a message builder
 * @param action1 task yielding the inner messages
 * @param op combiner used to aggregate the inner waits
 */
function action2_and_1(
action2: TE.TaskEither<string, ((opWait: number) => string) | (() => string)>,
action1: TE.TaskEither<string, readonly string[]>,
op: (a: number, b: number) => number
) {
return pipe(
action2,
TE.fold(
// action2 failed: surface the error text as a normal result
(error2) => TE.right(error2),
(message2) =>
pipe(
// 123 is only a placeholder wait used to obtain a message that can be classified
message2(123),
checkOkNg_FromMessage,
E.fold(
// outer call reported ng: skip action1 and return the ng message
(ngMessage2) => TE.right(ngMessage2),
() =>
pipe(
action1,
TE.foldW(
(error1) => TE.right(error1),
// build the final message from the real inner waits
(message1) => op_message1_and_2(op)([message2, message1])
)
)
)
)
)
)
}
/**
 * Builds the task for one group of sites: the outer call is derived from the first
 * site, the inner calls (one per site) run one after another, and their waits are
 * combined with `sum`.
 */
const mapper2_seq = (sites: string[]) => {
  const [site] = sites
  const outerTask = do_sleep2(site)(getWaitNumber(getTopIndex(site)))
  const innerTask = pipe(sites, RA.traverse(TE.ApplicativeSeq)(mapper1))
  return action2_and_1(outerTask, innerTask, sum)
}
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方3ー全部同時実行からの直列実行', testOptions, async () => {
    // Only the five-element group is active; the smaller groups are kept for manual experiments.
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]
    checkLog('[start]')
    // Groups are started together; inside a group the inner calls run sequentially.
    const runGroups = pipe(sites, RA.traverse(TE.ApplicativePar)(mapper2_seq))
    const settled = await runGroups()
    // Split the messages into ng (left) and ok (right); a Left result is passed through.
    const summary = pipe(
      settled,
      E.foldW(
        (failure) => E.left(failure),
        (messages: readonly string[]) => pipe(messages, RA.partitionMap(checkOkNg_FromMessage))
      )
    )
    console.log(summary)
  })
})
実行結果
time : 2025-01-19T08:12:59.525Z - text: [start]
time : 2025-01-19T08:13:04.529Z - text: mapper2-index = 5
time : 2025-01-19T08:13:05.543Z - text: https://test/5/1
time : 2025-01-19T08:13:07.557Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-19T08:13:07.558Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-19T08:13:11.571Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-19T08:13:16.590Z - text: https://test/5/5
{ left: [], right: [ 'https://test/5 - 17000' ] }
全部同時実行からの最大2個まで同時実行 ※内側の配列は2個ずつのチャンクを直列に実行するため、各チャンクは遅い方の完了を待つことになる(一部性能劣化)
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
/**
 * Stand-in for an asynchronous API call that can fail, driven by `wait` (milliseconds).
 *
 * With `seconds = wait / 1000`:
 * - `seconds % 3 === 0` (this includes `wait === 0`): rejects at once with 'Sleep NG before wait'.
 * - otherwise, `seconds % 2 === 0`: rejects after `wait` ms with '<wait> - Sleep NG after wait'.
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * The promise is created as `Promise<number>` directly instead of being cast with `as`,
 * so the compiler checks that `resolve` really receives a number.
 *
 * @param wait delay in milliseconds
 * @returns a promise that settles as described above
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
    } else if (waitSeconds % 2 === 0) {
      setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
    } else {
      setTimeout(() => resolve(wait), wait)
    }
  })
/**
 * Writes `text` to the console, prefixed with the current time as an ISO-8601 string.
 *
 * @param text message to log
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log('time : ' + timestamp + ' - text: ' + text)
}
/** Options object handed to the `test(...)` call; `timeout` is 100000 (presumably milliseconds, i.e. 100 s). */
const testOptions = {
  timeout: 100_000
}
/**
 * Adds two numbers; used as the combiner when inner waits accumulate sequentially.
 *
 * @returns `a + b`
 */
function sum(a: number, b: number) {
  const total = a + b
  return total
}
/**
 * Converts the final character of `site` with `Number`.
 *
 * Only one character is read, so an empty string yields 0 and a non-digit yields NaN.
 *
 * @param site string whose last character is the index
 * @returns the numeric value of the last character
 */
function getLastIndex(site: string) {
  const lastChar = site.charAt(site.length - 1)
  return Number(lastChar)
}
/**
 * Converts the third character from the end of `site` with `Number`
 * (for 'https://test/5/1' that is the '5').
 *
 * Strings shorter than three characters yield 0; a non-digit yields NaN.
 *
 * @param site string of the form '<...><top>/<last>' with single-character indices
 * @returns the numeric value of the third-from-last character
 */
function getTopIndex(site: string) {
  const thirdFromEnd = site.charAt(site.length - 3)
  return Number(thirdFromEnd)
}
/**
 * Scales an index by 1000 to obtain the wait value passed on to `sleep`.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  const factor = 1000
  return factor * index
}
/**
 * Classifies a result message as failure (Left) or success (Right).
 *
 * A message counts as a failure when it contains the ' - ng - ' marker that the
 * sleep wrappers in this file put between the site and the error text. Matching the
 * whole marker (instead of the bare substring 'ng') keeps a site name that merely
 * contains the letters "ng" from being reported as a failure.
 *
 * @param aboutOkMessage message to classify
 * @returns `E.left(message)` for an ng message, `E.right(message)` otherwise
 */
function checkOkNg_FromMessage(aboutOkMessage: string) {
  const isNg = aboutOkMessage.includes(' - ng - ')
  const resultMessage: E.Either<string, string> = isNg
    ? E.left(aboutOkMessage)
    : E.right(aboutOkMessage)
  return resultMessage
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message string:
 * '<site> - ok - <wait>' when sleep resolves, '<site> - ng - <reason>' when it rejects.
 *
 * The rejection is handled inside the promise chain, so the 'Error 1' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    checkLog(`${site} - ng - ${error}`)
    return `${site} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 1'
  )
}
/**
 * Extracts the wait value from a message of the form
 * '<site> - ok - <wait>' or '<site> - ng - <wait> - <error text>'.
 *
 * The ' - ok - ' / ' - ng - ' markers are matched as a whole, so a site that happens
 * to contain the letters "ok" or "ng" no longer shifts the parse position, and a
 * message without either marker yields 0 instead of parsing arbitrary text.
 *
 * @param mapper1Message message to parse
 * @returns the wait as a number, or 0 when the message carries no numeric wait
 *          (for example '<site> - ng - Sleep NG before wait')
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const okMarker = ' - ok - '
  const ngMarker = ' - ng - '

  const okAt = mapper1Message.indexOf(okMarker)
  if (okAt >= 0) {
    const okWait = Number(mapper1Message.substring(okAt + okMarker.length))
    if (!isNaN(okWait)) {
      return okWait
    }
  }

  const ngAt = mapper1Message.indexOf(ngMarker)
  if (ngAt < 0) {
    return 0
  }
  // The ng detail is '<wait> - <error text>' or just '<error text>' (no wait available).
  const ngDetail = mapper1Message.substring(ngAt + ngMarker.length)
  const dashAt = ngDetail.indexOf('-')
  const ngWait = Number(dashAt < 0 ? '' : ngDetail.substring(0, dashAt))
  return isNaN(ngWait) ? 0 : ngWait
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message *builder*:
 * - on success, a function taking `opWait` that returns '<site minus last 2 chars> - <wait + opWait>';
 * - on rejection, a function ignoring its argument that returns '<site minus last 2 chars> - ng - <reason>'.
 *
 * The rejection is handled inside the promise chain, so the 'Error 2' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep2 = (site: string) => (wait: number) => {
  // Evaluated lazily, exactly when a message is built.
  const parentSite = () => site.substring(0, site.length - 2)
  const onResolved = (_wait: number) => {
    const index = getTopIndex(site)
    checkLog(`mapper2-index = ${index}`)
    return (opWait: number) => `${parentSite()} - ${_wait + opWait}`
  }
  const onRejected = (error: unknown) => {
    const index = getTopIndex(site)
    checkLog(`mapper2-index = ${index} - ng - ${error}`)
    return () => `${parentSite()} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 2'
  )
}
/**
 * Given a numeric combiner `op`, returns a function that takes `[message2, message1Array]`,
 * folds the waits parsed from `message1Array` with `op` (starting from 0) and feeds the
 * result to the `message2` builder, wrapped in `TE.right`.
 */
const op_message1_and_2 =
  (op: (a: number, b: number) => number) =>
  (pair: [((opWait: number) => string) | (() => string), readonly string[]]) => {
    const [message2, message1Array] = pair
    let opWait = 0
    for (const message1 of message1Array) {
      opWait = op(opWait, getWaitFromMapper1Message(message1))
    }
    return TE.right(message2(opWait))
  }
/**
 * Reducer that appends to `a` the single message of chunk `b` with the largest wait
 * (as parsed by `getWaitFromMapper1Message`); on a tie the later message wins.
 *
 * A chunk's members run in parallel, so only its slowest member contributes to the
 * elapsed time. The original special-cased `b.length == 2` and concatenated every other
 * length; this version handles any chunk size, giving the same result for lengths 0, 1
 * and 2 while no longer over-counting if the chunk limit is raised.
 *
 * @param a messages accumulated from earlier chunks
 * @param b messages of the current chunk
 * @returns a new array: `a` plus the slowest message of `b` (or a copy of `a` if `b` is empty)
 */
const opStringArray = (a: readonly string[], b: readonly string[]) => {
  if (b.length === 0) {
    return [...a]
  }
  let slowest = b[0]
  let slowestWait = getWaitFromMapper1Message(slowest)
  for (const candidate of b.slice(1)) {
    const candidateWait = getWaitFromMapper1Message(candidate)
    // `>=` keeps the original tie-breaking: the later message replaces an equal earlier one.
    if (candidateWait >= slowestWait) {
      slowest = candidate
      slowestWait = candidateWait
    }
  }
  return [...a, slowest]
}
/**
 * Given a reducer `op`, returns a function that collapses the per-chunk message arrays
 * inside a TaskEither into a single array by folding them with `op`, starting from `[]`.
 *
 * A Left is passed through untouched. This is expressed with `TE.map` rather than a
 * `TE.fold` that re-wraps both sides by hand.
 */
const map_TE_2_array =
  (op: (a: readonly string[], b: readonly string[]) => readonly string[]) =>
  (messageArray: TE.TaskEither<string, readonly (readonly string[])[]>) =>
    pipe(
      messageArray,
      TE.map((message1Array) =>
        pipe(message1Array, RA.reduce<readonly string[], readonly string[]>([], op))
      )
    )
/** Derives the wait from the last character of `site` and returns the corresponding `do_sleep1` task. */
const mapper1 = (site: string) => do_sleep1(site)(getWaitNumber(getLastIndex(site)))
/**
 * Chains `action2` (the outer task) with `action1` (the inner tasks) and produces one
 * result message. Every branch below wraps its value with `TE.right`, so failures are
 * carried as message text rather than as a Left.
 *
 * Flow:
 * 1. `action2` is Left: its error string becomes the result.
 * 2. `action2` is Right: the message builder is probed once with a dummy wait (123) and the
 *    probe message is classified by `checkOkNg_FromMessage`.
 *    - classified ng: the probe message itself is the result.
 *    - otherwise `action1` is run; its error string, or the message built by
 *      `op_message1_and_2(op)` from the real waits, is the result.
 *
 * `action1` is only invoked inside the success branch, i.e. after `action2` has settled.
 *
 * @param action2 task yielding a message builder
 * @param action1 task yielding the inner messages
 * @param op combiner used to aggregate the inner waits
 */
function action2_and_1(
action2: TE.TaskEither<string, ((opWait: number) => string) | (() => string)>,
action1: TE.TaskEither<string, readonly string[]>,
op: (a: number, b: number) => number
) {
return pipe(
action2,
TE.fold(
// action2 failed: surface the error text as a normal result
(error2) => TE.right(error2),
(message2) =>
pipe(
// 123 is only a placeholder wait used to obtain a message that can be classified
message2(123),
checkOkNg_FromMessage,
E.fold(
// outer call reported ng: skip action1 and return the ng message
(ngMessage2) => TE.right(ngMessage2),
() =>
pipe(
action1,
TE.foldW(
(error1) => TE.right(error1),
// build the final message from the real inner waits
(message1) => op_message1_and_2(op)([message2, message1])
)
)
)
)
)
)
}
/**
 * Builds the task for one group of sites with at most two inner calls in flight:
 * the sites are split into chunks of 2, each chunk runs in parallel, the chunks run one
 * after another, and `opStringArray` reduces the per-chunk results before the waits are
 * combined with `sum`. The outer call is derived from the first site.
 */
const mapper2_limit2 = (sites: string[]) => {
  const [site] = sites
  const outerTask = do_sleep2(site)(getWaitNumber(getTopIndex(site)))
  const maxParallel = 2
  const chunkTasks = pipe(
    sites,
    RA.chunksOf(maxParallel),
    RA.map(RA.traverse(TE.ApplicativePar)(mapper1))
  )
  const innerTask = pipe(
    chunkTasks,
    RA.sequence(TE.ApplicativeSeq),
    map_TE_2_array(opStringArray)
  )
  return action2_and_1(outerTask, innerTask, sum)
}
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方5ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    // Only the five-element group is active; the smaller groups are kept for manual experiments.
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]
    checkLog('[start]')
    // Groups are started together; inside a group at most two inner calls run at once.
    const runGroups = pipe(sites, RA.traverse(TE.ApplicativePar)(mapper2_limit2))
    const settled = await runGroups()
    // Split the messages into ng (left) and ok (right); a Left result is passed through.
    const summary = pipe(
      settled,
      E.foldW(
        (failure) => E.left(failure),
        (messages: readonly string[]) => pipe(messages, RA.partitionMap(checkOkNg_FromMessage))
      )
    )
    console.log(summary)
  })
})
実行結果
time : 2025-01-19T08:18:19.837Z - text: [start]
time : 2025-01-19T08:18:24.843Z - text: mapper2-index = 5
time : 2025-01-19T08:18:25.856Z - text: https://test/5/1
time : 2025-01-19T08:18:26.856Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-19T08:18:26.857Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-19T08:18:30.859Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-19T08:18:35.875Z - text: https://test/5/5
{ left: [], right: [ 'https://test/5 - 16000' ] }
配列が2つの場合(複雑)
全部同時実行からの全部同時実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
/**
 * Stand-in for an asynchronous API call that can fail, driven by `wait` (milliseconds).
 *
 * With `seconds = wait / 1000`:
 * - `seconds % 3 === 0` (this includes `wait === 0`): rejects at once with 'Sleep NG before wait'.
 * - otherwise, `seconds % 2 === 0`: rejects after `wait` ms with '<wait> - Sleep NG after wait'.
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * The promise is created as `Promise<number>` directly instead of being cast with `as`,
 * so the compiler checks that `resolve` really receives a number.
 *
 * @param wait delay in milliseconds
 * @returns a promise that settles as described above
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
    } else if (waitSeconds % 2 === 0) {
      setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
    } else {
      setTimeout(() => resolve(wait), wait)
    }
  })
/**
 * Writes `text` to the console, prefixed with the current time as an ISO-8601 string.
 *
 * @param text message to log
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log('time : ' + timestamp + ' - text: ' + text)
}
/** Options object handed to the `test(...)` call; `timeout` is 100000 (presumably milliseconds, i.e. 100 s). */
const testOptions = {
  timeout: 100_000
}
/**
 * Converts the final character of `site` with `Number`.
 *
 * Only one character is read, so an empty string yields 0 and a non-digit yields NaN.
 *
 * @param site string whose last character is the index
 * @returns the numeric value of the last character
 */
function getLastIndex(site: string) {
  const lastChar = site.charAt(site.length - 1)
  return Number(lastChar)
}
/**
 * Converts the third character from the end of `site` with `Number`
 * (for 'https://test/5/1' that is the '5').
 *
 * Strings shorter than three characters yield 0; a non-digit yields NaN.
 *
 * @param site string of the form '<...><top>/<last>' with single-character indices
 * @returns the numeric value of the third-from-last character
 */
function getTopIndex(site: string) {
  const thirdFromEnd = site.charAt(site.length - 3)
  return Number(thirdFromEnd)
}
/**
 * Scales an index by 1000 to obtain the wait value passed on to `sleep`.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  const factor = 1000
  return factor * index
}
/**
 * Classifies a result message as failure (Left) or success (Right).
 *
 * A message counts as a failure when it contains the ' - ng - ' marker that the
 * sleep wrappers in this file put between the site and the error text. Matching the
 * whole marker (instead of the bare substring 'ng') keeps a site name that merely
 * contains the letters "ng" from being reported as a failure.
 *
 * @param aboutOkMessage message to classify
 * @returns `E.left(message)` for an ng message, `E.right(message)` otherwise
 */
function checkOkNg_FromMessage(aboutOkMessage: string) {
  const isNg = aboutOkMessage.includes(' - ng - ')
  const resultMessage: E.Either<string, string> = isNg
    ? E.left(aboutOkMessage)
    : E.right(aboutOkMessage)
  return resultMessage
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message string:
 * '<site> - ok - <wait>' when sleep resolves, '<site> - ng - <reason>' when it rejects.
 *
 * The rejection is handled inside the promise chain, so the 'Error 1' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    checkLog(`${site} - ng - ${error}`)
    return `${site} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 1'
  )
}
/**
 * Extracts the wait value from a message of the form
 * '<site> - ok - <wait>' or '<site> - ng - <wait> - <error text>'.
 *
 * The ' - ok - ' / ' - ng - ' markers are matched as a whole, so a site that happens
 * to contain the letters "ok" or "ng" no longer shifts the parse position, and a
 * message without either marker yields 0 instead of parsing arbitrary text.
 *
 * @param mapper1Message message to parse
 * @returns the wait as a number, or 0 when the message carries no numeric wait
 *          (for example '<site> - ng - Sleep NG before wait')
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const okMarker = ' - ok - '
  const ngMarker = ' - ng - '

  const okAt = mapper1Message.indexOf(okMarker)
  if (okAt >= 0) {
    const okWait = Number(mapper1Message.substring(okAt + okMarker.length))
    if (!isNaN(okWait)) {
      return okWait
    }
  }

  const ngAt = mapper1Message.indexOf(ngMarker)
  if (ngAt < 0) {
    return 0
  }
  // The ng detail is '<wait> - <error text>' or just '<error text>' (no wait available).
  const ngDetail = mapper1Message.substring(ngAt + ngMarker.length)
  const dashAt = ngDetail.indexOf('-')
  const ngWait = Number(dashAt < 0 ? '' : ngDetail.substring(0, dashAt))
  return isNaN(ngWait) ? 0 : ngWait
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message *builder*:
 * - on success, a function taking `opWait` that returns '<site minus last 2 chars> - <wait + opWait>';
 * - on rejection, a function ignoring its argument that returns '<site minus last 2 chars> - ng - <reason>'.
 *
 * The rejection is handled inside the promise chain, so the 'Error 2' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep2 = (site: string) => (wait: number) => {
  // Evaluated lazily, exactly when a message is built.
  const parentSite = () => site.substring(0, site.length - 2)
  const onResolved = (_wait: number) => {
    const index = getTopIndex(site)
    checkLog(`mapper2-index = ${index}`)
    return (opWait: number) => `${parentSite()} - ${_wait + opWait}`
  }
  const onRejected = (error: unknown) => {
    const index = getTopIndex(site)
    checkLog(`mapper2-index = ${index} - ng - ${error}`)
    return () => `${parentSite()} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 2'
  )
}
/**
 * Given a numeric combiner `op`, returns a function that takes `[message2, message1Array]`,
 * folds the waits parsed from `message1Array` with `op` (starting from 0) and feeds the
 * result to the `message2` builder, wrapped in `TE.right`.
 */
const op_message1_and_2 =
  (op: (a: number, b: number) => number) =>
  (pair: [((opWait: number) => string) | (() => string), readonly string[]]) => {
    const [message2, message1Array] = pair
    let opWait = 0
    for (const message1 of message1Array) {
      opWait = op(opWait, getWaitFromMapper1Message(message1))
    }
    return TE.right(message2(opWait))
  }
/** Derives the wait from the last character of `site` and returns the corresponding `do_sleep1` task. */
const mapper1 = (site: string) => do_sleep1(site)(getWaitNumber(getLastIndex(site)))
/**
 * Chains `action2` (the outer task) with `action1` (the inner tasks) and produces one
 * result message. Every branch below wraps its value with `TE.right`, so failures are
 * carried as message text rather than as a Left.
 *
 * Flow:
 * 1. `action2` is Left: its error string becomes the result.
 * 2. `action2` is Right: the message builder is probed once with a dummy wait (123) and the
 *    probe message is classified by `checkOkNg_FromMessage`.
 *    - classified ng: the probe message itself is the result.
 *    - otherwise `action1` is run; its error string, or the message built by
 *      `op_message1_and_2(op)` from the real waits, is the result.
 *
 * `action1` is only invoked inside the success branch, i.e. after `action2` has settled.
 *
 * @param action2 task yielding a message builder
 * @param action1 task yielding the inner messages
 * @param op combiner used to aggregate the inner waits
 */
function action2_and_1(
action2: TE.TaskEither<string, ((opWait: number) => string) | (() => string)>,
action1: TE.TaskEither<string, readonly string[]>,
op: (a: number, b: number) => number
) {
return pipe(
action2,
TE.fold(
// action2 failed: surface the error text as a normal result
(error2) => TE.right(error2),
(message2) =>
pipe(
// 123 is only a placeholder wait used to obtain a message that can be classified
message2(123),
checkOkNg_FromMessage,
E.fold(
// outer call reported ng: skip action1 and return the ng message
(ngMessage2) => TE.right(ngMessage2),
() =>
pipe(
action1,
TE.foldW(
(error1) => TE.right(error1),
// build the final message from the real inner waits
(message1) => op_message1_and_2(op)([message2, message1])
)
)
)
)
)
)
}
/**
 * Builds the task for one group of sites: the outer call is derived from the first
 * site, the inner calls (one per site) run in parallel, and their waits are combined
 * with `Math.max`.
 */
const mapper2_par = (sites: string[]) => {
  const [site] = sites
  const outerTask = do_sleep2(site)(getWaitNumber(getTopIndex(site)))
  const innerTask = pipe(sites, RA.traverse(TE.ApplicativePar)(mapper1))
  return action2_and_1(outerTask, innerTask, Math.max)
}
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方2ー全部同時実行からの全部同時実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]
    checkLog('[start]')
    // All groups are started together.
    const runGroups = pipe(sites, RA.traverse(TE.ApplicativePar)(mapper2_par))
    const settled = await runGroups()
    // Split the messages into ng (left) and ok (right); a Left result is passed through.
    const summary = pipe(
      settled,
      E.foldW(
        (failure) => E.left(failure),
        (messages: readonly string[]) => pipe(messages, RA.partitionMap(checkOkNg_FromMessage))
      )
    )
    console.log(summary)
  })
})
実行結果
time : 2025-01-19T08:23:54.146Z - text: [start]
time : 2025-01-19T08:23:54.149Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-19T08:23:55.150Z - text: mapper2-index = 1
time : 2025-01-19T08:23:56.156Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-19T08:23:56.157Z - text: https://test/1/1
time : 2025-01-19T08:23:58.160Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-19T08:23:59.163Z - text: mapper2-index = 5
time : 2025-01-19T08:23:59.165Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-19T08:24:00.178Z - text: https://test/5/1
time : 2025-01-19T08:24:01.180Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-19T08:24:03.167Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-19T08:24:04.166Z - text: https://test/5/5
{
left: [
'https://test/2 - ng - 2000 - Sleep NG after wait',
'https://test/3 - ng - Sleep NG before wait',
'https://test/4 - ng - 4000 - Sleep NG after wait'
],
right: [ 'https://test/1 - 2000', 'https://test/5 - 10000' ]
}
全部同時実行からの直列実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
/**
 * Stand-in for an asynchronous API call that can fail, driven by `wait` (milliseconds).
 *
 * With `seconds = wait / 1000`:
 * - `seconds % 3 === 0` (this includes `wait === 0`): rejects at once with 'Sleep NG before wait'.
 * - otherwise, `seconds % 2 === 0`: rejects after `wait` ms with '<wait> - Sleep NG after wait'.
 * - otherwise: resolves with `wait` after `wait` ms.
 *
 * The promise is created as `Promise<number>` directly instead of being cast with `as`,
 * so the compiler checks that `resolve` really receives a number.
 *
 * @param wait delay in milliseconds
 * @returns a promise that settles as described above
 */
const sleep = (wait: number): Promise<number> =>
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
    } else if (waitSeconds % 2 === 0) {
      setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
    } else {
      setTimeout(() => resolve(wait), wait)
    }
  })
/**
 * Writes `text` to the console, prefixed with the current time as an ISO-8601 string.
 *
 * @param text message to log
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log('time : ' + timestamp + ' - text: ' + text)
}
/** Options object handed to the `test(...)` call; `timeout` is 100000 (presumably milliseconds, i.e. 100 s). */
const testOptions = {
  timeout: 100_000
}
/**
 * Adds two numbers; used as the combiner when inner waits accumulate sequentially.
 *
 * @returns `a + b`
 */
function sum(a: number, b: number) {
  const total = a + b
  return total
}
/**
 * Converts the final character of `site` with `Number`.
 *
 * Only one character is read, so an empty string yields 0 and a non-digit yields NaN.
 *
 * @param site string whose last character is the index
 * @returns the numeric value of the last character
 */
function getLastIndex(site: string) {
  const lastChar = site.charAt(site.length - 1)
  return Number(lastChar)
}
/**
 * Converts the third character from the end of `site` with `Number`
 * (for 'https://test/5/1' that is the '5').
 *
 * Strings shorter than three characters yield 0; a non-digit yields NaN.
 *
 * @param site string of the form '<...><top>/<last>' with single-character indices
 * @returns the numeric value of the third-from-last character
 */
function getTopIndex(site: string) {
  const thirdFromEnd = site.charAt(site.length - 3)
  return Number(thirdFromEnd)
}
/**
 * Scales an index by 1000 to obtain the wait value passed on to `sleep`.
 *
 * @param index numeric index
 * @returns `index * 1000`
 */
function getWaitNumber(index: number) {
  const factor = 1000
  return factor * index
}
/**
 * Classifies a result message as failure (Left) or success (Right).
 *
 * A message counts as a failure when it contains the ' - ng - ' marker that the
 * sleep wrappers in this file put between the site and the error text. Matching the
 * whole marker (instead of the bare substring 'ng') keeps a site name that merely
 * contains the letters "ng" from being reported as a failure.
 *
 * @param aboutOkMessage message to classify
 * @returns `E.left(message)` for an ng message, `E.right(message)` otherwise
 */
function checkOkNg_FromMessage(aboutOkMessage: string) {
  const isNg = aboutOkMessage.includes(' - ng - ')
  const resultMessage: E.Either<string, string> = isNg
    ? E.left(aboutOkMessage)
    : E.right(aboutOkMessage)
  return resultMessage
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message string:
 * '<site> - ok - <wait>' when sleep resolves, '<site> - ng - <reason>' when it rejects.
 *
 * The rejection is handled inside the promise chain, so the 'Error 1' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (_wait: number) => {
    checkLog(site)
    return `${site} - ok - ${_wait}`
  }
  const onRejected = (error: unknown) => {
    checkLog(`${site} - ng - ${error}`)
    return `${site} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 1'
  )
}
/**
 * Extracts the wait value from a message of the form
 * '<site> - ok - <wait>' or '<site> - ng - <wait> - <error text>'.
 *
 * The ' - ok - ' / ' - ng - ' markers are matched as a whole, so a site that happens
 * to contain the letters "ok" or "ng" no longer shifts the parse position, and a
 * message without either marker yields 0 instead of parsing arbitrary text.
 *
 * @param mapper1Message message to parse
 * @returns the wait as a number, or 0 when the message carries no numeric wait
 *          (for example '<site> - ng - Sleep NG before wait')
 */
function getWaitFromMapper1Message(mapper1Message: string) {
  const okMarker = ' - ok - '
  const ngMarker = ' - ng - '

  const okAt = mapper1Message.indexOf(okMarker)
  if (okAt >= 0) {
    const okWait = Number(mapper1Message.substring(okAt + okMarker.length))
    if (!isNaN(okWait)) {
      return okWait
    }
  }

  const ngAt = mapper1Message.indexOf(ngMarker)
  if (ngAt < 0) {
    return 0
  }
  // The ng detail is '<wait> - <error text>' or just '<error text>' (no wait available).
  const ngDetail = mapper1Message.substring(ngAt + ngMarker.length)
  const dashAt = ngDetail.indexOf('-')
  const ngWait = Number(dashAt < 0 ? '' : ngDetail.substring(0, dashAt))
  return isNaN(ngWait) ? 0 : ngWait
}
/**
 * Builds a TaskEither that runs `sleep(wait)` and resolves to a message *builder*:
 * - on success, a function taking `opWait` that returns '<site minus last 2 chars> - <wait + opWait>';
 * - on rejection, a function ignoring its argument that returns '<site minus last 2 chars> - ng - <reason>'.
 *
 * The rejection is handled inside the promise chain, so the 'Error 2' fallback of
 * `TE.tryCatch` is only reached if the rejection handler itself throws.
 */
const do_sleep2 = (site: string) => (wait: number) => {
  // Evaluated lazily, exactly when a message is built.
  const parentSite = () => site.substring(0, site.length - 2)
  const onResolved = (_wait: number) => {
    const index = getTopIndex(site)
    checkLog(`mapper2-index = ${index}`)
    return (opWait: number) => `${parentSite()} - ${_wait + opWait}`
  }
  const onRejected = (error: unknown) => {
    const index = getTopIndex(site)
    checkLog(`mapper2-index = ${index} - ng - ${error}`)
    return () => `${parentSite()} - ng - ${error}`
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 2'
  )
}
/**
 * Given a numeric combiner `op`, returns a function that takes `[message2, message1Array]`,
 * folds the waits parsed from `message1Array` with `op` (starting from 0) and feeds the
 * result to the `message2` builder, wrapped in `TE.right`.
 */
const op_message1_and_2 =
  (op: (a: number, b: number) => number) =>
  (pair: [((opWait: number) => string) | (() => string), readonly string[]]) => {
    const [message2, message1Array] = pair
    let opWait = 0
    for (const message1 of message1Array) {
      opWait = op(opWait, getWaitFromMapper1Message(message1))
    }
    return TE.right(message2(opWait))
  }
/** Derives the wait from the last character of `site` and returns the corresponding `do_sleep1` task. */
const mapper1 = (site: string) => do_sleep1(site)(getWaitNumber(getLastIndex(site)))
/**
 * Chains `action2` (the outer task) with `action1` (the inner tasks) and produces one
 * result message. Every branch below wraps its value with `TE.right`, so failures are
 * carried as message text rather than as a Left.
 *
 * Flow:
 * 1. `action2` is Left: its error string becomes the result.
 * 2. `action2` is Right: the message builder is probed once with a dummy wait (123) and the
 *    probe message is classified by `checkOkNg_FromMessage`.
 *    - classified ng: the probe message itself is the result.
 *    - otherwise `action1` is run; its error string, or the message built by
 *      `op_message1_and_2(op)` from the real waits, is the result.
 *
 * `action1` is only invoked inside the success branch, i.e. after `action2` has settled.
 *
 * @param action2 task yielding a message builder
 * @param action1 task yielding the inner messages
 * @param op combiner used to aggregate the inner waits
 */
function action2_and_1(
action2: TE.TaskEither<string, ((opWait: number) => string) | (() => string)>,
action1: TE.TaskEither<string, readonly string[]>,
op: (a: number, b: number) => number
) {
return pipe(
action2,
TE.fold(
// action2 failed: surface the error text as a normal result
(error2) => TE.right(error2),
(message2) =>
pipe(
// 123 is only a placeholder wait used to obtain a message that can be classified
message2(123),
checkOkNg_FromMessage,
E.fold(
// outer call reported ng: skip action1 and return the ng message
(ngMessage2) => TE.right(ngMessage2),
() =>
pipe(
action1,
TE.foldW(
(error1) => TE.right(error1),
// build the final message from the real inner waits
(message1) => op_message1_and_2(op)([message2, message1])
)
)
)
)
)
)
}
/**
 * Builds the task for one group of sites: the outer call is derived from the first
 * site, the inner calls (one per site) run one after another, and their waits are
 * combined with `sum`.
 */
const mapper2_seq = (sites: string[]) => {
  const [site] = sites
  const outerTask = do_sleep2(site)(getWaitNumber(getTopIndex(site)))
  const innerTask = pipe(sites, RA.traverse(TE.ApplicativeSeq)(mapper1))
  return action2_and_1(outerTask, innerTask, sum)
}
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方4ー全部同時実行からの直列実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]
    checkLog('[start]')
    // Groups are started together; inside a group the inner calls run sequentially.
    const runGroups = pipe(sites, RA.traverse(TE.ApplicativePar)(mapper2_seq))
    const settled = await runGroups()
    // Split the messages into ng (left) and ok (right); a Left result is passed through.
    const summary = pipe(
      settled,
      E.foldW(
        (failure) => E.left(failure),
        (messages: readonly string[]) => pipe(messages, RA.partitionMap(checkOkNg_FromMessage))
      )
    )
    console.log(summary)
  })
})
実行結果
time : 2025-01-19T08:27:39.259Z - text: [start]
time : 2025-01-19T08:27:39.262Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-19T08:27:40.274Z - text: mapper2-index = 1
time : 2025-01-19T08:27:41.273Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-19T08:27:41.289Z - text: https://test/1/1
time : 2025-01-19T08:27:43.273Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-19T08:27:44.264Z - text: mapper2-index = 5
time : 2025-01-19T08:27:45.273Z - text: https://test/5/1
time : 2025-01-19T08:27:47.290Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-19T08:27:47.292Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-19T08:27:51.297Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-19T08:27:56.298Z - text: https://test/5/5
{
left: [
'https://test/2 - ng - 2000 - Sleep NG after wait',
'https://test/3 - ng - Sleep NG before wait',
'https://test/4 - ng - 4000 - Sleep NG after wait'
],
right: [ 'https://test/1 - 2000', 'https://test/5 - 17000' ]
}
全部同時実行からの最大2個まで同時実行 ※一部性能劣化(2個ずつのチャンク単位で実行するため、各チャンクは遅い方の完了を待ってから次のチャンクに進む)
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
/**
 * Fake API used by the samples. The outcome depends on `wait / 1000`:
 * - multiple of 3: rejects immediately with 'Sleep NG before wait'
 * - otherwise multiple of 2: rejects after `wait` ms with `${wait} - Sleep NG after wait`
 * - otherwise: resolves with `wait` after `wait` ms
 *
 * @param wait - Delay in milliseconds.
 */
const sleep = (wait: number) =>
  new Promise<number>((resolve, reject) => {
    const waitSeconds = wait / 1000

    // Fails up front, without scheduling a timer.
    if (waitSeconds % 3 === 0) {
      reject('Sleep NG before wait')
      return
    }

    const settle =
      waitSeconds % 2 === 0
        ? () => reject(`${wait} - Sleep NG after wait`)
        : () => resolve(wait)
    setTimeout(settle, wait)
  })
/**
 * Prints `text` to the console prefixed with the current time in ISO 8601 form.
 *
 * @param text - Message to log.
 */
function checkLog(text: string) {
  const timestamp = new Date().toISOString()
  console.log('time : ' + timestamp + ' - text: ' + text)
}
/**
 * Options passed to `test`. The timeout is 100 seconds (in ms) because the
 * samples really sleep for several seconds.
 */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Adds two numbers; used as the fold operation for member waits.
 */
function sum(a: number, b: number): number {
  const total = a + b
  return total
}
/**
 * Returns the numeric index at the end of a site URL
 * (e.g. 'https://test/5/1' -> 1, 'https://test/5/10' -> 10).
 *
 * The whole run of trailing digits is parsed, so indices of 10 and above work.
 * When the string does not end with a digit, the last character is converted
 * with `Number`, which yields NaN for most characters and 0 for an empty string.
 *
 * @param site - Site URL ending with its index.
 */
function getLastIndex(site: string): number {
  const trailingDigits = /\d+$/.exec(site)
  if (trailingDigits !== null) {
    return Number(trailingDigits[0])
  }
  // No trailing digits: fall back to converting the last character.
  return Number(site.slice(-1))
}
/**
 * Returns the group index of a site URL shaped like '.../<top>/<last>'
 * (e.g. 'https://test/5/1' -> 5, 'https://test/12/3' -> 12).
 *
 * Both segments may have several digits. When the URL does not end with
 * '<digits>/<digits>', the single character three positions from the end is
 * converted with `Number` instead.
 *
 * @param site - Site URL ending with '<top index>/<last index>'.
 */
function getTopIndex(site: string): number {
  const tail = /(\d+)\/\d+$/.exec(site)
  if (tail !== null) {
    return Number(tail[1])
  }
  // Unexpected shape: fall back to the fixed-offset character.
  return Number(site.substring(site.length - 3, site.length - 2))
}
/**
 * Converts an index into a wait in milliseconds (index * 1000).
 */
function getWaitNumber(index: number): number {
  const millisecondsPerIndex = 1000
  return index * millisecondsPerIndex
}
/**
 * Classifies a result message: NG messages become `Left`, everything else `Right`.
 *
 * A message counts as NG when it contains the ' - ng - ' delimiter that
 * `do_sleep1` and `do_sleep2` put between the site and the error text.
 * Matching the whole delimiter, not just the letters "ng", keeps OK messages
 * for sites such as 'https://staging/...' from being reported as failures.
 *
 * @param aboutOkMessage - Message produced by one of the sleep tasks.
 */
function checkOkNg_FromMessage(aboutOkMessage: string) {
  const isNg = aboutOkMessage.includes(' - ng - ')
  const resultMessage: E.Either<string, string> = isNg
    ? E.left(aboutOkMessage)
    : E.right(aboutOkMessage)
  return resultMessage
}
/**
 * Creates the task that sleeps for one member site and reports the outcome as text.
 *
 * A rejection from `sleep` is caught and turned into a '<site> - ng - <error>'
 * message, so a failed sleep still produces a Right value. 'Error 1' is only
 * produced if the promise chain itself rejects.
 *
 * @param site - Site URL, logged and embedded in the message.
 * @returns A function taking the wait in ms and returning the task.
 */
const do_sleep1 = (site: string) => (wait: number) => {
  const onResolved = (sleptMs: number) => {
    checkLog(site)
    return `${site} - ok - ${sleptMs}`
  }
  const onRejected = (reason: unknown) => {
    const ngMessage = `${site} - ng - ${reason}`
    checkLog(ngMessage)
    return ngMessage
  }
  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 1'
  )
}
/**
 * Extracts the wait (in ms) embedded in a message produced by `do_sleep1`.
 *
 * - '<site> - ok - <wait>'             -> wait
 * - '<site> - ng - <wait> - <reason>'  -> wait
 * - '<site> - ng - <reason>'           -> 0 (the sleep failed before waiting)
 * - anything else                      -> 0
 *
 * The ' - ok - ' / ' - ng - ' delimiters are matched as a whole, so the letters
 * "ok" or "ng" inside the site URL (e.g. 'tokyo', 'booking') do not disturb parsing.
 *
 * @param mapper1Message - Message text returned by a mapper1 task.
 */
function getWaitFromMapper1Message(mapper1Message: string): number {
  const okMarker = ' - ok - '
  const ngMarker = ' - ng - '

  const okAt = mapper1Message.indexOf(okMarker)
  if (okAt >= 0) {
    const okWait = Number(mapper1Message.substring(okAt + okMarker.length))
    if (!isNaN(okWait)) {
      return okWait
    }
  }

  const ngAt = mapper1Message.indexOf(ngMarker)
  if (ngAt < 0) {
    return 0
  }
  // The wait, when present, is the text before the next '-' after the marker.
  const detail = mapper1Message.substring(ngAt + ngMarker.length)
  const dashAt = detail.indexOf('-')
  const ngWait = Number(dashAt >= 0 ? detail.substring(0, dashAt) : detail)
  return isNaN(ngWait) ? 0 : ngWait
}
/**
 * Creates the group-level sleep task. Instead of a finished message it yields a
 * message factory, so the caller can add the members' wait afterwards:
 * - success: `(opWait) => '<group url> - <wait + opWait>'`
 * - failure: `() => '<group url> - ng - <error>'`
 *
 * The group url is `site` without its last two characters. A rejection from
 * `sleep` is caught and converted into the failure factory; 'Error 2' is only
 * produced if the promise chain itself rejects.
 *
 * @param site - First site of the group.
 * @returns A function taking the wait in ms and returning the task.
 */
const do_sleep2 = (site: string) => (wait: number) => {
  // Evaluated only when a message is rendered, as in the inline version.
  const groupUrl = () => site.substring(0, site.length - 2)

  const onResolved = (sleptMs: number) => {
    checkLog(`mapper2-index = ${getTopIndex(site)}`)
    return (opWait: number) => `${groupUrl()} - ${sleptMs + opWait}`
  }
  const onRejected = (reason: unknown) => {
    checkLog(`mapper2-index = ${getTopIndex(site)} - ng - ${reason}`)
    return () => `${groupUrl()} - ng - ${reason}`
  }

  return TE.tryCatch(
    () => sleep(wait).then(onResolved).catch(onRejected),
    () => 'Error 2'
  )
}
/**
 * Returns a combiner that renders the final group message.
 *
 * Each mapper1 message is converted to its wait with `getWaitFromMapper1Message`,
 * the waits are reduced with `op` from an initial 0, and the reduced value is
 * passed to the mapper2 message factory.
 *
 * @param op - Reduction applied to the waits.
 * @returns Function from `[message2, message1Array]` to a `TE.right` with the text.
 */
const op_message1_and_2 =
  (op: (a: number, b: number) => number) =>
  ([message2, message1Array]: [
    ((opWait: number) => string) | (() => string),
    readonly string[]
  ]) => {
    const memberWaits = pipe(message1Array, RA.map(getWaitFromMapper1Message))
    const combinedWait = pipe(memberWaits, RA.reduce(0, op))
    return TE.right(message2(combinedWait))
  }
/**
 * Reducer that appends one chunk of messages (`b`) to the accumulated list (`a`).
 *
 * A chunk of exactly two messages contributes only the one with the larger wait
 * (the second one on a tie); any other chunk size is appended unchanged.
 *
 * @param a - Messages accumulated so far.
 * @param b - Messages of the current chunk.
 * @returns A new array; neither input is mutated.
 */
const opStringArray = (a: readonly string[], b: readonly string[]) => {
  if (b.length !== 2) {
    return [...a, ...b]
  }
  const [first, second] = b
  const firstWait = getWaitFromMapper1Message(first)
  const secondWait = getWaitFromMapper1Message(second)
  const slower = firstWait > secondWait ? first : second
  return [...a, slower]
}
/**
 * Flattens the chunked result of a task (`readonly (readonly string[])[]`)
 * into a single `readonly string[]` by reducing the chunks with `op`,
 * starting from an empty array. A Left is passed through untouched.
 *
 * @param op - Reducer merging one chunk into the accumulated messages.
 * @returns A function transforming the chunked task into the flattened task.
 */
const map_TE_2_array =
  (op: (a: readonly string[], b: readonly string[]) => readonly string[]) =>
  (messageArray: TE.TaskEither<string, readonly (readonly string[])[]>) => {
    // Typed seed so the reduction is inferred as readonly string[] rather than never[].
    const noMessages: readonly string[] = []
    return pipe(
      messageArray,
      TE.map((chunkMessages) => pipe(chunkMessages, RA.reduce(noMessages, op)))
    )
  }
/**
 * Builds the sleep task for one member site; the wait is the site's last
 * index converted to milliseconds.
 */
const mapper1 = (site: string) => {
  const toTask = do_sleep1(site)
  const wait = getWaitNumber(getLastIndex(site))
  return toTask(wait)
}
/**
 * Chains the group task and the member task.
 *
 * `action2` runs first. If it fails, or its message is classified as NG,
 * that text is returned and `action1` is not run. Otherwise `action1` runs and
 * its messages are merged into the group message via `op_message1_and_2`.
 * All outcomes, including errors, are returned as Right values.
 *
 * @param action2 - Task yielding the group's message factory.
 * @param action1 - Task yielding the members' messages.
 * @param op - Operation folding the member waits.
 */
function action2_and_1(
  action2: TE.TaskEither<string, ((opWait: number) => string) | (() => string)>,
  action1: TE.TaskEither<string, readonly string[]>,
  op: (a: number, b: number) => number
) {
  const combine = op_message1_and_2(op)

  // Runs the members and merges their messages into the group's message.
  const thenMembers = (message2: ((opWait: number) => string) | (() => string)) =>
    pipe(
      action1,
      TE.foldW(
        (error1) => TE.right(error1),
        (message1) => combine([message2, message1])
      )
    )

  return pipe(
    action2,
    TE.fold(
      (error2) => TE.right(error2),
      (message2) =>
        pipe(
          // The argument 123 is arbitrary; only the OK/NG classification is used.
          checkOkNg_FromMessage(message2(123)),
          E.fold(
            (ngMessage2) => TE.right(ngMessage2),
            () => thenMembers(message2)
          )
        )
    )
  )
}
/**
 * Builds the task for one group of sites with at most two member sleeps in flight.
 *
 * The members are split into chunks of two. Each chunk runs in parallel
 * (`TE.ApplicativePar`), and the chunks run one after another
 * (`TE.ApplicativeSeq`). The chunk results are flattened with `opStringArray`
 * and then combined with the group-level sleep using `sum`.
 *
 * Reads `sites[0]` unconditionally, so it expects at least one site.
 */
const mapper2_limit2 = (sites: string[]) => {
  const maxParallel = 2
  const [headSite] = sites
  const groupAction = do_sleep2(headSite)(getWaitNumber(getTopIndex(headSite)))

  const runChunkInParallel = RA.traverse(TE.ApplicativePar)(mapper1)
  const chunkActions = pipe(sites, RA.chunksOf(maxParallel), RA.map(runChunkInParallel))
  const memberAction = pipe(
    chunkActions,
    RA.sequence(TE.ApplicativeSeq),
    map_TE_2_array(opStringArray)
  )

  return action2_and_1(groupAction, memberAction, sum)
}
describe('fp-ts 非同期処理2つの配列', () => {
  test('使い方6ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    // One inner array per group; group N holds N member URLs.
    const siteGroups = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]

    checkLog('[start]')

    // Groups are started together; inside a group at most two members run at once.
    const runAllGroups = pipe(siteGroups, RA.traverse(TE.ApplicativePar)(mapper2_limit2))
    const outcome = await runAllGroups()

    // Split the collected messages into NG (left) and OK (right).
    const summary = pipe(
      outcome,
      E.foldW(
        (error) => E.left(error),
        (messages: readonly string[]) => pipe(messages, RA.partitionMap(checkOkNg_FromMessage))
      )
    )
    console.log(summary)
  })
})
実行結果
time : 2025-01-19T08:32:06.335Z - text: [start]
time : 2025-01-19T08:32:06.339Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-19T08:32:07.354Z - text: mapper2-index = 1
time : 2025-01-19T08:32:08.340Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-19T08:32:08.357Z - text: https://test/1/1
time : 2025-01-19T08:32:10.345Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-19T08:32:11.343Z - text: mapper2-index = 5
time : 2025-01-19T08:32:12.353Z - text: https://test/5/1
time : 2025-01-19T08:32:13.349Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-19T08:32:13.351Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-19T08:32:17.361Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-19T08:32:22.368Z - text: https://test/5/5
{
left: [
'https://test/2 - ng - 2000 - Sleep NG after wait',
'https://test/3 - ng - Sleep NG before wait',
'https://test/4 - ng - 4000 - Sleep NG after wait'
],
right: [ 'https://test/1 - 2000', 'https://test/5 - 16000' ]
}