配列の配列の非同期処理で対象API(以下ではsleep)がabortする場合のメモ
前回
配列が1つの場合
全部同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Failure used by `sleep` when it rejects immediately, before any waiting.
 * Instances are recognised through the `_tag` discriminator.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing, so it is
   * safe to call on an arbitrary rejection value.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Failure used by `sleep` when it rejects after the timer has elapsed.
 * `wait` is the value that was passed to `sleep`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/**
 * Marker returned for a site that is reached after the shared controller
 * has already been aborted.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    return test != null && (test as { _tag?: unknown })._tag === AlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line for `site`. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}
/**
 * Failure describing why a site was rejected by `siteCheck`, which also
 * triggers the abort of the whole run.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName
  site: string
  reason: string

  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is OriginAbortError {
    return test != null && (test as { _tag?: unknown })._tag === OriginAbortError.tagName
  }

  /** Builds the log line from the stored site and reason. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Builds a validator for a site string.
   *
   * The validator succeeds with the site when it is acceptable. Otherwise it
   * logs the reason, calls `abortAction` and fails with an `OriginAbortError`.
   * Logging and `abortAction` happen when the validator is called, not when
   * the returned effect is run.
   *
   * @param abortAction callback invoked once a site is rejected
   * @param topIndex optional index of the enclosing group; when given, the
   *   sum of it and the site's last index must stay below 10
   */
  static siteCheck(abortAction: Function, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      let reason: string | undefined
      if (Number.isNaN(lastIndex)) {
        reason = `${lastIndex} is NaN`
      } else if (site[site.length - 2] !== '/') {
        // The character before the last one is not '/', so the trailing
        // index has more than one character.
        reason = `lastIndex >= 10`
      } else if (topIndex !== undefined && topIndex + lastIndex >= 10) {
        reason = `topIndex + lastIndex >= 10`
      }
      if (reason !== undefined) {
        const originAbortError = new OriginAbortError(site, reason)
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }
      return Effect.succeed(site)
    }
  }
}
/** Successful result of `sleep`; `wait` is the value that was passed to it. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /**
   * Converts the final character of `site` to a number.
   * Yields `NaN` when that character is not numeric.
   */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the "ok" log line for `site`. */
  getMessage(site: string) {
    return `${site} - ok - ${this.wait}`
  }
}
/**
 * Simulated API call whose outcome depends on `wait` (milliseconds):
 * - `wait / 1000` is a multiple of 3: rejects at once with `SleepNGBeforeError`
 * - otherwise a multiple of 2: rejects with `SleepNGAfterError` after `wait` ms
 * - otherwise: resolves with `OkData` after `wait` ms
 *
 * The pending timer is cancelled when the effect is interrupted, so an
 * aborted run does not leave timers behind.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    (signal) =>
      new Promise((resolve, reject) => {
        const waitSeconds = wait / 1000
        if (waitSeconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }
        const timer = setTimeout(() => {
          if (waitSeconds % 2 === 0) {
            reject(new SleepNGAfterError(wait))
          } else {
            resolve(new OkData(wait))
          }
        }, wait)
        // `signal` is aborted by Effect when the running fiber is interrupted.
        signal.addEventListener('abort', () => clearTimeout(timer), { once: true })
      })
  )
/** Effect that prints `text` to the console together with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })
/** Options object handed to `test` as its second argument. */
const testOptions = {
  timeout: 100 * 1000
}
/** Turns an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  return index * 1000
}
/**
 * Unwraps the known sleep failures from an `UnknownException`.
 *
 * When the wrapped value is a `SleepNGBeforeError` or `SleepNGAfterError` it
 * becomes the error itself; any other exception is logged and passed through
 * unchanged. `site` is accepted but not used.
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const wrapped = err.error
    if (SleepNGBeforeError.isClass(wrapped) || SleepNGAfterError.isClass(wrapped)) {
      return wrapped
    }
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}
/**
 * Curried helper: for `site`, builds the `sleep(wait)` effect with its errors
 * mapped by `mapError_sleepNg`. That effect is the success value of the
 * returned `Effect.try`, so callers unwrap the outer effect to get it.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Builds the effect that processes one site.
 *
 * - If the shared controller is already aborted, logs and returns a Left
 *   holding an `AlreadyAbortError` without sleeping.
 * - Otherwise validates the site (a rejected site aborts the controller),
 *   sleeps, logs the outcome and returns it as an Either.
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
  Effect.gen(function* () {
    const { signal } = abortController
    if (signal.aborted) {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
      return Either.left(new AlreadyAbortError())
    }

    // Logs when the shared controller is aborted while this site is in flight.
    const onAbort = () => {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
    }
    const detach = () => {
      signal.removeEventListener('abort', onAbort)
    }
    signal.addEventListener('abort', onAbort)

    const validSite = yield* pipe(
      site,
      OriginAbortError.siteCheck(() => {
        // Detach first so the site that triggers the abort does not log
        // the "aborted" line for itself.
        detach()
        abortController.abort()
      }, topIndex)
    )
    const sleepEffect = yield* pipe(validSite, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(sleepEffect)

    if (Either.isRight(either)) {
      Effect.runPromise(checkLog(either.right.getMessage(site)))
    } else {
      const ngData = either.left
      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        Effect.runPromise(checkLog(ngData.getMessage(site)))
      }
    }
    detach()
    return either
  })
describe('effect-ts 非同期処理1つの配列', () => {
  // Runs every site at once. 'https://test/10' fails the site check and
  // aborts the shared controller, which interrupts the whole run.
  test('使い方1ー全部同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5',
      'https://test/10'
    ]
    const abortController = new AbortController()
    Effect.runPromise(checkLog('[start]'))
    // Effect.all only describes the work; it is not a promise, so it is not awaited.
    const effect = Effect.all(sites.map(mapper1_gen(abortController)), {
      concurrency: sites.length
    })
    // Passing the signal lets an abort interrupt the running effect.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })
    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (either) => either
      })
    )
  })
})
実行結果
time : 2025-02-26T13:38:02.447Z - text: [start]
time : 2025-02-26T13:38:02.458Z - text: https://test/10 - abort - lastIndex >= 10
time : 2025-02-26T13:38:02.459Z - text: https://test/1 - aborted - from event
time : 2025-02-26T13:38:02.459Z - text: https://test/2 - aborted - from event
time : 2025-02-26T13:38:02.459Z - text: https://test/3 - aborted - from event
time : 2025-02-26T13:38:02.459Z - text: https://test/4 - aborted - from event
time : 2025-02-26T13:38:02.459Z - text: https://test/5 - aborted - from event
中断しました: All fibers interrupted without errors.
実行結果(※中断の原因となるデータ 'https://test/10' を sites から除外した場合)
time : 2025-02-26T13:38:57.402Z - text: [start]
time : 2025-02-26T13:38:57.415Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-02-26T13:38:58.429Z - text: https://test/1 - ok - 1000
time : 2025-02-26T13:38:59.423Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:39:01.427Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:39:02.415Z - text: https://test/5 - ok - 5000
[
{ _id: 'Either', _tag: 'Right', right: OkData { wait: 1000 } },
{
_id: 'Either',
_tag: 'Left',
left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 2000 }
},
{
_id: 'Either',
_tag: 'Left',
left: SleepNGBeforeError { _tag: 'Sleep.Ng.Before' }
},
{
_id: 'Either',
_tag: 'Left',
left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 4000 }
},
{ _id: 'Either', _tag: 'Right', right: OkData { wait: 5000 } }
]
最大2個まで同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Failure used by `sleep` when it rejects immediately, before any waiting.
 * Instances are recognised through the `_tag` discriminator.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing, so it is
   * safe to call on an arbitrary rejection value.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Failure used by `sleep` when it rejects after the timer has elapsed.
 * `wait` is the value that was passed to `sleep`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/**
 * Marker returned for a site that is reached after the shared controller
 * has already been aborted.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    return test != null && (test as { _tag?: unknown })._tag === AlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line for `site`. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}
/**
 * Failure describing why a site was rejected by `siteCheck`, which also
 * triggers the abort of the whole run.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName
  site: string
  reason: string

  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is OriginAbortError {
    return test != null && (test as { _tag?: unknown })._tag === OriginAbortError.tagName
  }

  /** Builds the log line from the stored site and reason. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Builds a validator for a site string.
   *
   * The validator succeeds with the site when it is acceptable. Otherwise it
   * logs the reason, calls `abortAction` and fails with an `OriginAbortError`.
   * Logging and `abortAction` happen when the validator is called, not when
   * the returned effect is run.
   *
   * @param abortAction callback invoked once a site is rejected
   * @param topIndex optional index of the enclosing group; when given, the
   *   sum of it and the site's last index must stay below 10
   */
  static siteCheck(abortAction: Function, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      let reason: string | undefined
      if (Number.isNaN(lastIndex)) {
        reason = `${lastIndex} is NaN`
      } else if (site[site.length - 2] !== '/') {
        // The character before the last one is not '/', so the trailing
        // index has more than one character.
        reason = `lastIndex >= 10`
      } else if (topIndex !== undefined && topIndex + lastIndex >= 10) {
        reason = `topIndex + lastIndex >= 10`
      }
      if (reason !== undefined) {
        const originAbortError = new OriginAbortError(site, reason)
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }
      return Effect.succeed(site)
    }
  }
}
/** Successful result of `sleep`; `wait` is the value that was passed to it. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /**
   * Converts the final character of `site` to a number.
   * Yields `NaN` when that character is not numeric.
   */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the "ok" log line for `site`. */
  getMessage(site: string) {
    return `${site} - ok - ${this.wait}`
  }
}
/**
 * Simulated API call whose outcome depends on `wait` (milliseconds):
 * - `wait / 1000` is a multiple of 3: rejects at once with `SleepNGBeforeError`
 * - otherwise a multiple of 2: rejects with `SleepNGAfterError` after `wait` ms
 * - otherwise: resolves with `OkData` after `wait` ms
 *
 * The pending timer is cancelled when the effect is interrupted, so an
 * aborted run does not leave timers behind.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    (signal) =>
      new Promise((resolve, reject) => {
        const waitSeconds = wait / 1000
        if (waitSeconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }
        const timer = setTimeout(() => {
          if (waitSeconds % 2 === 0) {
            reject(new SleepNGAfterError(wait))
          } else {
            resolve(new OkData(wait))
          }
        }, wait)
        // `signal` is aborted by Effect when the running fiber is interrupted.
        signal.addEventListener('abort', () => clearTimeout(timer), { once: true })
      })
  )
/** Effect that prints `text` to the console together with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })
/** Options object handed to `test` as its second argument. */
const testOptions = {
  timeout: 100 * 1000
}
/** Turns an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  return index * 1000
}
/**
 * Unwraps the known sleep failures from an `UnknownException`.
 *
 * When the wrapped value is a `SleepNGBeforeError` or `SleepNGAfterError` it
 * becomes the error itself; any other exception is logged and passed through
 * unchanged. `site` is accepted but not used.
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const wrapped = err.error
    if (SleepNGBeforeError.isClass(wrapped) || SleepNGAfterError.isClass(wrapped)) {
      return wrapped
    }
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}
/**
 * Curried helper: for `site`, builds the `sleep(wait)` effect with its errors
 * mapped by `mapError_sleepNg`. That effect is the success value of the
 * returned `Effect.try`, so callers unwrap the outer effect to get it.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Builds the effect that processes one site.
 *
 * - If the shared controller is already aborted, logs and returns a Left
 *   holding an `AlreadyAbortError` without sleeping.
 * - Otherwise validates the site (a rejected site aborts the controller),
 *   sleeps, logs the outcome and returns it as an Either.
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
  Effect.gen(function* () {
    const { signal } = abortController
    if (signal.aborted) {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
      return Either.left(new AlreadyAbortError())
    }

    // Logs when the shared controller is aborted while this site is in flight.
    const onAbort = () => {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
    }
    const detach = () => {
      signal.removeEventListener('abort', onAbort)
    }
    signal.addEventListener('abort', onAbort)

    const validSite = yield* pipe(
      site,
      OriginAbortError.siteCheck(() => {
        // Detach first so the site that triggers the abort does not log
        // the "aborted" line for itself.
        detach()
        abortController.abort()
      }, topIndex)
    )
    const sleepEffect = yield* pipe(validSite, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(sleepEffect)

    if (Either.isRight(either)) {
      Effect.runPromise(checkLog(either.right.getMessage(site)))
    } else {
      const ngData = either.left
      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        Effect.runPromise(checkLog(ngData.getMessage(site)))
      }
    }
    detach()
    return either
  })
describe('effect-ts 非同期処理1つの配列', () => {
  // Runs at most two sites at a time. 'https://test/10' fails the site check
  // and aborts the shared controller, which interrupts the whole run.
  test('使い方2ー最大2個まで同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5',
      'https://test/10'
    ]
    const abortController = new AbortController()
    Effect.runPromise(checkLog('[start]'))
    // Effect.all only describes the work; it is not a promise, so it is not awaited.
    const effect = Effect.all(sites.map(mapper1_gen(abortController)), {
      concurrency: 2
    })
    // Passing the signal lets an abort interrupt the running effect.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })
    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (either) => either
      })
    )
  })
})
実行結果
time : 2025-02-26T13:40:15.927Z - text: [start]
time : 2025-02-26T13:40:16.946Z - text: https://test/1 - ok - 1000
time : 2025-02-26T13:40:16.949Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-02-26T13:40:17.955Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:40:20.951Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:40:20.952Z - text: https://test/10 - abort - lastIndex >= 10
time : 2025-02-26T13:40:20.953Z - text: https://test/5 - aborted - from event
中断しました: All fibers interrupted without errors.
実行結果(※中断の原因となるデータ 'https://test/10' を sites から除外した場合)
time : 2025-02-26T13:41:08.954Z - text: [start]
time : 2025-02-26T13:41:09.967Z - text: https://test/1 - ok - 1000
time : 2025-02-26T13:41:09.972Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-02-26T13:41:10.970Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:41:13.985Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:41:15.985Z - text: https://test/5 - ok - 5000
[
{ _id: 'Either', _tag: 'Right', right: OkData { wait: 1000 } },
{
_id: 'Either',
_tag: 'Left',
left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 2000 }
},
{
_id: 'Either',
_tag: 'Left',
left: SleepNGBeforeError { _tag: 'Sleep.Ng.Before' }
},
{
_id: 'Either',
_tag: 'Left',
left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 4000 }
},
{ _id: 'Either', _tag: 'Right', right: OkData { wait: 5000 } }
]
配列が2つの場合(シンプル)
全部同時実行からの全部同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Failure used by `sleep` when it rejects immediately, before any waiting.
 * Instances are recognised through the `_tag` discriminator.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing, so it is
   * safe to call on an arbitrary rejection value.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Failure used by `sleep` when it rejects after the timer has elapsed.
 * `wait` is the value that was passed to `sleep`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Every failure type that a mapped `sleep` call can produce. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the group-level sleep so it can be reported either by
 * the group's index or by the group's URL prefix.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  ngData: AllNgData

  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard that matches on this class's own `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return test != null && (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix describing the wrapped failure. The wrapped error's own
   * message is built with an empty site, so only its " - ng - ..." tail remains.
   */
  getLastMessage() {
    if (this.ngData instanceof UnknownException) {
      return ' - ng - UnknownException'
    }
    return this.ngData.getMessage('')
  }

  /** Log line keyed by the group index taken from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result line keyed by `site` with its last two characters removed. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Marker returned for a site that is reached after the shared controller
 * has already been aborted.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    return test != null && (test as { _tag?: unknown })._tag === AlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line for `site`. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}
/**
 * Marker returned for a group that is reached after the shared controller
 * has already been aborted.
 */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is TopAlreadyAbortError {
    return test != null && (test as { _tag?: unknown })._tag === TopAlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line keyed by the group index taken from `site`. */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}
/**
 * Failure describing why a site was rejected by `siteCheck`, which also
 * triggers the abort of the whole run.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName
  site: string
  reason: string

  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is OriginAbortError {
    return test != null && (test as { _tag?: unknown })._tag === OriginAbortError.tagName
  }

  /** Builds the log line from the stored site and reason. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Builds a validator for a site string.
   *
   * The validator succeeds with the site when it is acceptable. Otherwise it
   * logs the reason, calls `abortAction` and fails with an `OriginAbortError`.
   * Logging and `abortAction` happen when the validator is called, not when
   * the returned effect is run.
   *
   * @param abortAction callback invoked once a site is rejected
   * @param topIndex optional index of the enclosing group; when given, the
   *   sum of it and the site's last index must stay below 10
   */
  static siteCheck(abortAction: Function, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      let reason: string | undefined
      if (Number.isNaN(lastIndex)) {
        reason = `${lastIndex} is NaN`
      } else if (site[site.length - 2] !== '/') {
        // The character before the last one is not '/', so the trailing
        // index has more than one character.
        reason = `lastIndex >= 10`
      } else if (topIndex !== undefined && topIndex + lastIndex >= 10) {
        reason = `topIndex + lastIndex >= 10`
      }
      if (reason !== undefined) {
        const originAbortError = new OriginAbortError(site, reason)
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }
      return Effect.succeed(site)
    }
  }
}
/**
 * Successful result of the group-level sleep: remembers the site, the wait
 * value of the underlying `OkData`, and the group index parsed from the site.
 */
class TopOkData {
  site: string
  wait: number
  index: number

  constructor(s: string, ok: OkData) {
    const { wait } = ok
    this.site = s
    this.wait = wait
    this.index = TopOkData.getTopIndex(s)
  }

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Converts the third character from the end of `site` to a number. */
  static getTopIndex(site: string) {
    const topChar = site.charAt(site.length - 3)
    return Number(topChar)
  }

  /** Log line keyed by the group index taken from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * Result line: the site without its last two characters, followed by the
   * sum of this wait and `opWait`.
   */
  getTopMessage(opWait: number) {
    const prefix = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${prefix} - ok - ${total}`
  }
}
/** Successful result of `sleep`; `wait` is the value that was passed to it. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /**
   * Converts the final character of `site` to a number.
   * Yields `NaN` when that character is not numeric.
   */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the "ok" log line for `site`. */
  getMessage(site: string) {
    return `${site} - ok - ${this.wait}`
  }
}
/**
 * Simulated API call whose outcome depends on `wait` (milliseconds):
 * - `wait / 1000` is a multiple of 3: rejects at once with `SleepNGBeforeError`
 * - otherwise a multiple of 2: rejects with `SleepNGAfterError` after `wait` ms
 * - otherwise: resolves with `OkData` after `wait` ms
 *
 * The pending timer is cancelled when the effect is interrupted, so an
 * aborted run does not leave timers behind.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    (signal) =>
      new Promise((resolve, reject) => {
        const waitSeconds = wait / 1000
        if (waitSeconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }
        const timer = setTimeout(() => {
          if (waitSeconds % 2 === 0) {
            reject(new SleepNGAfterError(wait))
          } else {
            resolve(new OkData(wait))
          }
        }, wait)
        // `signal` is aborted by Effect when the running fiber is interrupted.
        signal.addEventListener('abort', () => clearTimeout(timer), { once: true })
      })
  )
/** Effect that prints `text` to the console together with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })
/** Options object handed to `test` as its second argument. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Builds a reducer that folds the numbers of `b` into a list of running totals.
 *
 * While the list does not hold exactly `chunkSize` totals, each number is
 * appended as a new total; once it does, each number is added to the
 * currently smallest total (the first one on ties).
 *
 * The accumulator `a` is copied rather than modified, so the caller's array
 * (for example the initial value handed to a reduce) is left untouched.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const totals = [...a]
  for (const num of b) {
    if (totals.length === chunkSize) {
      const minIndex = totals.indexOf(Math.min(...totals))
      totals[minIndex] += num
    } else {
      totals.push(num)
    }
  }
  return totals
}
/** Turns an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  return index * 1000
}
/**
 * Unwraps the known sleep failures from an `UnknownException`.
 *
 * When the wrapped value is a `SleepNGBeforeError` or `SleepNGAfterError` it
 * becomes the error itself; any other exception is logged and passed through
 * unchanged. `site` is accepted but not used.
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const wrapped = err.error
    if (SleepNGBeforeError.isClass(wrapped) || SleepNGAfterError.isClass(wrapped)) {
      return wrapped
    }
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}
/**
 * Reduces an Either to the wait value it carries.
 *
 * Left side: `OkData` and `SleepNGAfterError` yield their `wait`;
 * `SleepNGBeforeError` and anything else yield 0.
 * Right side: `OkData` yields its `wait`; anything else yields 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const left = data.left
    if (OkData.isClass(left)) return left.wait
    if (SleepNGBeforeError.isClass(left)) return 0
    if (SleepNGAfterError.isClass(left)) return left.wait
    return 0
  }
  const right = data.right
  return OkData.isClass(right) ? right.wait : 0
}
/**
 * Curried helper: for `site`, builds the `sleep(wait)` effect with its errors
 * mapped by `mapError_sleepNg`. That effect is the success value of the
 * returned `Effect.try`, so callers unwrap the outer effect to get it.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Group-level variant of `do_sleep1`: the `OkData` produced by `sleep(wait)`
 * is wrapped into a `TopOkData` for `site` before the errors are mapped by
 * `mapError_sleepNg`. The built effect is the success value of the returned
 * `Effect.try`.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() =>
    mapError_sleepNg(
      site,
      Effect.andThen(sleep(wait), (okData) => new TopOkData(site, okData))
    )
  )
/**
 * Builds the effect that processes one site.
 *
 * - If the shared controller is already aborted, logs and returns a Left
 *   holding an `AlreadyAbortError` without sleeping.
 * - Otherwise validates the site (a rejected site aborts the controller),
 *   sleeps, logs the outcome and returns it as an Either.
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
  Effect.gen(function* () {
    const { signal } = abortController
    if (signal.aborted) {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
      return Either.left(new AlreadyAbortError())
    }

    // Logs when the shared controller is aborted while this site is in flight.
    const onAbort = () => {
      Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
    }
    const detach = () => {
      signal.removeEventListener('abort', onAbort)
    }
    signal.addEventListener('abort', onAbort)

    const validSite = yield* pipe(
      site,
      OriginAbortError.siteCheck(() => {
        // Detach first so the site that triggers the abort does not log
        // the "aborted" line for itself.
        detach()
        abortController.abort()
      }, topIndex)
    )
    const sleepEffect = yield* pipe(validSite, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(sleepEffect)

    if (Either.isRight(either)) {
      Effect.runPromise(checkLog(either.right.getMessage(site)))
    } else {
      const ngData = either.left
      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        Effect.runPromise(checkLog(ngData.getMessage(site)))
      }
    }
    detach()
    return either
  })
/**
 * Builds the effect that processes one group of sites.
 *
 * The group first performs its own sleep, whose length is derived from the
 * group index parsed out of the first site. If that sleep fails, the result
 * is a Left holding a message string. If it succeeds, every site of the group
 * is run through `mapper1_gen` with the given `options`, and the result is a
 * Right holding a message string with a combined wait value.
 *
 * If the shared controller is already aborted on entry, a Left holding a
 * `TopAlreadyAbortError` is returned without sleeping.
 */
const mapper2_gen =
(abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
Effect.gen(function* () {
// The first site stands for the whole group in messages and index parsing.
// NOTE(review): an empty `sites` array makes this undefined — confirm callers never pass one.
const site = sites[0]
if (abortController.signal.aborted) {
const topAlreadyAbortError = new TopAlreadyAbortError()
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
return Either.left(topAlreadyAbortError)
}
// Logs when the shared controller is aborted while this group is in flight.
const abortEvent = () => {
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
// do_sleep2 yields the sleep effect itself; it is run on the next line.
const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
const either2 = yield* Effect.either(effect2)
// Left becomes a finished message string; Right becomes an effect that is
// not run yet and still has to process the sites of the group.
const map = Either.mapBoth(either2, {
onLeft: (ngData) => {
const topNgData = new TopSleepNGError(ngData)
const message = topNgData.getMessage(site)
Effect.runPromise(checkLog(message))
const topMessage = topNgData.getTopMessage(site)
return topMessage
},
onRight: (topOkData) =>
Effect.gen(function* () {
const message = TopOkData.getMessage(site)
Effect.runPromise(checkLog(message))
// Run every site of the group, passing the group index on to the site check.
const either1 = yield* Effect.all(
sites.map(mapper1_gen(abortController, topOkData.index)),
options
)
// Combine the per-site wait values: split them into chunks of `concurrency`,
// fold them into running totals with sumNumber, and use the largest total.
const arrayEither1 = either1.map(mapNumber_fromData)
const concurrency = options['concurrency']
const chunkArray = Array.chunksOf(arrayEither1, concurrency)
const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
const topMessage = topOkData.getTopMessage(Math.max(...waitNumber))
return topMessage
})
})
// On the Right side the pending inner effect is run here and its message re-wrapped.
const result = Either.isLeft(map) ? map : Either.right(yield* map.right)
removeEvent()
return result
})
describe('effect-ts 非同期処理2つの配列', () => {
  // Runs all groups at once and, inside each group, all sites at once.
  test('使い方1ー全部同時実行からの全部同時実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      // [
      // 'https://test/5/1',
      // 'https://test/5/2',
      // 'https://test/5/3',
      // 'https://test/5/4',
      // 'https://test/5/5'
      // ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]
    const abortController = new AbortController()
    Effect.runPromise(checkLog('[start]'))
    // Effect.all only describes the work; it is not a promise, so it is not awaited.
    const effect = Effect.all(
      sites.map(mapper2_gen(abortController, { concurrency: Number.MAX_VALUE })),
      {
        concurrency: Number.MAX_VALUE
      }
    )
    // Passing the signal lets an abort interrupt the running effect.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })
    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (either) => either
      })
    )
  })
})
実行結果
time : 2025-02-26T13:42:47.593Z - text: [start]
time : 2025-02-26T13:42:54.625Z - text: mapper2-index = 7
time : 2025-02-26T13:42:54.638Z - text: https://test/7/3 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T13:42:54.643Z - text: mapper2-index = 7 - aborted - from event
time : 2025-02-26T13:42:54.645Z - text: https://test/7/1 - aborted - from event
time : 2025-02-26T13:42:54.645Z - text: https://test/7/2 - aborted - from event
time : 2025-02-26T13:42:54.651Z - text: https://test/7/4 - aborted - from event
time : 2025-02-26T13:42:54.653Z - text: https://test/7/5 - aborted - from event
time : 2025-02-26T13:42:54.655Z - text: https://test/7/6 - aborted - from event
time : 2025-02-26T13:42:54.655Z - text: https://test/7/7 - aborted - from event
中断しました: All fibers interrupted without errors.
実行結果(※中断の原因となるデータ('https://test/7/3' 以降)を sites から除外した場合)
time : 2025-02-26T13:44:50.401Z - text: [start]
time : 2025-02-26T13:44:57.427Z - text: mapper2-index = 7
time : 2025-02-26T13:44:58.451Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:44:59.450Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
[
{ _id: 'Either', _tag: 'Right', right: 'https://test/7 - ok - 9000' }
]
全部同時実行からの直列実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Failure used by `sleep` when it rejects immediately, before any waiting.
 * Instances are recognised through the `_tag` discriminator.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing, so it is
   * safe to call on an arbitrary rejection value.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Failure used by `sleep` when it rejects after the timer has elapsed.
 * `wait` is the value that was passed to `sleep`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Every failure type that a mapped `sleep` call can produce. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the group-level sleep so it can be reported either by
 * the group's index or by the group's URL prefix.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  ngData: AllNgData

  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard that matches on this class's own `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return test != null && (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /**
   * Message suffix describing the wrapped failure. The wrapped error's own
   * message is built with an empty site, so only its " - ng - ..." tail remains.
   */
  getLastMessage() {
    if (this.ngData instanceof UnknownException) {
      return ' - ng - UnknownException'
    }
    return this.ngData.getMessage('')
  }

  /** Log line keyed by the group index taken from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result line keyed by `site` with its last two characters removed. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Marker returned for a site that is reached after the shared controller
 * has already been aborted.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    return test != null && (test as { _tag?: unknown })._tag === AlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line for `site`. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}
/**
 * Marker returned for a group that is reached after the shared controller
 * has already been aborted.
 */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is TopAlreadyAbortError {
    return test != null && (test as { _tag?: unknown })._tag === TopAlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line keyed by the group index taken from `site`. */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}
/**
 * Failure describing why a site was rejected by `siteCheck`, which also
 * triggers the abort of the whole run.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName
  site: string
  reason: string

  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard that matches on `_tag`.
   * Returns `false` for `null` and `undefined` instead of throwing.
   */
  static isClass(test: unknown): test is OriginAbortError {
    return test != null && (test as { _tag?: unknown })._tag === OriginAbortError.tagName
  }

  /** Builds the log line from the stored site and reason. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Builds a validator for a site string.
   *
   * The validator succeeds with the site when it is acceptable. Otherwise it
   * logs the reason, calls `abortAction` and fails with an `OriginAbortError`.
   * Logging and `abortAction` happen when the validator is called, not when
   * the returned effect is run.
   *
   * @param abortAction callback invoked once a site is rejected
   * @param topIndex optional index of the enclosing group; when given, the
   *   sum of it and the site's last index must stay below 10
   */
  static siteCheck(abortAction: Function, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      let reason: string | undefined
      if (Number.isNaN(lastIndex)) {
        reason = `${lastIndex} is NaN`
      } else if (site[site.length - 2] !== '/') {
        // The character before the last one is not '/', so the trailing
        // index has more than one character.
        reason = `lastIndex >= 10`
      } else if (topIndex !== undefined && topIndex + lastIndex >= 10) {
        reason = `topIndex + lastIndex >= 10`
      }
      if (reason !== undefined) {
        const originAbortError = new OriginAbortError(site, reason)
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }
      return Effect.succeed(site)
    }
  }
}
/**
 * Successful result of the group-level sleep: remembers the site, the wait
 * value of the underlying `OkData`, and the group index parsed from the site.
 */
class TopOkData {
  site: string
  wait: number
  index: number

  constructor(s: string, ok: OkData) {
    const { wait } = ok
    this.site = s
    this.wait = wait
    this.index = TopOkData.getTopIndex(s)
  }

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Converts the third character from the end of `site` to a number. */
  static getTopIndex(site: string) {
    const topChar = site.charAt(site.length - 3)
    return Number(topChar)
  }

  /** Log line keyed by the group index taken from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * Result line: the site without its last two characters, followed by the
   * sum of this wait and `opWait`.
   */
  getTopMessage(opWait: number) {
    const prefix = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${prefix} - ok - ${total}`
  }
}
/** Successful result of `sleep`; `wait` is the value that was passed to it. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /**
   * Converts the final character of `site` to a number.
   * Yields `NaN` when that character is not numeric.
   */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the "ok" log line for `site`. */
  getMessage(site: string) {
    return `${site} - ok - ${this.wait}`
  }
}
/**
 * Simulated API call whose outcome depends on `wait` (milliseconds):
 * - `wait / 1000` is a multiple of 3: rejects at once with `SleepNGBeforeError`
 * - otherwise a multiple of 2: rejects with `SleepNGAfterError` after `wait` ms
 * - otherwise: resolves with `OkData` after `wait` ms
 *
 * The pending timer is cancelled when the effect is interrupted, so an
 * aborted run does not leave timers behind.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    (signal) =>
      new Promise((resolve, reject) => {
        const waitSeconds = wait / 1000
        if (waitSeconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }
        const timer = setTimeout(() => {
          if (waitSeconds % 2 === 0) {
            reject(new SleepNGAfterError(wait))
          } else {
            resolve(new OkData(wait))
          }
        }, wait)
        // `signal` is aborted by Effect when the running fiber is interrupted.
        signal.addEventListener('abort', () => clearTimeout(timer), { once: true })
      })
  )
/** Effect that prints `text` to the console together with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })
/** Options object handed to `test` as its second argument. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Builds a reducer that folds the numbers of `b` into a list of running totals.
 *
 * While the list does not hold exactly `chunkSize` totals, each number is
 * appended as a new total; once it does, each number is added to the
 * currently smallest total (the first one on ties).
 *
 * The accumulator `a` is copied rather than modified, so the caller's array
 * (for example the initial value handed to a reduce) is left untouched.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const totals = [...a]
  for (const num of b) {
    if (totals.length === chunkSize) {
      const minIndex = totals.indexOf(Math.min(...totals))
      totals[minIndex] += num
    } else {
      totals.push(num)
    }
  }
  return totals
}
/** Turns an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  return index * 1000
}
/**
 * Unwraps the known sleep failures from an `UnknownException`.
 *
 * When the wrapped value is a `SleepNGBeforeError` or `SleepNGAfterError` it
 * becomes the error itself; any other exception is logged and passed through
 * unchanged. `site` is accepted but not used.
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const wrapped = err.error
    if (SleepNGBeforeError.isClass(wrapped) || SleepNGAfterError.isClass(wrapped)) {
      return wrapped
    }
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}
/**
 * Reduces an Either to the wait value it carries.
 *
 * Left side: `OkData` and `SleepNGAfterError` yield their `wait`;
 * `SleepNGBeforeError` and anything else yield 0.
 * Right side: `OkData` yields its `wait`; anything else yields 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const left = data.left
    if (OkData.isClass(left)) return left.wait
    if (SleepNGBeforeError.isClass(left)) return 0
    if (SleepNGAfterError.isClass(left)) return left.wait
    return 0
  }
  const right = data.right
  return OkData.isClass(right) ? right.wait : 0
}
/**
 * Builds the sleep Effect for a single site inside `Effect.try`.
 *
 * The result is an Effect that yields another Effect (the sleep with its errors remapped), so
 * the caller has to run the inner one separately.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Builds the group-level sleep Effect inside `Effect.try`.
 *
 * A successful sleep is converted into `TopOkData` for `site`; errors are remapped by
 * `mapError_sleepNg`. The result is an Effect that yields that inner Effect.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const sleptThenWrapped = Effect.andThen(sleep(wait), (okData) => {
      return new TopOkData(site, okData)
    })
    return mapError_sleepNg(site, sleptThenWrapped)
  })
/**
 * Builds the per-site worker used for the inner array.
 *
 * The returned Effect succeeds with an Either describing the outcome of the sleep for `site`.
 * When the controller is already aborted on entry it succeeds with
 * `Either.left(AlreadyAbortError)` without sleeping.
 *
 * @param abortController - Controller whose signal is checked on entry and which is aborted
 *   when the site check rejects the site.
 * @param topIndex - Index of the enclosing group, forwarded to `OriginAbortError.siteCheck`.
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
Effect.gen(function* () {
// The controller is already aborted: log and report it as a Left value instead of sleeping.
if (abortController.signal.aborted) {
const alreadyAbortError = new AlreadyAbortError()
Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
return Either.left(alreadyAbortError)
}
// Log when an abort arrives while this worker's listener is still registered.
const abortEvent = () => {
Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
// Unregister first so this worker does not log its own abort, then abort the controller.
const abortAction = () => {
removeEvent()
abortController.abort()
}
// siteCheck returns a failed Effect for a rejected site; yield* then ends the generator here.
const abortCheck = yield* pipe(site, OriginAbortError.siteCheck(abortAction, topIndex))
// do_sleep1 wraps the sleep Effect in Effect.try, so this yield* only unwraps the outer layer.
const effect = yield* pipe(abortCheck, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
// Run the sleep and capture its failure as a Left instead of failing this generator.
const either = yield* Effect.either(effect)
if (Either.isRight(either)) {
const okData = either.right
const message = okData.getMessage(site)
Effect.runPromise(checkLog(message))
} else if (Either.isLeft(either)) {
const ngData = either.left
// Only the two sleep failures are logged here.
if (SleepNGBeforeError.isClass(ngData)) {
const message = ngData.getMessage(site)
Effect.runPromise(checkLog(message))
} else if (SleepNGAfterError.isClass(ngData)) {
const message = ngData.getMessage(site)
Effect.runPromise(checkLog(message))
}
}
// Finished without being aborted: stop listening for aborts.
removeEvent()
return either
})
/**
 * Builds the per-group worker used for the outer array.
 *
 * It first sleeps for the group's own index (derived from the first site), then runs
 * `mapper1_gen` over every site of the group with the given options and folds the individual
 * waits into one summary message.
 *
 * The returned Effect succeeds with an Either holding a message string on either side, or
 * with `Either.left(TopAlreadyAbortError)` when the controller is already aborted on entry.
 *
 * @param abortController - Controller shared with the per-site workers.
 * @param options - Passed to `Effect.all` for the inner array; `concurrency` is also used as
 *   the slot count when summing the waits.
 */
const mapper2_gen =
(abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
Effect.gen(function* () {
// Only the first site is used to derive the group index and the log prefix.
const site = sites[0]
if (abortController.signal.aborted) {
const topAlreadyAbortError = new TopAlreadyAbortError()
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
return Either.left(topAlreadyAbortError)
}
// Log when an abort arrives while this worker's listener is still registered.
const abortEvent = () => {
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
// do_sleep2 wraps the sleep Effect in Effect.try, so the first yield* only unwraps the outer layer.
const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
const either2 = yield* Effect.either(effect2)
// Left becomes a plain message string; Right becomes an Effect that still has to be run.
const map = Either.mapBoth(either2, {
onLeft: (ngData) => {
const topNgData = new TopSleepNGError(ngData)
const message = topNgData.getMessage(site)
Effect.runPromise(checkLog(message))
const topMessage = topNgData.getTopMessage(site)
return topMessage
},
onRight: (topOkData) =>
Effect.gen(function* () {
const message = TopOkData.getMessage(site)
Effect.runPromise(checkLog(message))
// Run every site of the group; options carries the inner concurrency.
const either1 = yield* Effect.all(
sites.map(mapper1_gen(abortController, topOkData.index)),
options
)
// Turn each result into the wait it represents.
const arrayEither1 = either1.map(mapNumber_fromData)
const concurrency = options['concurrency']
// Fold the waits into per-slot totals; the largest slot is added to the group's own wait.
const chunkArray = Array.chunksOf(arrayEither1, concurrency)
const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
const topMessage = topOkData.getTopMessage(Math.max(...waitNumber))
return topMessage
})
})
// A Right still wraps an Effect, so run it here and re-wrap its message.
const result = Either.isLeft(map) ? map : Either.right(yield* map.right)
removeEvent()
return result
})
// Scenario: all groups start at once; the sites inside each group run one at a time.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方3ー全部同時実行からの直列実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      // [
      // 'https://test/5/1',
      // 'https://test/5/2',
      // 'https://test/5/3',
      // 'https://test/5/4',
      // 'https://test/5/5'
      // ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]
    const abortController = new AbortController()
    Effect.runPromise(checkLog('[start]'))
    // Effect.all only describes the work; it is not a Promise, so it must not be awaited.
    const effect = Effect.all(sites.map(mapper2_gen(abortController, { concurrency: 1 })), {
      concurrency: Number.MAX_VALUE
    })
    // Passing the signal lets an abort raised by a worker interrupt the whole run.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })
    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (either) => either
      })
    )
  })
})
実行結果
time : 2025-02-26T13:47:01.774Z - text: [start]
time : 2025-02-26T13:47:08.793Z - text: mapper2-index = 7
time : 2025-02-26T13:47:09.810Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:47:11.834Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:47:11.836Z - text: https://test/7/3 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T13:47:11.841Z - text: mapper2-index = 7 - aborted - from event
中断しました: All fibers interrupted without errors.
実行結果(※中断データを除外した場合)
time : 2025-02-26T13:48:58.173Z - text: [start]
time : 2025-02-26T13:49:05.191Z - text: mapper2-index = 7
time : 2025-02-26T13:49:06.195Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:49:08.202Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
[
{
_id: 'Either',
_tag: 'Right',
right: 'https://test/7 - ok - 10000'
}
]
全部同時実行からの最大2個まで同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Failure value that `sleep` rejects with immediately, without starting a timer.
 *
 * Instances are identified by their `_tag` string, not by `instanceof`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard matching any value whose `_tag` equals `SleepNGBeforeError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the failure log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Failure value that `sleep` rejects with once its timer has fired.
 *
 * `wait` keeps the delay that was waited. Instances are identified by their `_tag` string.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard matching any value whose `_tag` equals `SleepNGAfterError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the failure log line for `site`, including the waited delay. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Every failure shape the group-level sleep can surface. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the group-level sleep so it can be rendered either with the
 * `mapper2-index` prefix or with the group URL.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  ngData: AllNgData

  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard matching any value whose `_tag` equals `TopSleepNGError.tagName`.
   *
   * Compares against this class's own tag (it previously compared against the
   * `SleepNGAfterError` tag) and returns `false` for `null`/`undefined`.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return test != null && (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /** Message suffix for the wrapped failure; it starts with ` - ng - `. */
  getLastMessage() {
    if (this.ngData instanceof UnknownException) {
      return ' - ng - UnknownException'
    }
    // An empty site makes the wrapped message start directly with " - ng - ...".
    return this.ngData.getMessage('')
  }

  /** Log line prefixed with the group index taken from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Summary line prefixed with `site` minus its last two characters. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Marker value used by the per-site worker when the AbortController is already aborted.
 *
 * Instances are identified by their `_tag` string.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard matching any value whose `_tag` equals `AlreadyAbortError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    return test != null && (test as { _tag?: unknown })._tag === AlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line for `site`. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}
/**
 * Marker value used by the per-group worker when the AbortController is already aborted.
 *
 * Instances are identified by their `_tag` string.
 */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard matching any value whose `_tag` equals `TopAlreadyAbortError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is TopAlreadyAbortError {
    return test != null && (test as { _tag?: unknown })._tag === TopAlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line, prefixed with the group index taken from `site`. */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}
/**
 * Failure describing why a site was rejected before any sleep was started.
 *
 * Instances are identified by their `_tag` string.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName
  site: string
  reason: string

  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard matching any value whose `_tag` equals `OriginAbortError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is OriginAbortError {
    return test != null && (test as { _tag?: unknown })._tag === OriginAbortError.tagName
  }

  /** Builds the "abort" log line from the stored site and reason. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Creates a validator for a site string.
   *
   * A site is rejected when its last character is not numeric, when the character before
   * the last one is not `/`, or when `topIndex + lastIndex` reaches 10. On rejection the
   * reason is logged, `abortAction` is called and a failed Effect is returned; otherwise
   * the Effect succeeds with the site.
   *
   * @param abortAction - Callback invoked once when the site is rejected.
   * @param topIndex - Optional group index; the sum check is skipped when it is falsy.
   */
  static siteCheck(abortAction: () => void, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      // getLastIndex returns the result of Number(...), which is never undefined, so only
      // NaN has to be handled as the "not parseable" case.
      let reason: string | undefined
      if (Number.isNaN(lastIndex)) {
        reason = `${lastIndex} is NaN`
      } else if (site[site.length - 2] !== '/') {
        reason = `lastIndex >= 10`
      } else if (topIndex && topIndex + lastIndex >= 10) {
        reason = `topIndex + lastIndex >= 10`
      }
      if (reason !== undefined) {
        const originAbortError = new OriginAbortError(site, reason)
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }
      return Effect.succeed(site)
    }
  }
}
/**
 * Success value of the group-level sleep: remembers the site, the waited delay and the
 * group index parsed from the site.
 */
class TopOkData {
  site: string
  wait: number
  index: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
    this.index = TopOkData.getTopIndex(s)
  }

  /** Type guard based on the prototype chain. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Reads the character three positions from the end of `site` and converts it with `Number`. */
  static getTopIndex(site: string) {
    return Number(site.slice(-3, -2))
  }

  /** Builds the log line announcing which group index `site` belongs to. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /** Builds the summary line: the site without its last two characters plus `wait + opWait`. */
  getTopMessage(opWait: number) {
    const groupUrl = this.site.slice(0, -2)
    const totalWait = this.wait + opWait
    return `${groupUrl} - ok - ${totalWait}`
  }
}
/** Success value produced by `sleep`; `wait` is the delay that was passed to the timer. */
class OkData {
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /** Type guard based on the prototype chain. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Converts the final character of `site` with `Number`. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}
/**
 * Simulated API call wrapped in `Effect.tryPromise`.
 *
 * - `wait` a multiple of 3 seconds: rejects at once with `SleepNGBeforeError`.
 * - otherwise a multiple of 2 seconds: rejects with `SleepNGAfterError` after `wait` ms.
 * - anything else: resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    () =>
      new Promise((resolve, reject) => {
        const seconds = wait / 1000
        if (seconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }
        const settle =
          seconds % 2 === 0
            ? () => reject(new SleepNGAfterError(wait))
            : () => resolve(new OkData(wait))
        setTimeout(settle, wait)
      })
  )
/** Wraps a timestamped `console.log` of `text` in a synchronous Effect. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const now = new Date().toISOString()
    console.log(`time : ${now} - text: ${text}`)
  })

/** Options handed to the vitest `test` call; the timeout is 100 seconds expressed in ms. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Creates a reducer that folds wait times into per-slot running totals.
 *
 * While the slot array does not hold exactly `chunkSize` entries, each number is appended as
 * a new slot; once it does, each number is added to the slot with the smallest total.
 *
 * The accumulator `a` is copied rather than mutated, so a seed array owned by the caller is
 * left untouched.
 *
 * @param chunkSize - Slot count at which numbers start being added to existing slots.
 * @returns Reducer `(a, b)` that returns the updated slot totals as a new array.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const slots = [...a]
  for (const num of b) {
    if (slots.length === chunkSize) {
      // Add to the least loaded slot (first one on ties, as indexOf returns the first match).
      const minIndex = slots.indexOf(Math.min(...slots))
      slots[minIndex] += num
    } else {
      slots.push(num)
    }
  }
  return slots
}
/** Converts an index into a wait time by multiplying it by 1000. */
function getWaitNumber(index: number) {
  const MS_PER_INDEX = 1000
  return index * MS_PER_INDEX
}
/**
 * Unwraps the `UnknownException` produced by `Effect.tryPromise`.
 *
 * When the wrapped cause is one of the two sleep failures it becomes the error value itself;
 * any other cause is logged and the `UnknownException` is kept as the error.
 *
 * @param site - Not used by the body.
 * @param effectAndThen - Effect whose error channel is remapped.
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (unknownException) => {
    const cause = unknownException.error
    if (SleepNGBeforeError.isClass(cause)) {
      return cause
    }
    if (SleepNGAfterError.isClass(cause)) {
      return cause
    }
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${unknownException}`))
    return unknownException
  })
}
/**
 * Reduces one worker result to the wait it represents.
 *
 * On the Left side, `OkData` and `SleepNGAfterError` contribute their `wait` and everything
 * else contributes 0. On the Right side, only `OkData` contributes its `wait`.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const ng = data.left
    if (OkData.isClass(ng)) {
      return ng.wait
    }
    // A value tagged as SleepNGBeforeError never carries a wait, so only the "after" failure counts.
    if (!SleepNGBeforeError.isClass(ng) && SleepNGAfterError.isClass(ng)) {
      return ng.wait
    }
    return 0
  }
  const ok = data.right
  return OkData.isClass(ok) ? ok.wait : 0
}
/**
 * Builds the sleep Effect for a single site inside `Effect.try`.
 *
 * The result is an Effect that yields another Effect (the sleep with its errors remapped), so
 * the caller has to run the inner one separately.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Builds the group-level sleep Effect inside `Effect.try`.
 *
 * A successful sleep is converted into `TopOkData` for `site`; errors are remapped by
 * `mapError_sleepNg`. The result is an Effect that yields that inner Effect.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const sleptThenWrapped = Effect.andThen(sleep(wait), (okData) => {
      return new TopOkData(site, okData)
    })
    return mapError_sleepNg(site, sleptThenWrapped)
  })
/**
 * Builds the per-site worker used for the inner array.
 *
 * The returned Effect succeeds with an Either describing the outcome of the sleep for `site`.
 * When the controller is already aborted on entry it succeeds with
 * `Either.left(AlreadyAbortError)` without sleeping.
 *
 * @param abortController - Controller whose signal is checked on entry and which is aborted
 *   when the site check rejects the site.
 * @param topIndex - Index of the enclosing group, forwarded to `OriginAbortError.siteCheck`.
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
Effect.gen(function* () {
// The controller is already aborted: log and report it as a Left value instead of sleeping.
if (abortController.signal.aborted) {
const alreadyAbortError = new AlreadyAbortError()
Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
return Either.left(alreadyAbortError)
}
// Log when an abort arrives while this worker's listener is still registered.
const abortEvent = () => {
Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
// Unregister first so this worker does not log its own abort, then abort the controller.
const abortAction = () => {
removeEvent()
abortController.abort()
}
// siteCheck returns a failed Effect for a rejected site; yield* then ends the generator here.
const abortCheck = yield* pipe(site, OriginAbortError.siteCheck(abortAction, topIndex))
// do_sleep1 wraps the sleep Effect in Effect.try, so this yield* only unwraps the outer layer.
const effect = yield* pipe(abortCheck, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
// Run the sleep and capture its failure as a Left instead of failing this generator.
const either = yield* Effect.either(effect)
if (Either.isRight(either)) {
const okData = either.right
const message = okData.getMessage(site)
Effect.runPromise(checkLog(message))
} else if (Either.isLeft(either)) {
const ngData = either.left
// Only the two sleep failures are logged here.
if (SleepNGBeforeError.isClass(ngData)) {
const message = ngData.getMessage(site)
Effect.runPromise(checkLog(message))
} else if (SleepNGAfterError.isClass(ngData)) {
const message = ngData.getMessage(site)
Effect.runPromise(checkLog(message))
}
}
// Finished without being aborted: stop listening for aborts.
removeEvent()
return either
})
/**
 * Builds the per-group worker used for the outer array.
 *
 * It first sleeps for the group's own index (derived from the first site), then runs
 * `mapper1_gen` over every site of the group with the given options and folds the individual
 * waits into one summary message.
 *
 * The returned Effect succeeds with an Either holding a message string on either side, or
 * with `Either.left(TopAlreadyAbortError)` when the controller is already aborted on entry.
 *
 * @param abortController - Controller shared with the per-site workers.
 * @param options - Passed to `Effect.all` for the inner array; `concurrency` is also used as
 *   the slot count when summing the waits.
 */
const mapper2_gen =
(abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
Effect.gen(function* () {
// Only the first site is used to derive the group index and the log prefix.
const site = sites[0]
if (abortController.signal.aborted) {
const topAlreadyAbortError = new TopAlreadyAbortError()
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
return Either.left(topAlreadyAbortError)
}
// Log when an abort arrives while this worker's listener is still registered.
const abortEvent = () => {
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
// do_sleep2 wraps the sleep Effect in Effect.try, so the first yield* only unwraps the outer layer.
const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
const either2 = yield* Effect.either(effect2)
// Left becomes a plain message string; Right becomes an Effect that still has to be run.
const map = Either.mapBoth(either2, {
onLeft: (ngData) => {
const topNgData = new TopSleepNGError(ngData)
const message = topNgData.getMessage(site)
Effect.runPromise(checkLog(message))
const topMessage = topNgData.getTopMessage(site)
return topMessage
},
onRight: (topOkData) =>
Effect.gen(function* () {
const message = TopOkData.getMessage(site)
Effect.runPromise(checkLog(message))
// Run every site of the group; options carries the inner concurrency.
const either1 = yield* Effect.all(
sites.map(mapper1_gen(abortController, topOkData.index)),
options
)
// Turn each result into the wait it represents.
const arrayEither1 = either1.map(mapNumber_fromData)
const concurrency = options['concurrency']
// Fold the waits into per-slot totals; the largest slot is added to the group's own wait.
const chunkArray = Array.chunksOf(arrayEither1, concurrency)
const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
const topMessage = topOkData.getTopMessage(Math.max(...waitNumber))
return topMessage
})
})
// A Right still wraps an Effect, so run it here and re-wrap its message.
const result = Either.isLeft(map) ? map : Either.right(yield* map.right)
removeEvent()
return result
})
// Scenario: all groups start at once; inside each group at most two sites run concurrently.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方5ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      // [
      // 'https://test/5/1',
      // 'https://test/5/2',
      // 'https://test/5/3',
      // 'https://test/5/4',
      // 'https://test/5/5'
      // ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]
    const abortController = new AbortController()
    Effect.runPromise(checkLog('[start]'))
    // Effect.all only describes the work; it is not a Promise, so it must not be awaited.
    const effect = Effect.all(sites.map(mapper2_gen(abortController, { concurrency: 2 })), {
      concurrency: Number.MAX_VALUE
    })
    // Passing the signal lets an abort raised by a worker interrupt the whole run.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })
    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (either) => either
      })
    )
  })
})
実行結果
time : 2025-02-26T13:51:20.945Z - text: [start]
time : 2025-02-26T13:51:27.969Z - text: mapper2-index = 7
time : 2025-02-26T13:51:28.986Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:51:28.991Z - text: https://test/7/3 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T13:51:28.996Z - text: mapper2-index = 7 - aborted - from event
time : 2025-02-26T13:51:28.997Z - text: https://test/7/2 - aborted - from event
中断しました: All fibers interrupted without errors.
実行結果(※中断データを除外した場合)
time : 2025-02-26T13:53:40.471Z - text: [start]
time : 2025-02-26T13:53:47.493Z - text: mapper2-index = 7
time : 2025-02-26T13:53:48.509Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:53:49.511Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
[
{ _id: 'Either', _tag: 'Right', right: 'https://test/7 - ok - 9000' }
]
配列が2つの場合(複雑)
全部同時実行からの全部同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Failure value that `sleep` rejects with immediately, without starting a timer.
 *
 * Instances are identified by their `_tag` string, not by `instanceof`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard matching any value whose `_tag` equals `SleepNGBeforeError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the failure log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Failure value that `sleep` rejects with once its timer has fired.
 *
 * `wait` keeps the delay that was waited. Instances are identified by their `_tag` string.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard matching any value whose `_tag` equals `SleepNGAfterError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the failure log line for `site`, including the waited delay. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Every failure shape the group-level sleep can surface. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the group-level sleep so it can be rendered either with the
 * `mapper2-index` prefix or with the group URL.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  ngData: AllNgData

  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard matching any value whose `_tag` equals `TopSleepNGError.tagName`.
   *
   * Compares against this class's own tag (it previously compared against the
   * `SleepNGAfterError` tag) and returns `false` for `null`/`undefined`.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return test != null && (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /** Message suffix for the wrapped failure; it starts with ` - ng - `. */
  getLastMessage() {
    if (this.ngData instanceof UnknownException) {
      return ' - ng - UnknownException'
    }
    // An empty site makes the wrapped message start directly with " - ng - ...".
    return this.ngData.getMessage('')
  }

  /** Log line prefixed with the group index taken from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Summary line prefixed with `site` minus its last two characters. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Marker value used by the per-site worker when the AbortController is already aborted.
 *
 * Instances are identified by their `_tag` string.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard matching any value whose `_tag` equals `AlreadyAbortError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is AlreadyAbortError {
    return test != null && (test as { _tag?: unknown })._tag === AlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line for `site`. */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}
/**
 * Marker value used by the per-group worker when the AbortController is already aborted.
 *
 * Instances are identified by their `_tag` string.
 */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard matching any value whose `_tag` equals `TopAlreadyAbortError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is TopAlreadyAbortError {
    return test != null && (test as { _tag?: unknown })._tag === TopAlreadyAbortError.tagName
  }

  /** Builds the "aborted" log line, prefixed with the group index taken from `site`. */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}
/**
 * Failure describing why a site was rejected before any sleep was started.
 *
 * Instances are identified by their `_tag` string.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName
  site: string
  reason: string

  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard matching any value whose `_tag` equals `OriginAbortError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is OriginAbortError {
    return test != null && (test as { _tag?: unknown })._tag === OriginAbortError.tagName
  }

  /** Builds the "abort" log line from the stored site and reason. */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Creates a validator for a site string.
   *
   * A site is rejected when its last character is not numeric, when the character before
   * the last one is not `/`, or when `topIndex + lastIndex` reaches 10. On rejection the
   * reason is logged, `abortAction` is called and a failed Effect is returned; otherwise
   * the Effect succeeds with the site.
   *
   * @param abortAction - Callback invoked once when the site is rejected.
   * @param topIndex - Optional group index; the sum check is skipped when it is falsy.
   */
  static siteCheck(abortAction: () => void, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      // getLastIndex returns the result of Number(...), which is never undefined, so only
      // NaN has to be handled as the "not parseable" case.
      let reason: string | undefined
      if (Number.isNaN(lastIndex)) {
        reason = `${lastIndex} is NaN`
      } else if (site[site.length - 2] !== '/') {
        reason = `lastIndex >= 10`
      } else if (topIndex && topIndex + lastIndex >= 10) {
        reason = `topIndex + lastIndex >= 10`
      }
      if (reason !== undefined) {
        const originAbortError = new OriginAbortError(site, reason)
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }
      return Effect.succeed(site)
    }
  }
}
/**
 * Success value of the group-level sleep: remembers the site, the waited delay and the
 * group index parsed from the site.
 */
class TopOkData {
  site: string
  wait: number
  index: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
    this.index = TopOkData.getTopIndex(s)
  }

  /** Type guard based on the prototype chain. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Reads the character three positions from the end of `site` and converts it with `Number`. */
  static getTopIndex(site: string) {
    return Number(site.slice(-3, -2))
  }

  /** Builds the log line announcing which group index `site` belongs to. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /** Builds the summary line: the site without its last two characters plus `wait + opWait`. */
  getTopMessage(opWait: number) {
    const groupUrl = this.site.slice(0, -2)
    const totalWait = this.wait + opWait
    return `${groupUrl} - ok - ${totalWait}`
  }
}
/** Success value produced by `sleep`; `wait` is the delay that was passed to the timer. */
class OkData {
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /** Type guard based on the prototype chain. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Converts the final character of `site` with `Number`. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}
/**
 * Simulated API call wrapped in `Effect.tryPromise`.
 *
 * - `wait` a multiple of 3 seconds: rejects at once with `SleepNGBeforeError`.
 * - otherwise a multiple of 2 seconds: rejects with `SleepNGAfterError` after `wait` ms.
 * - anything else: resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(
    () =>
      new Promise((resolve, reject) => {
        const seconds = wait / 1000
        if (seconds % 3 === 0) {
          reject(new SleepNGBeforeError())
          return
        }
        const settle =
          seconds % 2 === 0
            ? () => reject(new SleepNGAfterError(wait))
            : () => resolve(new OkData(wait))
        setTimeout(settle, wait)
      })
  )
/** Wraps a timestamped `console.log` of `text` in a synchronous Effect. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const now = new Date().toISOString()
    console.log(`time : ${now} - text: ${text}`)
  })

/** Options handed to the vitest `test` call; the timeout is 100 seconds expressed in ms. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Creates a reducer that folds wait times into per-slot running totals.
 *
 * While the slot array does not hold exactly `chunkSize` entries, each number is appended as
 * a new slot; once it does, each number is added to the slot with the smallest total.
 *
 * The accumulator `a` is copied rather than mutated, so a seed array owned by the caller is
 * left untouched.
 *
 * @param chunkSize - Slot count at which numbers start being added to existing slots.
 * @returns Reducer `(a, b)` that returns the updated slot totals as a new array.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const slots = [...a]
  for (const num of b) {
    if (slots.length === chunkSize) {
      // Add to the least loaded slot (first one on ties, as indexOf returns the first match).
      const minIndex = slots.indexOf(Math.min(...slots))
      slots[minIndex] += num
    } else {
      slots.push(num)
    }
  }
  return slots
}
/** Converts an index into a wait time by multiplying it by 1000. */
function getWaitNumber(index: number) {
  const MS_PER_INDEX = 1000
  return index * MS_PER_INDEX
}
/**
 * Unwraps the `UnknownException` produced by `Effect.tryPromise`.
 *
 * When the wrapped cause is one of the two sleep failures it becomes the error value itself;
 * any other cause is logged and the `UnknownException` is kept as the error.
 *
 * @param site - Not used by the body.
 * @param effectAndThen - Effect whose error channel is remapped.
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (unknownException) => {
    const cause = unknownException.error
    if (SleepNGBeforeError.isClass(cause)) {
      return cause
    }
    if (SleepNGAfterError.isClass(cause)) {
      return cause
    }
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${unknownException}`))
    return unknownException
  })
}
/**
 * Reduces one worker result to the wait it represents.
 *
 * On the Left side, `OkData` and `SleepNGAfterError` contribute their `wait` and everything
 * else contributes 0. On the Right side, only `OkData` contributes its `wait`.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const ng = data.left
    if (OkData.isClass(ng)) {
      return ng.wait
    }
    // A value tagged as SleepNGBeforeError never carries a wait, so only the "after" failure counts.
    if (!SleepNGBeforeError.isClass(ng) && SleepNGAfterError.isClass(ng)) {
      return ng.wait
    }
    return 0
  }
  const ok = data.right
  return OkData.isClass(ok) ? ok.wait : 0
}
/**
 * Builds the sleep Effect for a single site inside `Effect.try`.
 *
 * The result is an Effect that yields another Effect (the sleep with its errors remapped), so
 * the caller has to run the inner one separately.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Builds the group-level sleep Effect inside `Effect.try`.
 *
 * A successful sleep is converted into `TopOkData` for `site`; errors are remapped by
 * `mapError_sleepNg`. The result is an Effect that yields that inner Effect.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const sleptThenWrapped = Effect.andThen(sleep(wait), (okData) => {
      return new TopOkData(site, okData)
    })
    return mapError_sleepNg(site, sleptThenWrapped)
  })
/**
 * Builds the per-site worker used for the inner array.
 *
 * The returned Effect succeeds with an Either describing the outcome of the sleep for `site`.
 * When the controller is already aborted on entry it succeeds with
 * `Either.left(AlreadyAbortError)` without sleeping.
 *
 * @param abortController - Controller whose signal is checked on entry and which is aborted
 *   when the site check rejects the site.
 * @param topIndex - Index of the enclosing group, forwarded to `OriginAbortError.siteCheck`.
 */
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
Effect.gen(function* () {
// The controller is already aborted: log and report it as a Left value instead of sleeping.
if (abortController.signal.aborted) {
const alreadyAbortError = new AlreadyAbortError()
Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
return Either.left(alreadyAbortError)
}
// Log when an abort arrives while this worker's listener is still registered.
const abortEvent = () => {
Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
// Unregister first so this worker does not log its own abort, then abort the controller.
const abortAction = () => {
removeEvent()
abortController.abort()
}
// siteCheck returns a failed Effect for a rejected site; yield* then ends the generator here.
const abortCheck = yield* pipe(site, OriginAbortError.siteCheck(abortAction, topIndex))
// do_sleep1 wraps the sleep Effect in Effect.try, so this yield* only unwraps the outer layer.
const effect = yield* pipe(abortCheck, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
// Run the sleep and capture its failure as a Left instead of failing this generator.
const either = yield* Effect.either(effect)
if (Either.isRight(either)) {
const okData = either.right
const message = okData.getMessage(site)
Effect.runPromise(checkLog(message))
} else if (Either.isLeft(either)) {
const ngData = either.left
// Only the two sleep failures are logged here.
if (SleepNGBeforeError.isClass(ngData)) {
const message = ngData.getMessage(site)
Effect.runPromise(checkLog(message))
} else if (SleepNGAfterError.isClass(ngData)) {
const message = ngData.getMessage(site)
Effect.runPromise(checkLog(message))
}
}
// Finished without being aborted: stop listening for aborts.
removeEvent()
return either
})
/**
 * Builds the per-group worker used for the outer array.
 *
 * It first sleeps for the group's own index (derived from the first site), then runs
 * `mapper1_gen` over every site of the group with the given options and folds the individual
 * waits into one summary message.
 *
 * The returned Effect succeeds with an Either holding a message string on either side, or
 * with `Either.left(TopAlreadyAbortError)` when the controller is already aborted on entry.
 *
 * @param abortController - Controller shared with the per-site workers.
 * @param options - Passed to `Effect.all` for the inner array; `concurrency` is also used as
 *   the slot count when summing the waits.
 */
const mapper2_gen =
(abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
Effect.gen(function* () {
// Only the first site is used to derive the group index and the log prefix.
const site = sites[0]
if (abortController.signal.aborted) {
const topAlreadyAbortError = new TopAlreadyAbortError()
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
return Either.left(topAlreadyAbortError)
}
// Log when an abort arrives while this worker's listener is still registered.
const abortEvent = () => {
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
// do_sleep2 wraps the sleep Effect in Effect.try, so the first yield* only unwraps the outer layer.
const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
const either2 = yield* Effect.either(effect2)
// Left becomes a plain message string; Right becomes an Effect that still has to be run.
const map = Either.mapBoth(either2, {
onLeft: (ngData) => {
const topNgData = new TopSleepNGError(ngData)
const message = topNgData.getMessage(site)
Effect.runPromise(checkLog(message))
const topMessage = topNgData.getTopMessage(site)
return topMessage
},
onRight: (topOkData) =>
Effect.gen(function* () {
const message = TopOkData.getMessage(site)
Effect.runPromise(checkLog(message))
// Run every site of the group; options carries the inner concurrency.
const either1 = yield* Effect.all(
sites.map(mapper1_gen(abortController, topOkData.index)),
options
)
// Turn each result into the wait it represents.
const arrayEither1 = either1.map(mapNumber_fromData)
const concurrency = options['concurrency']
// Fold the waits into per-slot totals; the largest slot is added to the group's own wait.
const chunkArray = Array.chunksOf(arrayEither1, concurrency)
const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
const topMessage = topOkData.getTopMessage(Math.max(...waitNumber))
return topMessage
})
})
// A Right still wraps an Effect, so run it here and re-wrap its message.
const result = Either.isLeft(map) ? map : Either.right(yield* map.right)
removeEvent()
return result
})
// Scenario: all groups start at once and every site inside a group also runs at once.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方2ー全部同時実行からの全部同時実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]
    const abortController = new AbortController()
    Effect.runPromise(checkLog('[start]'))
    // Effect.all only describes the work; it is not a Promise, so it must not be awaited.
    const effect = Effect.all(
      sites.map(mapper2_gen(abortController, { concurrency: Number.MAX_VALUE })),
      {
        concurrency: Number.MAX_VALUE
      }
    )
    // Passing the signal lets an abort raised by a worker interrupt the whole run.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })
    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (either) => either
      })
    )
  })
})
実行結果
time : 2025-02-26T13:54:41.810Z - text: [start]
time : 2025-02-26T13:54:41.825Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T13:54:42.827Z - text: mapper2-index = 1
time : 2025-02-26T13:54:43.829Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:54:43.846Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T13:54:45.837Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:54:46.826Z - text: mapper2-index = 5
time : 2025-02-26T13:54:46.835Z - text: https://test/5/5 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T13:54:46.839Z - text: mapper2-index = 5 - aborted - from event
time : 2025-02-26T13:54:46.841Z - text: mapper2-index = 7 - aborted - from event
time : 2025-02-26T13:54:46.842Z - text: https://test/5/1 - aborted - from event
time : 2025-02-26T13:54:46.843Z - text: https://test/5/2 - aborted - from event
time : 2025-02-26T13:54:46.844Z - text: https://test/5/3 - aborted - from event
time : 2025-02-26T13:54:46.844Z - text: https://test/5/4 - aborted - from event
中断しました: All fibers interrupted without errors.
実行結果(※中断データを除外した場合)
time : 2025-02-26T13:56:17.360Z - text: [start]
time : 2025-02-26T13:56:17.375Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T13:56:18.380Z - text: mapper2-index = 1
time : 2025-02-26T13:56:19.379Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:56:19.396Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T13:56:21.381Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:56:22.379Z - text: mapper2-index = 5
time : 2025-02-26T13:56:22.383Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-26T13:56:23.386Z - text: https://test/5/1 - ok - 1000
time : 2025-02-26T13:56:24.381Z - text: mapper2-index = 7
time : 2025-02-26T13:56:24.388Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:56:25.398Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:56:26.397Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:56:26.404Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
[
{ _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/3 - ng - Sleep NG before wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
},
{ _id: 'Either', _tag: 'Right', right: 'https://test/5 - ok - 9000' },
{ _id: 'Either', _tag: 'Right', right: 'https://test/7 - ok - 9000' }
]
全部同時実行からの直列実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Failure value identified by the `_tag` string `Sleep.Ng.Before`, not by `instanceof`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard matching any value whose `_tag` equals `SleepNGBeforeError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the failure log line for `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Failure value identified by the `_tag` string `Sleep.Ng.After`; `wait` stores the number
 * given to the constructor and is echoed in the log message.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard matching any value whose `_tag` equals `SleepNGAfterError.tagName`.
   *
   * `null` and `undefined` yield `false` instead of throwing on the property access.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the failure log line for `site`, including the stored `wait`. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Every failure shape that `TopSleepNGError` can wrap. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure so it can be rendered either with the `mapper2-index` prefix or with the
 * site minus its last two characters.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  ngData: AllNgData

  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard matching any value whose `_tag` equals `TopSleepNGError.tagName`.
   *
   * Compares against this class's own tag (it previously compared against the
   * `SleepNGAfterError` tag) and returns `false` for `null`/`undefined`.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return test != null && (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /** Message suffix for the wrapped failure; it starts with ` - ng - `. */
  getLastMessage() {
    if (this.ngData instanceof UnknownException) {
      return ' - ng - UnknownException'
    }
    // An empty site makes the wrapped message start directly with " - ng - ...".
    return this.ngData.getMessage('')
  }

  /** Log line prefixed with the group index taken from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Summary line prefixed with `site` minus its last two characters. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Marker error returned for a site whose work is skipped because the shared
 * AbortController had already been aborted.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` field.
   * Optional chaining keeps the guard from throwing on `null`/`undefined`.
   *
   * @param test any value to inspect
   * @returns true when `test` carries this class's tag
   */
  static isClass(test: any): test is AlreadyAbortError {
    return test?._tag === AlreadyAbortError.tagName
  }

  /**
   * @param site label placed in front of the message
   * @returns the log line announcing that the site was aborted
   */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}
/**
 * Marker error returned by the top-level mapper when the shared
 * AbortController had already been aborted.
 */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` field.
   * Optional chaining keeps the guard from throwing on `null`/`undefined`.
   *
   * @param test any value to inspect
   * @returns true when `test` carries this class's tag
   */
  static isClass(test: any): test is TopAlreadyAbortError {
    return test?._tag === TopAlreadyAbortError.tagName
  }

  /**
   * @param site site string from which the top index is extracted
   * @returns the log line announcing that the group was aborted
   */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}
/**
 * Error produced by the site that triggers the abort. Records the offending
 * site and the reason the check rejected it.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName
  site: string
  reason: string

  /**
   * @param s the site that failed the check
   * @param r human-readable reason
   */
  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard based on the `_tag` field.
   * Optional chaining keeps the guard from throwing on `null`/`undefined`.
   *
   * @param test any value to inspect
   * @returns true when `test` carries this class's tag
   */
  static isClass(test: any): test is OriginAbortError {
    return test?._tag === OriginAbortError.tagName
  }

  /** @returns the log line for this abort */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Builds a validator for a site string.
   *
   * The validator succeeds with the site unchanged, or logs the reason,
   * invokes `abortAction`, and fails with an OriginAbortError.
   *
   * @param abortAction callback invoked (after logging) when the site is rejected
   * @param topIndex optional index of the enclosing group; when truthy, the
   *   site is rejected if `topIndex + lastIndex >= 10`
   * @returns a function from site to a succeeding or failing Effect
   */
  static siteCheck(abortAction: Function, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      let originAbortError = undefined
      // OkData.getLastIndex always yields a number (possibly NaN), so the
      // former `lastIndex === undefined` branch could never run and is omitted.
      if (Number.isNaN(lastIndex)) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is NaN`)
      } else if (site[site.length - 2] !== '/') {
        // The index is expected to be a single character directly after '/'.
        originAbortError = new OriginAbortError(site, `lastIndex >= 10`)
      } else if (topIndex && topIndex + lastIndex >= 10) {
        originAbortError = new OriginAbortError(site, `topIndex + lastIndex >= 10`)
      }
      if (originAbortError !== undefined) {
        // Log first, then abort, so the origin's line precedes the listeners' lines.
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }
      return Effect.succeed(site)
    }
  }
}
/**
 * Success value of the top-level sleep: the site, the time waited, and the
 * top index parsed out of the site string.
 */
class TopOkData {
  site: string
  wait: number
  index: number

  /**
   * @param s site string of the group's first entry
   * @param ok result of the sleep; only its `wait` is kept
   */
  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
    this.index = TopOkData.getTopIndex(s)
  }

  /** instanceof-based type guard. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /**
   * Reads the character three positions from the end of `site`
   * and converts it with Number().
   */
  static getTopIndex(site: string) {
    const digit = site.charAt(site.length - 3)
    return Number(digit)
  }

  /** @returns the progress-log line "mapper2-index = N" for `site` */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * @param opWait additional wait to add to this instance's own wait
   * @returns the result line: the site minus its last two characters,
   *   followed by the combined wait
   */
  getTopMessage(opWait: number) {
    const head = this.site.substring(0, this.site.length - 2)
    const total = this.wait + opWait
    return `${head} - ok - ${total}`
  }
}
/** Success value of `sleep`; remembers how long the call waited. */
class OkData {
  wait: number

  /** @param w the wait in milliseconds */
  constructor(w: number) {
    this.wait = w
  }

  /** @returns the log line for a successful wait at `site` */
  getMessage(site: string) {
    return `${site} - ok - ${this.wait}`
  }

  /** Converts the final character of `site` with Number(). */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** instanceof-based type guard. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }
}
/**
 * Simulated API call wrapped with Effect.tryPromise.
 *
 * With `wait` in milliseconds, the outcome depends on the whole seconds:
 * - multiple of 3: rejects at once with SleepNGBeforeError
 * - otherwise a multiple of 2: rejects with SleepNGAfterError after `wait` ms
 * - anything else: resolves with OkData after `wait` ms
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000
    return new Promise((resolve, reject) => {
      if (waitSeconds % 3 === 0) {
        reject(new SleepNGBeforeError())
        return
      }
      // Both remaining outcomes are delivered only once the timer fires.
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })
/** Effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const stamp = new Date().toISOString()
    console.log(`time : ${stamp} - text: ${text}`)
  })
/** Options passed to `test`; the timeout is 100 seconds expressed in milliseconds. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Reducer factory that spreads the numbers of `b` over at most `chunkSize`
 * slots held in `a`.
 *
 * While `a` does not hold exactly `chunkSize` entries a number is appended;
 * once it does, the number is added to the first smallest entry.
 * `a` is modified in place and returned.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  for (const value of b) {
    if (a.length !== chunkSize) {
      a.push(value)
      continue
    }
    const smallestAt = a.indexOf(Math.min(...a))
    a[smallestAt] += value
  }
  return a
}
/** Converts an index into a wait in milliseconds (index × 1000). */
function getWaitNumber(index: number) {
  return index * 1000
}
/**
 * Replaces the UnknownException error channel of `effectAndThen` with the
 * original sleep error when the exception wraps one.
 *
 * @param site not used by the body
 * @param effectAndThen effect whose failures are UnknownException
 * @returns an effect failing with SleepNGBeforeError, SleepNGAfterError, or
 *   the untouched UnknownException (which is logged first)
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const wrapped = err.error
    if (SleepNGBeforeError.isClass(wrapped) || SleepNGAfterError.isClass(wrapped)) {
      return wrapped
    }
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}
/**
 * Extracts the number of milliseconds represented by an Either result.
 *
 * Left side: OkData and SleepNGAfterError yield their `wait`;
 * SleepNGBeforeError and anything else yield 0.
 * Right side: OkData yields its `wait`; anything else yields 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const left = data.left
    if (OkData.isClass(left)) {
      return left.wait
    }
    if (SleepNGBeforeError.isClass(left)) {
      return 0
    }
    if (SleepNGAfterError.isClass(left)) {
      return left.wait
    }
    return 0
  }
  const right = data.right
  return OkData.isClass(right) ? right.wait : 0
}
/**
 * Curried builder for the per-site sleep.
 * The outer Effect.try yields, as its value, the sleep effect with its error
 * channel mapped by mapError_sleepNg; the caller runs that inner effect.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Curried builder for the top-level sleep.
 * Same shape as the per-site variant, except that a successful OkData is
 * converted into a TopOkData for `site` before the errors are mapped.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() =>
    mapError_sleepNg(
      site,
      Effect.andThen(sleep(wait), (okData) => new TopOkData(site, okData))
    )
  )
// Per-site worker factory.
// Given the shared AbortController and an optional top index, returns a function that maps one
// site string to an Effect. That Effect yields an Either: Right(OkData) on success, Left(error)
// when the sleep fails, or Left(AlreadyAbortError) when the controller was aborted beforehand.
// It fails outright (instead of yielding an Either) when OriginAbortError.siteCheck rejects the site.
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
Effect.gen(function* () {
// Skip the work entirely when another site has already aborted the controller.
if (abortController.signal.aborted) {
const alreadyAbortError = new AlreadyAbortError()
Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
return Either.left(alreadyAbortError)
}
// While this site is in flight, log if the controller gets aborted.
const abortEvent = () => {
Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
// Used when this site itself triggers the abort: its own listener is removed first,
// so the abort does not invoke this site's abortEvent.
const abortAction = () => {
removeEvent()
abortController.abort()
}
// siteCheck either passes the site through or calls abortAction and fails with OriginAbortError.
const abortCheck = yield* pipe(site, OriginAbortError.siteCheck(abortAction, topIndex))
// do_sleep1 yields the sleep effect as a value; it is executed by Effect.either below.
const effect = yield* pipe(abortCheck, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
const either = yield* Effect.either(effect)
if (Either.isRight(either)) {
const okData = either.right
const message = okData.getMessage(site)
Effect.runPromise(checkLog(message))
} else if (Either.isLeft(either)) {
// Only the two sleep errors are logged here; other Left values produce no log line in this branch.
const ngData = either.left
if (SleepNGBeforeError.isClass(ngData)) {
const message = ngData.getMessage(site)
Effect.runPromise(checkLog(message))
} else if (SleepNGAfterError.isClass(ngData)) {
const message = ngData.getMessage(site)
Effect.runPromise(checkLog(message))
}
}
// NOTE(review): reached only on the normal path; presumably skipped when the fiber is
// interrupted during the sleep, leaving the listener registered — confirm.
removeEvent()
return either
})
// Top-level worker factory.
// Given the shared AbortController and the concurrency options for the inner run, returns a
// function that maps one group of sites to an Effect yielding an Either:
// Left(TopAlreadyAbortError) when already aborted, Left(message string) when the top-level sleep
// fails, or Right(message string) once every site in the group has been processed.
const mapper2_gen =
(abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
Effect.gen(function* () {
// The first entry of the group supplies the top index and the message prefix.
const site = sites[0]
if (abortController.signal.aborted) {
const topAlreadyAbortError = new TopAlreadyAbortError()
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
return Either.left(topAlreadyAbortError)
}
// While this group is in flight, log if the controller gets aborted.
const abortEvent = () => {
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
// Top-level sleep: the wait is derived from the top index; do_sleep2 yields the effect as a value.
const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
const either2 = yield* Effect.either(effect2)
const map = Either.mapBoth(either2, {
// Failure: log the progress line and turn the Left into the group's result message.
onLeft: (ngData) => {
const topNgData = new TopSleepNGError(ngData)
const message = topNgData.getMessage(site)
Effect.runPromise(checkLog(message))
const topMessage = topNgData.getTopMessage(site)
return topMessage
},
// Success: the Right becomes an Effect (not yet run) that processes every site of the group.
onRight: (topOkData) =>
Effect.gen(function* () {
const message = TopOkData.getMessage(site)
Effect.runPromise(checkLog(message))
const either1 = yield* Effect.all(
sites.map(mapper1_gen(abortController, topOkData.index)),
options
)
// Convert each result to its wait, then fold the waits into `concurrency` slots with
// sumNumber; the largest slot is added to the top-level wait in the result message.
const arrayEither1 = either1.map(mapNumber_fromData)
const concurrency = options['concurrency']
const chunkArray = Array.chunksOf(arrayEither1, concurrency)
const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
const topMessage = topOkData.getTopMessage(Math.max(...waitNumber))
return topMessage
})
})
// A Left is returned as is; for a Right, the contained Effect is run here and its result re-wrapped.
const result = Either.isLeft(map) ? map : Either.right(yield* map.right)
// NOTE(review): reached only on the normal path; presumably skipped on interruption — confirm.
removeEvent()
return result
})
// Scenario: every group starts at the same time; the sites inside a group run one at a time.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方4ー全部同時実行からの直列実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]
    const abortController = new AbortController()
    Effect.runPromise(checkLog('[start]'))
    // Effect.all only builds the combined effect; it is not a Promise, so it is not awaited.
    // Outer concurrency is unbounded; the inner option makes each group process its sites serially.
    const effect = Effect.all(sites.map(mapper2_gen(abortController, { concurrency: 1 })), {
      concurrency: Number.MAX_VALUE
    })
    // The same controller's signal is handed to the runtime so an abort interrupts the whole run.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })
    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (either) => either
      })
    )
  })
})
実行結果
time : 2025-02-26T13:57:47.543Z - text: [start]
time : 2025-02-26T13:57:47.558Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T13:57:48.564Z - text: mapper2-index = 1
time : 2025-02-26T13:57:49.562Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:57:49.577Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T13:57:51.564Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:57:52.563Z - text: mapper2-index = 5
time : 2025-02-26T13:57:53.578Z - text: https://test/5/1 - ok - 1000
time : 2025-02-26T13:57:54.563Z - text: mapper2-index = 7
time : 2025-02-26T13:57:55.567Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T13:57:55.581Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:57:55.583Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-26T13:57:57.582Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:57:57.582Z - text: https://test/7/3 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T13:57:57.584Z - text: mapper2-index = 5 - aborted - from event
time : 2025-02-26T13:57:57.584Z - text: mapper2-index = 7 - aborted - from event
time : 2025-02-26T13:57:57.584Z - text: https://test/5/4 - aborted - from event
中断しました: All fibers interrupted without errors.
実行結果(※中断データを除外した場合)
time : 2025-02-26T13:59:52.548Z - text: [start]
time : 2025-02-26T13:59:52.563Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T13:59:53.572Z - text: mapper2-index = 1
time : 2025-02-26T13:59:54.562Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T13:59:54.582Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T13:59:56.572Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T13:59:57.577Z - text: mapper2-index = 5
time : 2025-02-26T13:59:58.596Z - text: https://test/5/1 - ok - 1000
time : 2025-02-26T13:59:59.563Z - text: mapper2-index = 7
time : 2025-02-26T14:00:00.580Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T14:00:00.602Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:00:00.604Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-26T14:00:02.594Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:00:04.609Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
[
{ _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/3 - ng - Sleep NG before wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
},
{
_id: 'Either',
_tag: 'Right',
right: 'https://test/5 - ok - 12000'
},
{
_id: 'Either',
_tag: 'Right',
right: 'https://test/7 - ok - 10000'
}
]
全部同時実行からの最大2個まで同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either, Exit, Cause } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Error value used when `sleep` rejects immediately, without waiting.
 * Instances are recognised through their `_tag` string.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard based on the `_tag` field.
   * Optional chaining keeps the guard from throwing when it is handed
   * `null`/`undefined` (e.g. a promise that rejected without a reason).
   *
   * @param test any value to inspect
   * @returns true when `test` carries this class's tag
   */
  static isClass(test: any): test is SleepNGBeforeError {
    return test?._tag === SleepNGBeforeError.tagName
  }

  /**
   * @param site label placed in front of the message
   * @returns the log line for this failure
   */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Error value used when `sleep` rejects after the wait has elapsed.
 * Keeps the wait (in milliseconds) so it can be reported and summed later.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  wait: number

  /** @param w the wait, in milliseconds, that preceded the failure */
  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard based on the `_tag` field.
   * Optional chaining keeps the guard from throwing on `null`/`undefined`.
   *
   * @param test any value to inspect
   * @returns true when `test` carries this class's tag
   */
  static isClass(test: any): test is SleepNGAfterError {
    return test?._tag === SleepNGAfterError.tagName
  }

  /**
   * @param site label placed in front of the message
   * @returns the log line for this failure, including the wait
   */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
// Union of the failure values that TopSleepNGError can wrap:
// either of the two sleep errors, or the UnknownException imported from 'effect/Cause'.
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException
/**
 * Failure of the top-level sleep. Wraps the underlying failure value and
 * formats it for the two kinds of output used by the top-level mapper.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  ngData: AllNgData

  /** @param ng the underlying failure being wrapped */
  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard based on the `_tag` field.
   * Compares against this class's own tag (the previous version compared
   * against SleepNGAfterError's tag and therefore never matched its own
   * instances) and tolerates `null`/`undefined`.
   *
   * @param test any value to inspect
   * @returns true when `test` carries this class's tag
   */
  static isClass(test: any): test is TopSleepNGError {
    return test?._tag === TopSleepNGError.tagName
  }

  /**
   * @returns the message suffix for the wrapped failure; a fixed text for
   * UnknownException, otherwise the wrapped error's own message built with
   * an empty site so that only the " - ng - ..." part remains
   */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('')
    return lastMessage
  }

  /**
   * @param site site string from which the top index is extracted
   * @returns the progress-log line, prefixed with "mapper2-index = N"
   */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /**
   * @param site site string; its last two characters are dropped
   * @returns the result line for the whole group
   */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Marker error returned for a site whose work is skipped because the shared
 * AbortController had already been aborted.
 */
class AlreadyAbortError {
  static tagName = 'Already.Abort.Error'
  readonly _tag = AlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` field.
   * Optional chaining keeps the guard from throwing on `null`/`undefined`.
   *
   * @param test any value to inspect
   * @returns true when `test` carries this class's tag
   */
  static isClass(test: any): test is AlreadyAbortError {
    return test?._tag === AlreadyAbortError.tagName
  }

  /**
   * @param site label placed in front of the message
   * @returns the log line announcing that the site was aborted
   */
  static getMessage(site: string) {
    return `${site} - aborted - from event`
  }
}
/**
 * Marker error returned by the top-level mapper when the shared
 * AbortController had already been aborted.
 */
class TopAlreadyAbortError {
  static tagName = 'Top.Already.Abort.Error'
  readonly _tag = TopAlreadyAbortError.tagName

  /**
   * Type guard based on the `_tag` field.
   * Optional chaining keeps the guard from throwing on `null`/`undefined`.
   *
   * @param test any value to inspect
   * @returns true when `test` carries this class's tag
   */
  static isClass(test: any): test is TopAlreadyAbortError {
    return test?._tag === TopAlreadyAbortError.tagName
  }

  /**
   * @param site site string from which the top index is extracted
   * @returns the log line announcing that the group was aborted
   */
  static getMessage(site: string) {
    const topIndex = TopOkData.getTopIndex(site)
    return `mapper2-index = ${topIndex} - aborted - from event`
  }
}
/**
 * Error produced by the site that triggers the abort. Records the offending
 * site and the reason the check rejected it.
 */
class OriginAbortError {
  static tagName = 'Origin.Abort.Error'
  readonly _tag = OriginAbortError.tagName
  site: string
  reason: string

  /**
   * @param s the site that failed the check
   * @param r human-readable reason
   */
  constructor(s: string, r: string) {
    this.site = s
    this.reason = r
  }

  /**
   * Type guard based on the `_tag` field.
   * Optional chaining keeps the guard from throwing on `null`/`undefined`.
   *
   * @param test any value to inspect
   * @returns true when `test` carries this class's tag
   */
  static isClass(test: any): test is OriginAbortError {
    return test?._tag === OriginAbortError.tagName
  }

  /** @returns the log line for this abort */
  getMessage() {
    return `${this.site} - abort - ${this.reason}`
  }

  /**
   * Builds a validator for a site string.
   *
   * The validator succeeds with the site unchanged, or logs the reason,
   * invokes `abortAction`, and fails with an OriginAbortError.
   *
   * @param abortAction callback invoked (after logging) when the site is rejected
   * @param topIndex optional index of the enclosing group; when truthy, the
   *   site is rejected if `topIndex + lastIndex >= 10`
   * @returns a function from site to a succeeding or failing Effect
   */
  static siteCheck(abortAction: Function, topIndex?: number) {
    return (site: string) => {
      const lastIndex = OkData.getLastIndex(site)
      let originAbortError = undefined
      // OkData.getLastIndex always yields a number (possibly NaN), so the
      // former `lastIndex === undefined` branch could never run and is omitted.
      if (Number.isNaN(lastIndex)) {
        originAbortError = new OriginAbortError(site, `${lastIndex} is NaN`)
      } else if (site[site.length - 2] !== '/') {
        // The index is expected to be a single character directly after '/'.
        originAbortError = new OriginAbortError(site, `lastIndex >= 10`)
      } else if (topIndex && topIndex + lastIndex >= 10) {
        originAbortError = new OriginAbortError(site, `topIndex + lastIndex >= 10`)
      }
      if (originAbortError !== undefined) {
        // Log first, then abort, so the origin's line precedes the listeners' lines.
        Effect.runPromise(checkLog(originAbortError.getMessage()))
        abortAction()
        return Effect.fail(originAbortError)
      }
      return Effect.succeed(site)
    }
  }
}
/**
 * Success value of the top-level sleep: the site, the time waited, and the
 * top index parsed out of the site string.
 */
class TopOkData {
  site: string
  wait: number
  index: number

  /**
   * @param s site string of the group's first entry
   * @param ok result of the sleep; only its `wait` is kept
   */
  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
    this.index = TopOkData.getTopIndex(s)
  }

  /** instanceof-based type guard. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /**
   * Reads the character three positions from the end of `site`
   * and converts it with Number().
   */
  static getTopIndex(site: string) {
    const digit = site.charAt(site.length - 3)
    return Number(digit)
  }

  /** @returns the progress-log line "mapper2-index = N" for `site` */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * @param opWait additional wait to add to this instance's own wait
   * @returns the result line: the site minus its last two characters,
   *   followed by the combined wait
   */
  getTopMessage(opWait: number) {
    const head = this.site.substring(0, this.site.length - 2)
    const total = this.wait + opWait
    return `${head} - ok - ${total}`
  }
}
/** Success value of `sleep`; remembers how long the call waited. */
class OkData {
  wait: number

  /** @param w the wait in milliseconds */
  constructor(w: number) {
    this.wait = w
  }

  /** @returns the log line for a successful wait at `site` */
  getMessage(site: string) {
    return `${site} - ok - ${this.wait}`
  }

  /** Converts the final character of `site` with Number(). */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** instanceof-based type guard. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }
}
/**
 * Simulated API call wrapped with Effect.tryPromise.
 *
 * With `wait` in milliseconds, the outcome depends on the whole seconds:
 * - multiple of 3: rejects at once with SleepNGBeforeError
 * - otherwise a multiple of 2: rejects with SleepNGAfterError after `wait` ms
 * - anything else: resolves with OkData after `wait` ms
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000
    return new Promise((resolve, reject) => {
      if (waitSeconds % 3 === 0) {
        reject(new SleepNGBeforeError())
        return
      }
      // Both remaining outcomes are delivered only once the timer fires.
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })
/** Effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const stamp = new Date().toISOString()
    console.log(`time : ${stamp} - text: ${text}`)
  })
/** Options passed to `test`; the timeout is 100 seconds expressed in milliseconds. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Reducer factory that spreads the numbers of `b` over at most `chunkSize`
 * slots held in `a`.
 *
 * While `a` does not hold exactly `chunkSize` entries a number is appended;
 * once it does, the number is added to the first smallest entry.
 * `a` is modified in place and returned.
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  for (const value of b) {
    if (a.length !== chunkSize) {
      a.push(value)
      continue
    }
    const smallestAt = a.indexOf(Math.min(...a))
    a[smallestAt] += value
  }
  return a
}
/** Converts an index into a wait in milliseconds (index × 1000). */
function getWaitNumber(index: number) {
  return index * 1000
}
/**
 * Replaces the UnknownException error channel of `effectAndThen` with the
 * original sleep error when the exception wraps one.
 *
 * @param site not used by the body
 * @param effectAndThen effect whose failures are UnknownException
 * @returns an effect failing with SleepNGBeforeError, SleepNGAfterError, or
 *   the untouched UnknownException (which is logged first)
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const wrapped = err.error
    if (SleepNGBeforeError.isClass(wrapped) || SleepNGAfterError.isClass(wrapped)) {
      return wrapped
    }
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}
/**
 * Extracts the number of milliseconds represented by an Either result.
 *
 * Left side: OkData and SleepNGAfterError yield their `wait`;
 * SleepNGBeforeError and anything else yield 0.
 * Right side: OkData yields its `wait`; anything else yields 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isLeft(data)) {
    const left = data.left
    if (OkData.isClass(left)) {
      return left.wait
    }
    if (SleepNGBeforeError.isClass(left)) {
      return 0
    }
    if (SleepNGAfterError.isClass(left)) {
      return left.wait
    }
    return 0
  }
  const right = data.right
  return OkData.isClass(right) ? right.wait : 0
}
/**
 * Curried builder for the per-site sleep.
 * The outer Effect.try yields, as its value, the sleep effect with its error
 * channel mapped by mapError_sleepNg; the caller runs that inner effect.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Curried builder for the top-level sleep.
 * Same shape as the per-site variant, except that a successful OkData is
 * converted into a TopOkData for `site` before the errors are mapped.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() =>
    mapError_sleepNg(
      site,
      Effect.andThen(sleep(wait), (okData) => new TopOkData(site, okData))
    )
  )
// Per-site worker factory.
// Given the shared AbortController and an optional top index, returns a function that maps one
// site string to an Effect. That Effect yields an Either: Right(OkData) on success, Left(error)
// when the sleep fails, or Left(AlreadyAbortError) when the controller was aborted beforehand.
// It fails outright (instead of yielding an Either) when OriginAbortError.siteCheck rejects the site.
const mapper1_gen = (abortController: AbortController, topIndex?: number) => (site: string) =>
Effect.gen(function* () {
// Skip the work entirely when another site has already aborted the controller.
if (abortController.signal.aborted) {
const alreadyAbortError = new AlreadyAbortError()
Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
return Either.left(alreadyAbortError)
}
// While this site is in flight, log if the controller gets aborted.
const abortEvent = () => {
Effect.runPromise(checkLog(AlreadyAbortError.getMessage(site)))
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
// Used when this site itself triggers the abort: its own listener is removed first,
// so the abort does not invoke this site's abortEvent.
const abortAction = () => {
removeEvent()
abortController.abort()
}
// siteCheck either passes the site through or calls abortAction and fails with OriginAbortError.
const abortCheck = yield* pipe(site, OriginAbortError.siteCheck(abortAction, topIndex))
// do_sleep1 yields the sleep effect as a value; it is executed by Effect.either below.
const effect = yield* pipe(abortCheck, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
const either = yield* Effect.either(effect)
if (Either.isRight(either)) {
const okData = either.right
const message = okData.getMessage(site)
Effect.runPromise(checkLog(message))
} else if (Either.isLeft(either)) {
// Only the two sleep errors are logged here; other Left values produce no log line in this branch.
const ngData = either.left
if (SleepNGBeforeError.isClass(ngData)) {
const message = ngData.getMessage(site)
Effect.runPromise(checkLog(message))
} else if (SleepNGAfterError.isClass(ngData)) {
const message = ngData.getMessage(site)
Effect.runPromise(checkLog(message))
}
}
// NOTE(review): reached only on the normal path; presumably skipped when the fiber is
// interrupted during the sleep, leaving the listener registered — confirm.
removeEvent()
return either
})
// Top-level worker factory.
// Given the shared AbortController and the concurrency options for the inner run, returns a
// function that maps one group of sites to an Effect yielding an Either:
// Left(TopAlreadyAbortError) when already aborted, Left(message string) when the top-level sleep
// fails, or Right(message string) once every site in the group has been processed.
const mapper2_gen =
(abortController: AbortController, options: { concurrency: number }) => (sites: string[]) =>
Effect.gen(function* () {
// The first entry of the group supplies the top index and the message prefix.
const site = sites[0]
if (abortController.signal.aborted) {
const topAlreadyAbortError = new TopAlreadyAbortError()
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
return Either.left(topAlreadyAbortError)
}
// While this group is in flight, log if the controller gets aborted.
const abortEvent = () => {
Effect.runPromise(checkLog(TopAlreadyAbortError.getMessage(site)))
}
abortController.signal.addEventListener('abort', abortEvent)
function removeEvent() {
abortController.signal.removeEventListener('abort', abortEvent)
}
// Top-level sleep: the wait is derived from the top index; do_sleep2 yields the effect as a value.
const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
const either2 = yield* Effect.either(effect2)
const map = Either.mapBoth(either2, {
// Failure: log the progress line and turn the Left into the group's result message.
onLeft: (ngData) => {
const topNgData = new TopSleepNGError(ngData)
const message = topNgData.getMessage(site)
Effect.runPromise(checkLog(message))
const topMessage = topNgData.getTopMessage(site)
return topMessage
},
// Success: the Right becomes an Effect (not yet run) that processes every site of the group.
onRight: (topOkData) =>
Effect.gen(function* () {
const message = TopOkData.getMessage(site)
Effect.runPromise(checkLog(message))
const either1 = yield* Effect.all(
sites.map(mapper1_gen(abortController, topOkData.index)),
options
)
// Convert each result to its wait, then fold the waits into `concurrency` slots with
// sumNumber; the largest slot is added to the top-level wait in the result message.
const arrayEither1 = either1.map(mapNumber_fromData)
const concurrency = options['concurrency']
const chunkArray = Array.chunksOf(arrayEither1, concurrency)
const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
const topMessage = topOkData.getTopMessage(Math.max(...waitNumber))
return topMessage
})
})
// A Left is returned as is; for a Right, the contained Effect is run here and its result re-wrapped.
const result = Either.isLeft(map) ? map : Either.right(yield* map.right)
// NOTE(review): reached only on the normal path; presumably skipped on interruption — confirm.
removeEvent()
return result
})
// Scenario: every group starts at the same time; inside a group at most two sites run at once.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方6ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ],
      [
        'https://test/7/1',
        'https://test/7/2',
        'https://test/7/3',
        'https://test/7/4',
        'https://test/7/5',
        'https://test/7/6',
        'https://test/7/7'
      ]
    ]
    const abortController = new AbortController()
    Effect.runPromise(checkLog('[start]'))
    // Effect.all only builds the combined effect; it is not a Promise, so it is not awaited.
    // Outer concurrency is unbounded; the inner option caps each group at two sites in flight.
    const effect = Effect.all(sites.map(mapper2_gen(abortController, { concurrency: 2 })), {
      concurrency: Number.MAX_VALUE
    })
    // The same controller's signal is handed to the runtime so an abort interrupts the whole run.
    const exit = await Effect.runPromiseExit(effect, { signal: abortController.signal })
    console.log(
      Exit.match(exit, {
        onFailure: (cause) => `中断しました: ${Cause.pretty(cause)}`,
        onSuccess: (either) => either
      })
    )
  })
})
実行結果
time : 2025-02-26T14:01:36.468Z - text: [start]
time : 2025-02-26T14:01:36.482Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T14:01:37.484Z - text: mapper2-index = 1
time : 2025-02-26T14:01:38.485Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:01:38.499Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T14:01:40.484Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T14:01:41.495Z - text: mapper2-index = 5
time : 2025-02-26T14:01:42.501Z - text: https://test/5/1 - ok - 1000
time : 2025-02-26T14:01:42.504Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-26T14:01:43.495Z - text: mapper2-index = 7
time : 2025-02-26T14:01:43.503Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:01:43.503Z - text: https://test/5/5 - abort - topIndex + lastIndex >= 10
time : 2025-02-26T14:01:43.505Z - text: mapper2-index = 5 - aborted - from event
time : 2025-02-26T14:01:43.506Z - text: mapper2-index = 7 - aborted - from event
time : 2025-02-26T14:01:43.506Z - text: https://test/5/4 - aborted - from event
time : 2025-02-26T14:01:43.507Z - text: https://test/7/1 - aborted - from event
time : 2025-02-26T14:01:43.507Z - text: https://test/7/2 - aborted - from event
中断しました: All fibers interrupted without errors.
実行結果(※中断データを除外した場合)
time : 2025-02-26T14:03:02.231Z - text: [start]
time : 2025-02-26T14:03:02.251Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-26T14:03:03.249Z - text: mapper2-index = 1
time : 2025-02-26T14:03:04.262Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:03:04.265Z - text: https://test/1/1 - ok - 1000
time : 2025-02-26T14:03:06.265Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-26T14:03:07.253Z - text: mapper2-index = 5
time : 2025-02-26T14:03:08.260Z - text: https://test/5/1 - ok - 1000
time : 2025-02-26T14:03:08.265Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-26T14:03:09.251Z - text: mapper2-index = 7
time : 2025-02-26T14:03:09.269Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:03:10.255Z - text: https://test/7/1 - ok - 1000
time : 2025-02-26T14:03:11.264Z - text: https://test/7/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-26T14:03:12.275Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
[
{ _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/3 - ng - Sleep NG before wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
},
{
_id: 'Either',
_tag: 'Right',
right: 'https://test/5 - ok - 10000'
},
{ _id: 'Either', _tag: 'Right', right: 'https://test/7 - ok - 9000' }
]