配列の配列の非同期処理で対象API(以下ではsleep)がabortする場合のメモ
前回
配列が1つの場合
全部同時実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'
const sleep = (wait: number) =>
new Promise((resolve, reject) => {
const waitSeconds = wait / 1000
if (waitSeconds % 3 === 0) {
reject('Sleep NG before wait')
} else if (waitSeconds % 2 === 0) {
setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
} else {
setTimeout(() => resolve(wait), wait)
}
}) as Promise<number>
function checkLog(text: string) {
console.log(`time : ${new Date().toISOString()} - text: ${text}`)
}
const testOptions = {
timeout: 1000 * 100
}
const getLastIndex = (abortAction: Function) => (site: string) => {
const index = site.slice(-1)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - LastIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - LastIndex is NaN`)
}
if (site[site.length - 2] !== '/') {
checkLog(`${site} - abort - LastIndex >= 10`)
abortAction()
throw new Error(`${site} - abort - LastIndex >= 10`)
}
return Number(index)
}
const checkIndex =
(abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
if (topIndex + lastIndex >= 10) {
checkLog(`${site} - abort - (topIndex + lastIndex) >= 10`)
abortAction()
throw new Error(`${site} - abort - (topIndex + lastIndex) >= 10`)
}
}
function getWaitNumber(index: number) {
const wait = index * 1000
return wait
}
const do_sleep1 = (site: string) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
checkLog(site)
return `${site} - ok - ${_wait}`
}),
(error) => {
checkLog(`${site} - ng - ${error}`)
return `${site} - ng - ${error}`
}
)
const throwError = (abortController: AbortController) => (error: unknown) => {
const message = error && typeof error === 'object' && 'message' in error ? error.message : error
if (abortController.signal.aborted) {
throw message
} else {
return String(message)
}
}
const mapper1_throwError =
(topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
if (abortController.signal.aborted) {
checkLog(`${site} - aborted`)
return TE.left(`${site} - aborted`)
}
const abortEvent = () => {
checkLog(`${site} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
const action = pipe(
topIndex,
O.match(
() =>
pipe(
TE.tryCatch(
async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
(error) => throwError(abortController)(error)
),
TE.flattenW
),
(topIndex) =>
pipe(
TE.tryCatch(
async () => {
const lastIndex = getLastIndex(abortAction)(site)
checkIndex(abortAction)(site, topIndex, lastIndex)
const action = pipe(lastIndex, getWaitNumber, do_sleep1(site))
return action
},
(error) => throwError(abortController)(error)
),
TE.flattenW
)
)
)
const actionWithRemoveEvent = pipe(
action,
TE.fold(
(error) => {
removeEvent()
return TE.left(error)
},
(result) => {
removeEvent()
return TE.right(result)
}
)
)
return actionWithRemoveEvent
}
describe('fp-ts 非同期処理1つの配列', () => {
test('使い方1ー全部同時実行', testOptions, async () => {
const sites = [
'https://test/1',
'https://test/2',
'https://test/3',
'https://test/4',
'https://test/5',
'https://test/10'
]
const abortController = new AbortController()
checkLog('[start]')
try {
const action = pipe(
sites,
RA.traverse(T.ApplicativePar)(mapper1_throwError(O.none)(abortController))
)
const result = await action()
const separateResult = pipe(result, RA.separate)
console.log(separateResult)
} catch (error) {
console.log(`error -> ${error}`)
}
})
})
実行結果
time : 2025-01-27T12:47:50.742Z - text: [start]
time : 2025-01-27T12:47:50.745Z - text: https://test/10 - abort - LastIndex >= 10
time : 2025-01-27T12:47:50.745Z - text: https://test/1 - aborted - from event
time : 2025-01-27T12:47:50.745Z - text: https://test/2 - aborted - from event
time : 2025-01-27T12:47:50.745Z - text: https://test/3 - aborted - from event
time : 2025-01-27T12:47:50.746Z - text: https://test/4 - aborted - from event
time : 2025-01-27T12:47:50.746Z - text: https://test/5 - aborted - from event
error -> https://test/10 - abort - LastIndex >= 10
time : 2025-01-27T12:47:50.746Z - text: https://test/3 - ng - Sleep NG before wait
実行結果(※中断データを除外した場合)
time : 2025-01-27T12:48:10.719Z - text: [start]
time : 2025-01-27T12:48:10.722Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-01-27T12:48:11.725Z - text: https://test/1
time : 2025-01-27T12:48:12.725Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T12:48:14.724Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T12:48:15.732Z - text: https://test/5
{
left: [
'https://test/2 - ng - 2000 - Sleep NG after wait',
'https://test/3 - ng - Sleep NG before wait',
'https://test/4 - ng - 4000 - Sleep NG after wait'
],
right: [ 'https://test/1 - ok - 1000', 'https://test/5 - ok - 5000' ]
}
最大2個まで同時実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'
const sleep = (wait: number) =>
new Promise((resolve, reject) => {
const waitSeconds = wait / 1000
if (waitSeconds % 3 === 0) {
reject('Sleep NG before wait')
} else if (waitSeconds % 2 === 0) {
setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
} else {
setTimeout(() => resolve(wait), wait)
}
}) as Promise<number>
function checkLog(text: string) {
console.log(`time : ${new Date().toISOString()} - text: ${text}`)
}
const testOptions = {
timeout: 1000 * 100
}
const getLastIndex = (abortAction: Function) => (site: string) => {
const index = site.slice(-1)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - LastIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - LastIndex is NaN`)
}
if (site[site.length - 2] !== '/') {
checkLog(`${site} - abort - LastIndex >= 10`)
abortAction()
throw new Error(`${site} - abort - LastIndex >= 10`)
}
return Number(index)
}
const checkIndex =
(abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
if (topIndex + lastIndex >= 10) {
checkLog(`${site} - abort - (topIndex + lastIndex) >= 10`)
abortAction()
throw new Error(`${site} - abort - (topIndex + lastIndex) >= 10`)
}
}
function getWaitNumber(index: number) {
const wait = index * 1000
return wait
}
const do_sleep1 = (site: string) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
checkLog(site)
return `${site} - ok - ${_wait}`
}),
(error) => {
checkLog(`${site} - ng - ${error}`)
return `${site} - ng - ${error}`
}
)
const throwError = (abortController: AbortController) => (error: unknown) => {
const message = error && typeof error === 'object' && 'message' in error ? error.message : error
if (abortController.signal.aborted) {
throw message
} else {
return String(message)
}
}
const mapper1_throwError =
(topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
if (abortController.signal.aborted) {
checkLog(`${site} - aborted`)
return TE.left(`${site} - aborted`)
}
const abortEvent = () => {
checkLog(`${site} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
const action = pipe(
topIndex,
O.match(
() =>
pipe(
TE.tryCatch(
async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
(error) => throwError(abortController)(error)
),
TE.flattenW
),
(topIndex) =>
pipe(
TE.tryCatch(
async () => {
const lastIndex = getLastIndex(abortAction)(site)
checkIndex(abortAction)(site, topIndex, lastIndex)
const action = pipe(lastIndex, getWaitNumber, do_sleep1(site))
return action
},
(error) => throwError(abortController)(error)
),
TE.flattenW
)
)
)
const actionWithRemoveEvent = pipe(
action,
TE.fold(
(error) => {
removeEvent()
return TE.left(error)
},
(result) => {
removeEvent()
return TE.right(result)
}
)
)
return actionWithRemoveEvent
}
describe('fp-ts 非同期処理1つの配列', () => {
test('使い方2ー最大2個まで同時実行', testOptions, async () => {
const sites = [
'https://test/1',
'https://test/2',
'https://test/3',
'https://test/4',
'https://test/5',
'https://test/10'
]
const abortController = new AbortController()
checkLog('[start]')
try {
const limit = 2
const chunkSites = pipe(sites, RA.chunksOf(limit))
const actionArray = pipe(
chunkSites,
RA.map(RA.traverse(T.ApplicativePar)(mapper1_throwError(O.none)(abortController)))
)
const action = pipe(actionArray, RA.sequence(T.ApplicativeSeq), T.map(RA.flatten))
const result = await action()
const separateResult = pipe(result, RA.separate)
console.log(separateResult)
} catch (error) {
console.log(error)
}
})
})
実行結果
time : 2025-01-27T12:51:32.445Z - text: [start]
time : 2025-01-27T12:51:33.455Z - text: https://test/1
time : 2025-01-27T12:51:34.451Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T12:51:34.454Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-01-27T12:51:38.462Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T12:51:38.464Z - text: https://test/10 - abort - LastIndex >= 10
time : 2025-01-27T12:51:38.467Z - text: https://test/5 - aborted - from event
https://test/10 - abort - LastIndex >= 10
実行結果(※中断データを除外した場合)
time : 2025-01-27T12:54:12.154Z - text: [start]
time : 2025-01-27T12:54:13.161Z - text: https://test/1
time : 2025-01-27T12:54:14.165Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T12:54:14.167Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-01-27T12:54:18.168Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T12:54:23.171Z - text: https://test/5
{
left: [
'https://test/2 - ng - 2000 - Sleep NG after wait',
'https://test/3 - ng - Sleep NG before wait',
'https://test/4 - ng - 4000 - Sleep NG after wait'
],
right: [ 'https://test/1 - ok - 1000', 'https://test/5 - ok - 5000' ]
}
配列が2つの場合(シンプル)
全部同時実行からの全部同時実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'
const sleep = (wait: number) =>
new Promise((resolve, reject) => {
const waitSeconds = wait / 1000
if (waitSeconds % 3 === 0) {
reject('Sleep NG before wait')
} else if (waitSeconds % 2 === 0) {
setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
} else {
setTimeout(() => resolve(wait), wait)
}
}) as Promise<number>
function checkLog(text: string) {
console.log(`time : ${new Date().toISOString()} - text: ${text}`)
}
const testOptions = {
timeout: 1000 * 100
}
const getLastIndex = (abortAction: Function) => (site: string) => {
const index = site.slice(-1)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - LastIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - LastIndex is NaN`)
}
if (site[site.length - 2] !== '/') {
checkLog(`${site} - abort - LastIndex >= 10`)
abortAction()
throw new Error(`${site} - abort - LastIndex >= 10`)
}
return Number(index)
}
const getTopIndex = (abortAction: Function) => (site: string) => {
const index = site.substring(site.length - 3, site.length - 2)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - TopIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - TopIndex is NaN`)
}
return Number(index)
}
const checkIndex =
(abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
if (topIndex + lastIndex >= 10) {
checkLog(`${site} - abort - (topIndex + lastIndex) >= 10`)
abortAction()
throw new Error(`${site} - abort - (topIndex + lastIndex) >= 10`)
}
}
function getWaitNumber(index: number) {
const wait = index * 1000
return wait
}
const do_sleep1 = (site: string) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
checkLog(site)
return `${site} - ok - ${_wait}`
}),
(error) => {
checkLog(`${site} - ng - ${error}`)
return `${site} - ng - ${error}`
}
)
function getWaitFromMapper1Message(mapper1Message: string) {
const mapper1OkWaitString = mapper1Message.substring(mapper1Message.indexOf('ok') + 5)
const mapper1OkWait = Number(mapper1OkWaitString)
if (isNaN(mapper1OkWait)) {
const mapper1NgWaitString = mapper1Message.substring(mapper1Message.indexOf('ng') + 5)
const mapper1NgWaitStringWithoutErrorMessage = mapper1NgWaitString.substring(
0,
mapper1NgWaitString.indexOf('-')
)
const mapper1NgWait = Number(mapper1NgWaitStringWithoutErrorMessage)
return isNaN(mapper1NgWait) ? 0 : mapper1NgWait
} else {
return mapper1OkWait
}
}
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index}`)
return (opWait: number) => `${site.substring(0, site.length - 2)} - ${_wait + opWait}`
}),
(error) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index} - ng - ${error}`)
return `${site.substring(0, site.length - 2)} - ng - ${error}`
}
)
const throwError = (abortController: AbortController) => (error: unknown) => {
const message = error && typeof error === 'object' && 'message' in error ? error.message : error
if (abortController.signal.aborted) {
throw message
} else {
return String(message)
}
}
const mapper1_throwError =
(topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
if (abortController.signal.aborted) {
checkLog(`${site} - aborted`)
return TE.left(`${site} - aborted`)
}
const abortEvent = () => {
checkLog(`${site} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
const action = pipe(
topIndex,
O.match(
() =>
pipe(
TE.tryCatch(
async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
(error) => throwError(abortController)(error)
),
TE.flattenW
),
(topIndex) =>
pipe(
TE.tryCatch(
async () => {
const lastIndex = getLastIndex(abortAction)(site)
checkIndex(abortAction)(site, topIndex, lastIndex)
const action = pipe(lastIndex, getWaitNumber, do_sleep1(site))
return action
},
(error) => throwError(abortController)(error)
),
TE.flattenW
)
)
)
const actionWithRemoveEvent = pipe(
action,
TE.fold(
(error) => {
removeEvent()
return TE.left(error)
},
(result) => {
removeEvent()
return TE.right(result)
}
)
)
return actionWithRemoveEvent
}
function action2_and_1(
abortController: AbortController,
action2: TE.TaskEither<string, (opWait: number) => string>,
removeEvent: Function,
action1: () => T.Task<readonly E.Either<string, string>[]>,
op: (a: number, b: number) => number
) {
const action = pipe(
action2,
TE.foldW(
(error2) => {
removeEvent()
return TE.left(throwError(abortController)(error2))
},
(message2Function) => async () => {
const resultArray1 = await action1()()
const convertedResultArray1 = pipe(
resultArray1,
RA.map((result1) =>
pipe(
result1,
E.fold(
(error1) => E.right(getWaitFromMapper1Message(error1)),
(message1) => E.right(getWaitFromMapper1Message(message1))
)
)
)
)
const separateResult = pipe(convertedResultArray1, RA.separate)
const opWait1 = pipe(separateResult.right, RA.reduce(0, op))
removeEvent()
return E.right(message2Function(opWait1))
}
)
)
return action
}
const mapper2_par = (abortController: AbortController) => (sites: string[]) => {
const site = sites[0]
const topIndex = getTopIndex(abortController.abort)(site)
if (abortController.signal.aborted) {
checkLog(`mapper2-index = ${topIndex} - aborted`)
return TE.left(`mapper2-index = ${topIndex} - aborted`)
}
const abortEvent = () => {
checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
const action2 = pipe(
TE.tryCatch(
async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
(error) => throwError(abortController)(error)
),
TE.flattenW
)
const action = action2_and_1(
abortController,
action2,
removeEvent,
() =>
pipe(
sites,
RA.traverse(T.ApplicativePar)(mapper1_throwError(O.some(topIndex))(abortController))
),
Math.max
)
return pipe(
action as any,
TE.fold(
(error) => {
removeEvent()
return TE.left(error)
},
(result) => {
removeEvent()
return TE.right(result)
}
)
)
}
describe('fp-ts 非同期処理2つの配列', () => {
test('使い方1ー全部同時実行からの全部同時実行', testOptions, async () => {
const sites = [
// ['https://test/1/1'],
// ['https://test/2/1', 'https://test/2/2']
// ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
// ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
// [
// 'https://test/5/1',
// 'https://test/5/2',
// 'https://test/5/3',
// 'https://test/5/4'
// // 'https://test/5/5'
// ]
[
'https://test/7/1',
'https://test/7/2',
'https://test/7/3',
'https://test/7/4',
'https://test/7/5',
'https://test/7/6',
'https://test/7/7'
]
]
const abortController = new AbortController()
checkLog('[start]')
try {
const action = pipe(sites, RA.traverse(T.ApplicativePar)(mapper2_par(abortController)))
const result = await action()
const separateResult = pipe(result, RA.separate)
console.log(separateResult)
} catch (error) {
console.log(error)
}
})
})
実行結果
time : 2025-01-27T13:00:00.792Z - text: [start]
time : 2025-01-27T13:00:07.806Z - text: mapper2-index = 7
time : 2025-01-27T13:00:07.812Z - text: https://test/7/7 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:00:07.816Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T13:00:07.817Z - text: https://test/7/1 - aborted - from event
time : 2025-01-27T13:00:07.818Z - text: https://test/7/2 - aborted - from event
time : 2025-01-27T13:00:07.818Z - text: https://test/7/3 - aborted - from event
time : 2025-01-27T13:00:07.818Z - text: https://test/7/4 - aborted - from event
time : 2025-01-27T13:00:07.819Z - text: https://test/7/5 - aborted - from event
time : 2025-01-27T13:00:07.820Z - text: https://test/7/6 - aborted - from event
time : 2025-01-27T13:00:07.820Z - text: https://test/7/6 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:00:07.822Z - text: https://test/7/5 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:00:07.824Z - text: https://test/7/4 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:00:07.824Z - text: https://test/7/3 - abort - (topIndex + lastIndex) >= 10
https://test/7/7 - abort - (topIndex + lastIndex) >= 10
実行結果(※中断データを除外した場合)
time : 2025-01-27T13:02:05.156Z - text: [start]
time : 2025-01-27T13:02:12.163Z - text: mapper2-index = 7
time : 2025-01-27T13:02:13.178Z - text: https://test/7/1
time : 2025-01-27T13:02:14.168Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
{ left: [], right: [ 'https://test/7 - 9000' ] }
全部同時実行からの直列実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'
const sleep = (wait: number) =>
new Promise((resolve, reject) => {
const waitSeconds = wait / 1000
if (waitSeconds % 3 === 0) {
reject('Sleep NG before wait')
} else if (waitSeconds % 2 === 0) {
setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
} else {
setTimeout(() => resolve(wait), wait)
}
}) as Promise<number>
function checkLog(text: string) {
console.log(`time : ${new Date().toISOString()} - text: ${text}`)
}
const testOptions = {
timeout: 1000 * 100
}
function sum(a: number, b: number) {
return a + b
}
const getLastIndex = (abortAction: Function) => (site: string) => {
const index = site.slice(-1)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - LastIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - LastIndex is NaN`)
}
if (site[site.length - 2] !== '/') {
checkLog(`${site} - abort - LastIndex >= 10`)
abortAction()
throw new Error(`${site} - abort - LastIndex >= 10`)
}
return Number(index)
}
const getTopIndex = (abortAction: Function) => (site: string) => {
const index = site.substring(site.length - 3, site.length - 2)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - TopIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - TopIndex is NaN`)
}
return Number(index)
}
const checkIndex =
(abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
if (topIndex + lastIndex >= 10) {
checkLog(`${site} - abort - (topIndex + lastIndex) >= 10`)
abortAction()
throw new Error(`${site} - abort - (topIndex + lastIndex) >= 10`)
}
}
function getWaitNumber(index: number) {
const wait = index * 1000
return wait
}
const do_sleep1 = (site: string) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
checkLog(site)
return `${site} - ok - ${_wait}`
}),
(error) => {
checkLog(`${site} - ng - ${error}`)
return `${site} - ng - ${error}`
}
)
function getWaitFromMapper1Message(mapper1Message: string) {
const mapper1OkWaitString = mapper1Message.substring(mapper1Message.indexOf('ok') + 5)
const mapper1OkWait = Number(mapper1OkWaitString)
if (isNaN(mapper1OkWait)) {
const mapper1NgWaitString = mapper1Message.substring(mapper1Message.indexOf('ng') + 5)
const mapper1NgWaitStringWithoutErrorMessage = mapper1NgWaitString.substring(
0,
mapper1NgWaitString.indexOf('-')
)
const mapper1NgWait = Number(mapper1NgWaitStringWithoutErrorMessage)
return isNaN(mapper1NgWait) ? 0 : mapper1NgWait
} else {
return mapper1OkWait
}
}
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index}`)
return (opWait: number) => `${site.substring(0, site.length - 2)} - ${_wait + opWait}`
}),
(error) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index} - ng - ${error}`)
return `${site.substring(0, site.length - 2)} - ng - ${error}`
}
)
const throwError = (abortController: AbortController) => (error: unknown) => {
const message = error && typeof error === 'object' && 'message' in error ? error.message : error
if (abortController.signal.aborted) {
throw message
} else {
return String(message)
}
}
const mapper1_throwError =
(topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
if (abortController.signal.aborted) {
checkLog(`${site} - aborted`)
return TE.left(`${site} - aborted`)
}
const abortEvent = () => {
checkLog(`${site} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
const action = pipe(
topIndex,
O.match(
() =>
pipe(
TE.tryCatch(
async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
(error) => throwError(abortController)(error)
),
TE.flattenW
),
(topIndex) =>
pipe(
TE.tryCatch(
async () => {
const lastIndex = getLastIndex(abortAction)(site)
checkIndex(abortAction)(site, topIndex, lastIndex)
const action = pipe(lastIndex, getWaitNumber, do_sleep1(site))
return action
},
(error) => throwError(abortController)(error)
),
TE.flattenW
)
)
)
const actionWithRemoveEvent = pipe(
action,
TE.fold(
(error) => {
removeEvent()
return TE.left(error)
},
(result) => {
removeEvent()
return TE.right(result)
}
)
)
return actionWithRemoveEvent
}
function action2_and_1(
abortController: AbortController,
action2: TE.TaskEither<string, (opWait: number) => string>,
removeEvent: Function,
action1: () => T.Task<readonly E.Either<string, string>[]>,
op: (a: number, b: number) => number
) {
const action = pipe(
action2,
TE.foldW(
(error2) => {
removeEvent()
return TE.left(throwError(abortController)(error2))
},
(message2Function) => async () => {
const resultArray1 = await action1()()
const convertedResultArray1 = pipe(
resultArray1,
RA.map((result1) =>
pipe(
result1,
E.fold(
(error1) => E.right(getWaitFromMapper1Message(error1)),
(message1) => E.right(getWaitFromMapper1Message(message1))
)
)
)
)
const separateResult = pipe(convertedResultArray1, RA.separate)
const opWait1 = pipe(separateResult.right, RA.reduce(0, op))
removeEvent()
return E.right(message2Function(opWait1))
}
)
)
return action
}
const mapper2_seq = (abortController: AbortController) => (sites: string[]) => {
const site = sites[0]
const topIndex = getTopIndex(abortController.abort)(site)
const abortEvent = () => {
checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
if (abortController.signal.aborted) {
checkLog(`mapper2-index = ${topIndex} - aborted`)
removeEvent()
return TE.left(`mapper2-index = ${topIndex} - aborted`)
}
const action2 = pipe(
TE.tryCatch(
async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
(error) => throwError(abortController)(error)
),
TE.flattenW
)
const action = action2_and_1(
abortController,
action2,
removeEvent,
() =>
pipe(
sites,
RA.traverse(T.ApplicativeSeq)(mapper1_throwError(O.some(topIndex))(abortController))
),
sum
)
return action
}
describe('fp-ts 非同期処理2つの配列', () => {
test('使い方3ー全部同時実行からの直列実行', testOptions, async () => {
const sites = [
// ['https://test/1/1'],
// ['https://test/2/1', 'https://test/2/2']
// ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
// ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
// [
// 'https://test/5/1',
// 'https://test/5/2',
// 'https://test/5/3',
// 'https://test/5/4',
// 'https://test/5/5'
// ]
[
'https://test/7/1',
'https://test/7/2',
'https://test/7/3',
'https://test/7/4',
'https://test/7/5',
'https://test/7/6',
'https://test/7/7'
]
]
const abortController = new AbortController()
checkLog('[start]')
try {
const action = pipe(sites, RA.traverse(T.ApplicativePar)(mapper2_seq(abortController)))
const result = await action()
const separateResult = pipe(result, RA.separate)
console.log(separateResult)
} catch (error) {
console.log(error)
}
})
})
実行結果
time : 2025-01-27T13:06:22.590Z - text: [start]
time : 2025-01-27T13:06:29.598Z - text: mapper2-index = 7
time : 2025-01-27T13:06:30.604Z - text: https://test/7/1
time : 2025-01-27T13:06:32.620Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T13:06:32.621Z - text: https://test/7/3 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:06:32.623Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T13:06:32.623Z - text: https://test/7/4 - aborted - from event
time : 2025-01-27T13:06:32.623Z - text: https://test/7/5 - aborted - from event
time : 2025-01-27T13:06:32.623Z - text: https://test/7/6 - aborted - from event
time : 2025-01-27T13:06:32.624Z - text: https://test/7/7 - aborted - from event
https://test/7/3 - abort - (topIndex + lastIndex) >= 10
実行結果(※中断データを除外した場合)
time : 2025-01-27T13:07:15.692Z - text: [start]
time : 2025-01-27T13:07:22.709Z - text: mapper2-index = 7
time : 2025-01-27T13:07:23.728Z - text: https://test/7/1
time : 2025-01-27T13:07:25.738Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
{ left: [], right: [ 'https://test/7 - 10000' ] }
全部同時実行からの最大2個まで同時実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'
const sleep = (wait: number) =>
new Promise((resolve, reject) => {
const waitSeconds = wait / 1000
if (waitSeconds % 3 === 0) {
reject('Sleep NG before wait')
} else if (waitSeconds % 2 === 0) {
setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
} else {
setTimeout(() => resolve(wait), wait)
}
}) as Promise<number>
function checkLog(text: string) {
console.log(`time : ${new Date().toISOString()} - text: ${text}`)
}
const testOptions = {
timeout: 1000 * 100
}
function sum(a: number, b: number) {
return a + b
}
const getLastIndex = (abortAction: Function) => (site: string) => {
const index = site.slice(-1)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - LastIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - LastIndex is NaN`)
}
if (site[site.length - 2] !== '/') {
checkLog(`${site} - abort - LastIndex >= 10`)
abortAction()
throw new Error(`${site} - abort - LastIndex >= 10`)
}
return Number(index)
}
const getTopIndex = (abortAction: Function) => (site: string) => {
const index = site.substring(site.length - 3, site.length - 2)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - TopIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - TopIndex is NaN`)
}
return Number(index)
}
const checkIndex =
(abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
if (topIndex + lastIndex >= 10) {
checkLog(`${site} - abort - (topIndex + lastIndex) >= 10`)
abortAction()
throw new Error(`${site} - abort - (topIndex + lastIndex) >= 10`)
}
}
function getWaitNumber(index: number) {
const wait = index * 1000
return wait
}
const do_sleep1 = (site: string) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
checkLog(site)
return `${site} - ok - ${_wait}`
}),
(error) => {
checkLog(`${site} - ng - ${error}`)
return `${site} - ng - ${error}`
}
)
function getWaitFromMapper1Message(mapper1Message: string) {
const mapper1OkWaitString = mapper1Message.substring(mapper1Message.indexOf('ok') + 5)
const mapper1OkWait = Number(mapper1OkWaitString)
if (isNaN(mapper1OkWait)) {
const mapper1NgWaitString = mapper1Message.substring(mapper1Message.indexOf('ng') + 5)
const mapper1NgWaitStringWithoutErrorMessage = mapper1NgWaitString.substring(
0,
mapper1NgWaitString.indexOf('-')
)
const mapper1NgWait = Number(mapper1NgWaitStringWithoutErrorMessage)
return isNaN(mapper1NgWait) ? 0 : mapper1NgWait
} else {
return mapper1OkWait
}
}
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index}`)
return (opWait: number) => `${site.substring(0, site.length - 2)} - ${_wait + opWait}`
}),
(error) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index} - ng - ${error}`)
return `${site.substring(0, site.length - 2)} - ng - ${error}`
}
)
const flatternEitherStringArray = (
a: readonly E.Either<string, string>[],
b: readonly E.Either<string, string>[]
) => {
if (b.length == 2) {
const b0 = getWaitFromMapper1Message(E.isRight(b[0]) ? b[0].right : b[0].left)
const b1 = getWaitFromMapper1Message(E.isRight(b[1]) ? b[1].right : b[1].left)
if (b0 > b1) {
const result = [...a, b[0]]
return result
} else {
const result = [...a, b[1]]
return result
}
} else {
const result = [...a, ...b]
return result
}
}
const map_E_flatten_array =
(
flatten: (
a: readonly E.Either<string, string>[],
b: readonly E.Either<string, string>[]
) => readonly E.Either<string, string>[]
) =>
(messageArrayArray: readonly (readonly E.Either<string, string>[])[]) => {
const converted_message = pipe(messageArrayArray, RA.reduce([], flatten))
return converted_message
}
const throwError = (abortController: AbortController) => (error: unknown) => {
const message = error && typeof error === 'object' && 'message' in error ? error.message : error
if (abortController.signal.aborted) {
throw message
} else {
return String(message)
}
}
const mapper1_throwError =
(topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
if (abortController.signal.aborted) {
checkLog(`${site} - aborted`)
return TE.left(`${site} - aborted`)
}
const abortEvent = () => {
checkLog(`${site} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
const action = pipe(
topIndex,
O.match(
() =>
pipe(
TE.tryCatch(
async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
(error) => throwError(abortController)(error)
),
TE.flattenW
),
(topIndex) =>
pipe(
TE.tryCatch(
async () => {
const lastIndex = getLastIndex(abortAction)(site)
checkIndex(abortAction)(site, topIndex, lastIndex)
const action = pipe(lastIndex, getWaitNumber, do_sleep1(site))
return action
},
(error) => throwError(abortController)(error)
),
TE.flattenW
)
)
)
const actionWithRemoveEvent = pipe(
action,
TE.fold(
(error) => {
removeEvent()
return TE.left(error)
},
(result) => {
removeEvent()
return TE.right(result)
}
)
)
return actionWithRemoveEvent
}
function action2_and_1(
abortController: AbortController,
action2: TE.TaskEither<string, (opWait: number) => string>,
removeEvent: Function,
action1: () => T.Task<readonly E.Either<string, string>[]>,
op: (a: number, b: number) => number
) {
const action = pipe(
action2,
TE.foldW(
(error2) => {
removeEvent()
return TE.left(throwError(abortController)(error2))
},
(message2Function) => async () => {
const resultArray1 = await action1()()
const convertedResultArray1 = pipe(
resultArray1,
RA.map((result1) =>
pipe(
result1,
E.fold(
(error1) => E.right(getWaitFromMapper1Message(error1)),
(message1) => E.right(getWaitFromMapper1Message(message1))
)
)
)
)
const separateResult = pipe(convertedResultArray1, RA.separate)
const opWait1 = pipe(separateResult.right, RA.reduce(0, op))
removeEvent()
return E.right(message2Function(opWait1))
}
)
)
return action
}
const mapper2_limit2 = (abortController: AbortController) => (sites: string[]) => {
const site = sites[0]
const topIndex = getTopIndex(abortController.abort)(site)
const abortEvent = () => {
checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
if (abortController.signal.aborted) {
checkLog(`mapper2-index = ${topIndex} - aborted`)
removeEvent()
return TE.left(`mapper2-index = ${topIndex} - aborted`)
}
const action2 = pipe(
TE.tryCatch(
async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
(error) => throwError(abortController)(error)
),
TE.flattenW
)
const limit = 2
const chunkSites = pipe(sites, RA.chunksOf(limit))
const action = action2_and_1(
abortController,
action2,
removeEvent,
() => {
const action1 = pipe(
chunkSites,
RA.map(RA.traverse(T.ApplicativePar)(mapper1_throwError(O.some(topIndex))(abortController)))
)
const flatAction1 = pipe(
action1,
RA.sequence(T.ApplicativeSeq),
T.map(map_E_flatten_array(flatternEitherStringArray))
)
return flatAction1
},
sum
)
return action
}
describe('fp-ts 非同期処理2つの配列', () => {
test('使い方5ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
const sites = [
// ['https://test/1/1'],
// ['https://test/2/1', 'https://test/2/2']
// ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
// ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
// [
// 'https://test/5/1',
// 'https://test/5/2',
// 'https://test/5/3',
// 'https://test/5/4',
// 'https://test/5/5'
// ]
[
'https://test/7/1',
'https://test/7/2',
'https://test/7/3',
'https://test/7/4',
'https://test/7/5',
'https://test/7/6',
'https://test/7/7'
]
]
const abortController = new AbortController()
checkLog('[start]')
try {
const action = pipe(sites, RA.traverse(T.ApplicativePar)(mapper2_limit2(abortController)))
const result = await action()
const separateResult = pipe(result, RA.separate)
console.log(separateResult)
} catch (error) {
console.log(error)
}
})
})
実行結果
time : 2025-01-27T15:37:31.223Z - text: [start]
time : 2025-01-27T15:37:38.242Z - text: mapper2-index = 7
time : 2025-01-27T15:37:39.254Z - text: https://test/7/1
time : 2025-01-27T15:37:40.253Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:37:40.253Z - text: https://test/7/4 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T15:37:40.255Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T15:37:40.255Z - text: https://test/7/3 - aborted - from event
time : 2025-01-27T15:37:40.255Z - text: https://test/7/5 - aborted - from event
time : 2025-01-27T15:37:40.255Z - text: https://test/7/6 - aborted - from event
time : 2025-01-27T15:37:40.255Z - text: https://test/7/7 - aborted - from event
time : 2025-01-27T15:37:40.256Z - text: https://test/7/3 - abort - (topIndex + lastIndex) >= 10
https://test/7/4 - abort - (topIndex + lastIndex) >= 10
実行結果(※中断データを除外した場合)
time : 2025-01-27T15:37:54.906Z - text: [start]
time : 2025-01-27T15:38:01.911Z - text: mapper2-index = 7
time : 2025-01-27T15:38:02.931Z - text: https://test/7/1
time : 2025-01-27T15:38:03.930Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
{ left: [], right: [ 'https://test/7 - 9000' ] }
配列が2つの場合(複雑)
全部同時実行からの全部同時実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'
const sleep = (wait: number) =>
new Promise((resolve, reject) => {
const waitSeconds = wait / 1000
if (waitSeconds % 3 === 0) {
reject('Sleep NG before wait')
} else if (waitSeconds % 2 === 0) {
setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
} else {
setTimeout(() => resolve(wait), wait)
}
}) as Promise<number>
function checkLog(text: string) {
console.log(`time : ${new Date().toISOString()} - text: ${text}`)
}
const testOptions = {
timeout: 1000 * 100
}
const getLastIndex = (abortAction: Function) => (site: string) => {
const index = site.slice(-1)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - LastIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - LastIndex is NaN`)
}
if (site[site.length - 2] !== '/') {
checkLog(`${site} - abort - LastIndex >= 10`)
abortAction()
throw new Error(`${site} - abort - LastIndex >= 10`)
}
return Number(index)
}
const getTopIndex = (abortAction: Function) => (site: string) => {
const index = site.substring(site.length - 3, site.length - 2)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - TopIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - TopIndex is NaN`)
}
return Number(index)
}
const checkIndex =
(abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
if (topIndex + lastIndex >= 10) {
checkLog(`${site} - abort - (topIndex + lastIndex) >= 10`)
abortAction()
throw new Error(`${site} - abort - (topIndex + lastIndex) >= 10`)
}
}
function getWaitNumber(index: number) {
const wait = index * 1000
return wait
}
const do_sleep1 = (site: string) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
checkLog(site)
return `${site} - ok - ${_wait}`
}),
(error) => {
checkLog(`${site} - ng - ${error}`)
return `${site} - ng - ${error}`
}
)
function getWaitFromMapper1Message(mapper1Message: string) {
const mapper1OkWaitString = mapper1Message.substring(mapper1Message.indexOf('ok') + 5)
const mapper1OkWait = Number(mapper1OkWaitString)
if (isNaN(mapper1OkWait)) {
const mapper1NgWaitString = mapper1Message.substring(mapper1Message.indexOf('ng') + 5)
const mapper1NgWaitStringWithoutErrorMessage = mapper1NgWaitString.substring(
0,
mapper1NgWaitString.indexOf('-')
)
const mapper1NgWait = Number(mapper1NgWaitStringWithoutErrorMessage)
return isNaN(mapper1NgWait) ? 0 : mapper1NgWait
} else {
return mapper1OkWait
}
}
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index}`)
return (opWait: number) => `${site.substring(0, site.length - 2)} - ${_wait + opWait}`
}),
(error) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index} - ng - ${error}`)
return `${site.substring(0, site.length - 2)} - ng - ${error}`
}
)
const throwError = (abortController: AbortController) => (error: unknown) => {
const message = error && typeof error === 'object' && 'message' in error ? error.message : error
if (abortController.signal.aborted) {
throw message
} else {
return String(message)
}
}
const mapper1_throwError =
(topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
if (abortController.signal.aborted) {
checkLog(`${site} - aborted`)
return TE.left(`${site} - aborted`)
}
const abortEvent = () => {
checkLog(`${site} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
const action = pipe(
topIndex,
O.match(
() =>
pipe(
TE.tryCatch(
async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
(error) => throwError(abortController)(error)
),
TE.flattenW
),
(topIndex) =>
pipe(
TE.tryCatch(
async () => {
const lastIndex = getLastIndex(abortAction)(site)
checkIndex(abortAction)(site, topIndex, lastIndex)
const action = pipe(lastIndex, getWaitNumber, do_sleep1(site))
return action
},
(error) => throwError(abortController)(error)
),
TE.flattenW
)
)
)
const actionWithRemoveEvent = pipe(
action,
TE.fold(
(error) => {
removeEvent()
return TE.left(error)
},
(result) => {
removeEvent()
return TE.right(result)
}
)
)
return actionWithRemoveEvent
}
function action2_and_1(
abortController: AbortController,
action2: TE.TaskEither<string, (opWait: number) => string>,
removeEvent: Function,
action1: () => T.Task<readonly E.Either<string, string>[]>,
op: (a: number, b: number) => number
) {
const action = pipe(
action2,
TE.foldW(
(error2) => {
removeEvent()
return TE.left(throwError(abortController)(error2))
},
(message2Function) => async () => {
const resultArray1 = await action1()()
const convertedResultArray1 = pipe(
resultArray1,
RA.map((result1) =>
pipe(
result1,
E.fold(
(error1) => E.right(getWaitFromMapper1Message(error1)),
(message1) => E.right(getWaitFromMapper1Message(message1))
)
)
)
)
const separateResult = pipe(convertedResultArray1, RA.separate)
const opWait1 = pipe(separateResult.right, RA.reduce(0, op))
removeEvent()
return E.right(message2Function(opWait1))
}
)
)
return action
}
const mapper2_par = (abortController: AbortController) => (sites: string[]) => {
const site = sites[0]
const topIndex = getTopIndex(abortController.abort)(site)
if (abortController.signal.aborted) {
checkLog(`mapper2-index = ${topIndex} - aborted`)
return TE.left(`mapper2-index = ${topIndex} - aborted`)
}
const abortEvent = () => {
checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
const action2 = pipe(
TE.tryCatch(
async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
(error) => throwError(abortController)(error)
),
TE.flattenW
)
const action = action2_and_1(
abortController,
action2,
removeEvent,
() =>
pipe(
sites,
RA.traverse(T.ApplicativePar)(mapper1_throwError(O.some(topIndex))(abortController))
),
Math.max
)
return pipe(
action as any,
TE.fold(
(error) => {
removeEvent()
return TE.left(error)
},
(result) => {
removeEvent()
return TE.right(result)
}
)
)
}
describe('fp-ts 非同期処理2つの配列', () => {
test('使い方2ー全部同時実行からの全部同時実行', testOptions, async () => {
const sites = [
['https://test/1/1'],
['https://test/2/1', 'https://test/2/2'],
['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
[
'https://test/5/1',
'https://test/5/2',
'https://test/5/3',
'https://test/5/4',
'https://test/5/5'
],
[
'https://test/7/1',
'https://test/7/2',
'https://test/7/3',
'https://test/7/4',
'https://test/7/5',
'https://test/7/6',
'https://test/7/7'
]
]
const abortController = new AbortController()
checkLog('[start]')
try {
const action = pipe(sites, RA.traverse(T.ApplicativePar)(mapper2_par(abortController)))
const result = await action()
const separateResult = pipe(result, RA.separate)
console.log(separateResult)
} catch (error) {
console.log(error)
}
})
})
実行結果
time : 2025-01-27T13:50:16.315Z - text: [start]
time : 2025-01-27T13:50:16.318Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T13:50:17.327Z - text: mapper2-index = 1
time : 2025-01-27T13:50:18.330Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T13:50:18.331Z - text: https://test/1/1
time : 2025-01-27T13:50:20.333Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T13:50:21.325Z - text: mapper2-index = 5
time : 2025-01-27T13:50:21.326Z - text: https://test/5/5 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T13:50:21.327Z - text: mapper2-index = 5 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: https://test/5/1 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: https://test/5/2 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: https://test/5/3 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: https://test/5/4 - aborted - from event
time : 2025-01-27T13:50:21.327Z - text: https://test/5/3 - ng - Sleep NG before wait
https://test/5/5 - abort - (topIndex + lastIndex) >= 10
実行結果(※中断データを除外した場合)
time : 2025-01-27T13:59:36.765Z - text: [start]
time : 2025-01-27T13:59:36.770Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T13:59:37.782Z - text: mapper2-index = 1
time : 2025-01-27T13:59:38.784Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T13:59:38.787Z - text: https://test/1/1
time : 2025-01-27T13:59:40.773Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T13:59:41.779Z - text: mapper2-index = 5
time : 2025-01-27T13:59:41.781Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-27T13:59:42.784Z - text: https://test/5/1
time : 2025-01-27T13:59:43.772Z - text: mapper2-index = 7
time : 2025-01-27T13:59:43.787Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T13:59:44.774Z - text: https://test/7/1
time : 2025-01-27T13:59:45.776Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T13:59:45.791Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
{
left: [
'https://test/2 - ng - 2000 - Sleep NG after wait',
'https://test/3 - ng - Sleep NG before wait',
'https://test/4 - ng - 4000 - Sleep NG after wait'
],
right: [
'https://test/1 - 2000',
'https://test/5 - 9000',
'https://test/7 - 9000'
]
}
全部同時実行からの直列実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'
const sleep = (wait: number) =>
new Promise((resolve, reject) => {
const waitSeconds = wait / 1000
if (waitSeconds % 3 === 0) {
reject('Sleep NG before wait')
} else if (waitSeconds % 2 === 0) {
setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
} else {
setTimeout(() => resolve(wait), wait)
}
}) as Promise<number>
function checkLog(text: string) {
console.log(`time : ${new Date().toISOString()} - text: ${text}`)
}
const testOptions = {
timeout: 1000 * 100
}
function sum(a: number, b: number) {
return a + b
}
const getLastIndex = (abortAction: Function) => (site: string) => {
const index = site.slice(-1)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - LastIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - LastIndex is NaN`)
}
if (site[site.length - 2] !== '/') {
checkLog(`${site} - abort - LastIndex >= 10`)
abortAction()
throw new Error(`${site} - abort - LastIndex >= 10`)
}
return Number(index)
}
const getTopIndex = (abortAction: Function) => (site: string) => {
const index = site.substring(site.length - 3, site.length - 2)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - TopIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - TopIndex is NaN`)
}
return Number(index)
}
const checkIndex =
(abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
if (topIndex + lastIndex >= 10) {
checkLog(`${site} - abort - (topIndex + lastIndex) >= 10`)
abortAction()
throw new Error(`${site} - abort - (topIndex + lastIndex) >= 10`)
}
}
function getWaitNumber(index: number) {
const wait = index * 1000
return wait
}
const do_sleep1 = (site: string) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
checkLog(site)
return `${site} - ok - ${_wait}`
}),
(error) => {
checkLog(`${site} - ng - ${error}`)
return `${site} - ng - ${error}`
}
)
function getWaitFromMapper1Message(mapper1Message: string) {
const mapper1OkWaitString = mapper1Message.substring(mapper1Message.indexOf('ok') + 5)
const mapper1OkWait = Number(mapper1OkWaitString)
if (isNaN(mapper1OkWait)) {
const mapper1NgWaitString = mapper1Message.substring(mapper1Message.indexOf('ng') + 5)
const mapper1NgWaitStringWithoutErrorMessage = mapper1NgWaitString.substring(
0,
mapper1NgWaitString.indexOf('-')
)
const mapper1NgWait = Number(mapper1NgWaitStringWithoutErrorMessage)
return isNaN(mapper1NgWait) ? 0 : mapper1NgWait
} else {
return mapper1OkWait
}
}
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index}`)
return (opWait: number) => `${site.substring(0, site.length - 2)} - ${_wait + opWait}`
}),
(error) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index} - ng - ${error}`)
return `${site.substring(0, site.length - 2)} - ng - ${error}`
}
)
const throwError = (abortController: AbortController) => (error: unknown) => {
const message = error && typeof error === 'object' && 'message' in error ? error.message : error
if (abortController.signal.aborted) {
throw message
} else {
return String(message)
}
}
const mapper1_throwError =
(topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
if (abortController.signal.aborted) {
checkLog(`${site} - aborted`)
return TE.left(`${site} - aborted`)
}
const abortEvent = () => {
checkLog(`${site} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
const action = pipe(
topIndex,
O.match(
() =>
pipe(
TE.tryCatch(
async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
(error) => throwError(abortController)(error)
),
TE.flattenW
),
(topIndex) =>
pipe(
TE.tryCatch(
async () => {
const lastIndex = getLastIndex(abortAction)(site)
checkIndex(abortAction)(site, topIndex, lastIndex)
const action = pipe(lastIndex, getWaitNumber, do_sleep1(site))
return action
},
(error) => throwError(abortController)(error)
),
TE.flattenW
)
)
)
const actionWithRemoveEvent = pipe(
action,
TE.fold(
(error) => {
removeEvent()
return TE.left(error)
},
(result) => {
removeEvent()
return TE.right(result)
}
)
)
return actionWithRemoveEvent
}
function action2_and_1(
abortController: AbortController,
action2: TE.TaskEither<string, (opWait: number) => string>,
removeEvent: Function,
action1: () => T.Task<readonly E.Either<string, string>[]>,
op: (a: number, b: number) => number
) {
const action = pipe(
action2,
TE.foldW(
(error2) => {
removeEvent()
return TE.left(throwError(abortController)(error2))
},
(message2Function) => async () => {
const resultArray1 = await action1()()
const convertedResultArray1 = pipe(
resultArray1,
RA.map((result1) =>
pipe(
result1,
E.fold(
(error1) => E.right(getWaitFromMapper1Message(error1)),
(message1) => E.right(getWaitFromMapper1Message(message1))
)
)
)
)
const separateResult = pipe(convertedResultArray1, RA.separate)
const opWait1 = pipe(separateResult.right, RA.reduce(0, op))
removeEvent()
return E.right(message2Function(opWait1))
}
)
)
return action
}
const mapper2_seq = (abortController: AbortController) => (sites: string[]) => {
const site = sites[0]
const topIndex = getTopIndex(abortController.abort)(site)
const abortEvent = () => {
checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
if (abortController.signal.aborted) {
checkLog(`mapper2-index = ${topIndex} - aborted`)
removeEvent()
return TE.left(`mapper2-index = ${topIndex} - aborted`)
}
const action2 = pipe(
TE.tryCatch(
async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
(error) => throwError(abortController)(error)
),
TE.flattenW
)
const action = action2_and_1(
abortController,
action2,
removeEvent,
() =>
pipe(
sites,
RA.traverse(T.ApplicativeSeq)(mapper1_throwError(O.some(topIndex))(abortController))
),
sum
)
return action
}
describe('fp-ts 非同期処理2つの配列', () => {
test('使い方4ー全部同時実行からの直列実行', testOptions, async () => {
const sites = [
['https://test/1/1'],
['https://test/2/1', 'https://test/2/2'],
['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
[
'https://test/5/1',
'https://test/5/2',
'https://test/5/3',
'https://test/5/4',
'https://test/5/5'
],
[
'https://test/7/1',
'https://test/7/2',
'https://test/7/3',
'https://test/7/4',
'https://test/7/5',
'https://test/7/6',
'https://test/7/7'
]
]
const abortController = new AbortController()
checkLog('[start]')
try {
const action = pipe(sites, RA.traverse(T.ApplicativePar)(mapper2_seq(abortController)))
const result = await action()
const separateResult = pipe(result, RA.separate)
console.log(separateResult)
} catch (error) {
console.log(error)
}
})
})
実行結果
time : 2025-01-27T14:02:06.786Z - text: [start]
time : 2025-01-27T14:02:06.790Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T14:02:07.792Z - text: mapper2-index = 1
time : 2025-01-27T14:02:08.794Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:02:08.795Z - text: https://test/1/1
time : 2025-01-27T14:02:10.799Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T14:02:11.795Z - text: mapper2-index = 5
time : 2025-01-27T14:02:12.812Z - text: https://test/5/1
time : 2025-01-27T14:02:13.797Z - text: mapper2-index = 7
time : 2025-01-27T14:02:14.803Z - text: https://test/7/1
time : 2025-01-27T14:02:14.817Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:02:14.818Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-27T14:02:16.810Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:02:16.811Z - text: https://test/7/3 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T14:02:16.814Z - text: mapper2-index = 5 - aborted - from event
time : 2025-01-27T14:02:16.815Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T14:02:16.815Z - text: https://test/5/4 - aborted - from event
time : 2025-01-27T14:02:16.815Z - text: https://test/5/5 - aborted - from event
time : 2025-01-27T14:02:16.815Z - text: https://test/7/4 - aborted - from event
time : 2025-01-27T14:02:16.816Z - text: https://test/7/5 - aborted - from event
time : 2025-01-27T14:02:16.816Z - text: https://test/7/6 - aborted - from event
time : 2025-01-27T14:02:16.816Z - text: https://test/7/7 - aborted - from event
https://test/7/3 - abort - (topIndex + lastIndex) >= 10
実行結果(※中断データを除外した場合)
time : 2025-01-27T14:06:20.077Z - text: [start]
time : 2025-01-27T14:06:20.081Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T14:06:21.091Z - text: mapper2-index = 1
time : 2025-01-27T14:06:22.083Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:06:22.099Z - text: https://test/1/1
time : 2025-01-27T14:06:24.091Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T14:06:25.093Z - text: mapper2-index = 5
time : 2025-01-27T14:06:26.098Z - text: https://test/5/1
time : 2025-01-27T14:06:27.086Z - text: mapper2-index = 7
time : 2025-01-27T14:06:28.095Z - text: https://test/7/1
time : 2025-01-27T14:06:28.098Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:06:28.099Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-27T14:06:30.104Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T14:06:32.110Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
{
left: [
'https://test/2 - ng - 2000 - Sleep NG after wait',
'https://test/3 - ng - Sleep NG before wait',
'https://test/4 - ng - 4000 - Sleep NG after wait'
],
right: [
'https://test/1 - 2000',
'https://test/5 - 12000',
'https://test/7 - 10000'
]
}
全部同時実行からの最大2個まで同時実行
import { describe, test } from 'vitest'
import * as TE from 'fp-ts/TaskEither'
import { pipe } from 'fp-ts/lib/function'
import * as RA from 'fp-ts/ReadonlyArray'
import * as E from 'fp-ts/Either'
import * as O from 'fp-ts/Option'
import * as T from 'fp-ts/Task'
const sleep = (wait: number) =>
new Promise((resolve, reject) => {
const waitSeconds = wait / 1000
if (waitSeconds % 3 === 0) {
reject('Sleep NG before wait')
} else if (waitSeconds % 2 === 0) {
setTimeout(() => reject(`${wait} - Sleep NG after wait`), wait)
} else {
setTimeout(() => resolve(wait), wait)
}
}) as Promise<number>
function checkLog(text: string) {
console.log(`time : ${new Date().toISOString()} - text: ${text}`)
}
const testOptions = {
timeout: 1000 * 100
}
function sum(a: number, b: number) {
return a + b
}
const getLastIndex = (abortAction: Function) => (site: string) => {
const index = site.slice(-1)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - LastIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - LastIndex is NaN`)
}
if (site[site.length - 2] !== '/') {
checkLog(`${site} - abort - LastIndex >= 10`)
abortAction()
throw new Error(`${site} - abort - LastIndex >= 10`)
}
return Number(index)
}
const getTopIndex = (abortAction: Function) => (site: string) => {
const index = site.substring(site.length - 3, site.length - 2)
if (Number.isNaN(Number(index))) {
checkLog(`${site} - abort - TopIndex is NaN`)
abortAction()
throw new Error(`${site} - abort - TopIndex is NaN`)
}
return Number(index)
}
const checkIndex =
(abortAction: Function) => (site: string, topIndex: number, lastIndex: number) => {
if (topIndex + lastIndex >= 10) {
checkLog(`${site} - abort - (topIndex + lastIndex) >= 10`)
abortAction()
throw new Error(`${site} - abort - (topIndex + lastIndex) >= 10`)
}
}
function getWaitNumber(index: number) {
const wait = index * 1000
return wait
}
const do_sleep1 = (site: string) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
checkLog(site)
return `${site} - ok - ${_wait}`
}),
(error) => {
checkLog(`${site} - ng - ${error}`)
return `${site} - ng - ${error}`
}
)
function getWaitFromMapper1Message(mapper1Message: string) {
const mapper1OkWaitString = mapper1Message.substring(mapper1Message.indexOf('ok') + 5)
const mapper1OkWait = Number(mapper1OkWaitString)
if (isNaN(mapper1OkWait)) {
const mapper1NgWaitString = mapper1Message.substring(mapper1Message.indexOf('ng') + 5)
const mapper1NgWaitStringWithoutErrorMessage = mapper1NgWaitString.substring(
0,
mapper1NgWaitString.indexOf('-')
)
const mapper1NgWait = Number(mapper1NgWaitStringWithoutErrorMessage)
return isNaN(mapper1NgWait) ? 0 : mapper1NgWait
} else {
return mapper1OkWait
}
}
const do_sleep2 = (site: string, abortAction: Function) => (wait: number) =>
TE.tryCatch(
() =>
sleep(wait).then((_wait) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index}`)
return (opWait: number) => `${site.substring(0, site.length - 2)} - ${_wait + opWait}`
}),
(error) => {
const index = getTopIndex(abortAction)(site)
checkLog(`mapper2-index = ${index} - ng - ${error}`)
return `${site.substring(0, site.length - 2)} - ng - ${error}`
}
)
const flatternEitherStringArray = (
a: readonly E.Either<string, string>[],
b: readonly E.Either<string, string>[]
) => {
if (b.length == 2) {
const b0 = getWaitFromMapper1Message(E.isRight(b[0]) ? b[0].right : b[0].left)
const b1 = getWaitFromMapper1Message(E.isRight(b[1]) ? b[1].right : b[1].left)
if (b0 > b1) {
const result = [...a, b[0]]
return result
} else {
const result = [...a, b[1]]
return result
}
} else {
const result = [...a, ...b]
return result
}
}
const map_E_flatten_array =
(
flatten: (
a: readonly E.Either<string, string>[],
b: readonly E.Either<string, string>[]
) => readonly E.Either<string, string>[]
) =>
(messageArrayArray: readonly (readonly E.Either<string, string>[])[]) => {
const converted_message = pipe(messageArrayArray, RA.reduce([], flatten))
return converted_message
}
const throwError = (abortController: AbortController) => (error: unknown) => {
const message = error && typeof error === 'object' && 'message' in error ? error.message : error
if (abortController.signal.aborted) {
throw message
} else {
return String(message)
}
}
const mapper1_throwError =
(topIndex: O.Option<number>) => (abortController: AbortController) => (site: string) => {
if (abortController.signal.aborted) {
checkLog(`${site} - aborted`)
return TE.left(`${site} - aborted`)
}
const abortEvent = () => {
checkLog(`${site} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
const action = pipe(
topIndex,
O.match(
() =>
pipe(
TE.tryCatch(
async () => pipe(site, getLastIndex(abortAction), getWaitNumber, do_sleep1(site)),
(error) => throwError(abortController)(error)
),
TE.flattenW
),
(topIndex) =>
pipe(
TE.tryCatch(
async () => {
const lastIndex = getLastIndex(abortAction)(site)
checkIndex(abortAction)(site, topIndex, lastIndex)
const action = pipe(lastIndex, getWaitNumber, do_sleep1(site))
return action
},
(error) => throwError(abortController)(error)
),
TE.flattenW
)
)
)
const actionWithRemoveEvent = pipe(
action,
TE.fold(
(error) => {
removeEvent()
return TE.left(error)
},
(result) => {
removeEvent()
return TE.right(result)
}
)
)
return actionWithRemoveEvent
}
function action2_and_1(
abortController: AbortController,
action2: TE.TaskEither<string, (opWait: number) => string>,
removeEvent: Function,
action1: () => T.Task<readonly E.Either<string, string>[]>,
op: (a: number, b: number) => number
) {
const action = pipe(
action2,
TE.foldW(
(error2) => {
removeEvent()
return TE.left(throwError(abortController)(error2))
},
(message2Function) => async () => {
const resultArray1 = await action1()()
const convertedResultArray1 = pipe(
resultArray1,
RA.map((result1) =>
pipe(
result1,
E.fold(
(error1) => E.right(getWaitFromMapper1Message(error1)),
(message1) => E.right(getWaitFromMapper1Message(message1))
)
)
)
)
const separateResult = pipe(convertedResultArray1, RA.separate)
const opWait1 = pipe(separateResult.right, RA.reduce(0, op))
removeEvent()
return E.right(message2Function(opWait1))
}
)
)
return action
}
const mapper2_limit2 = (abortController: AbortController) => (sites: string[]) => {
const site = sites[0]
const topIndex = getTopIndex(abortController.abort)(site)
const abortEvent = () => {
checkLog(`mapper2-index = ${topIndex} - aborted - from event`)
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
const abortAction = () => {
removeEvent()
abortController.abort()
}
if (abortController.signal.aborted) {
checkLog(`mapper2-index = ${topIndex} - aborted`)
removeEvent()
return TE.left(`mapper2-index = ${topIndex} - aborted`)
}
const action2 = pipe(
TE.tryCatch(
async () => pipe(topIndex, getWaitNumber, do_sleep2(site, abortAction)),
(error) => throwError(abortController)(error)
),
TE.flattenW
)
const limit = 2
const chunkSites = pipe(sites, RA.chunksOf(limit))
const action = action2_and_1(
abortController,
action2,
removeEvent,
() => {
const action1 = pipe(
chunkSites,
RA.map(RA.traverse(T.ApplicativePar)(mapper1_throwError(O.some(topIndex))(abortController)))
)
const flatAction1 = pipe(
action1,
RA.sequence(T.ApplicativeSeq),
T.map(map_E_flatten_array(flatternEitherStringArray))
)
return flatAction1
},
sum
)
return action
}
describe('fp-ts 非同期処理2つの配列', () => {
test('使い方6ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
const sites = [
['https://test/1/1'],
['https://test/2/1', 'https://test/2/2'],
['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
[
'https://test/5/1',
'https://test/5/2',
'https://test/5/3',
'https://test/5/4',
'https://test/5/5'
],
[
'https://test/7/1',
'https://test/7/2',
'https://test/7/3',
'https://test/7/4',
'https://test/7/5',
'https://test/7/6',
'https://test/7/7'
]
]
const abortController = new AbortController()
checkLog('[start]')
try {
const action = pipe(sites, RA.traverse(T.ApplicativePar)(mapper2_limit2(abortController)))
const result = await action()
const separateResult = pipe(result, RA.separate)
console.log(separateResult)
} catch (error) {
console.log(error)
}
})
})
実行結果
time : 2025-01-27T15:44:10.954Z - text: [start]
time : 2025-01-27T15:44:10.958Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T15:44:11.965Z - text: mapper2-index = 1
time : 2025-01-27T15:44:12.968Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:12.969Z - text: https://test/1/1
time : 2025-01-27T15:44:14.962Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T15:44:15.966Z - text: mapper2-index = 5
time : 2025-01-27T15:44:16.970Z - text: https://test/5/1
time : 2025-01-27T15:44:17.959Z - text: mapper2-index = 7
time : 2025-01-27T15:44:17.973Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:17.975Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-27T15:44:18.977Z - text: https://test/7/1
time : 2025-01-27T15:44:19.966Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:19.967Z - text: https://test/7/4 - abort - (topIndex + lastIndex) >= 10
time : 2025-01-27T15:44:19.971Z - text: mapper2-index = 5 - aborted - from event
time : 2025-01-27T15:44:19.972Z - text: mapper2-index = 7 - aborted - from event
time : 2025-01-27T15:44:19.972Z - text: https://test/5/4 - aborted - from event
time : 2025-01-27T15:44:19.972Z - text: https://test/5/5 - aborted - from event
time : 2025-01-27T15:44:19.972Z - text: https://test/7/3 - aborted - from event
time : 2025-01-27T15:44:19.973Z - text: https://test/7/5 - aborted - from event
time : 2025-01-27T15:44:19.973Z - text: https://test/7/6 - aborted - from event
time : 2025-01-27T15:44:19.973Z - text: https://test/7/7 - aborted - from event
time : 2025-01-27T15:44:19.974Z - text: https://test/7/3 - abort - (topIndex + lastIndex) >= 10
https://test/7/4 - abort - (topIndex + lastIndex) >= 10
実行結果(※中断データを除外した場合)
time : 2025-01-27T15:44:46.209Z - text: [start]
time : 2025-01-27T15:44:46.213Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-01-27T15:44:47.226Z - text: mapper2-index = 1
time : 2025-01-27T15:44:48.214Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:48.234Z - text: https://test/1/1
time : 2025-01-27T15:44:50.222Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-01-27T15:44:51.222Z - text: mapper2-index = 5
time : 2025-01-27T15:44:52.238Z - text: https://test/5/1
time : 2025-01-27T15:44:53.215Z - text: mapper2-index = 7
time : 2025-01-27T15:44:53.229Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:53.232Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-01-27T15:44:54.226Z - text: https://test/7/1
time : 2025-01-27T15:44:55.232Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-01-27T15:44:57.233Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
{
left: [
'https://test/2 - ng - 2000 - Sleep NG after wait',
'https://test/3 - ng - Sleep NG before wait',
'https://test/4 - ng - 4000 - Sleep NG after wait'
],
right: [
'https://test/1 - 2000',
'https://test/5 - 11000',
'https://test/7 - 9000'
]
}