配列の配列の非同期処理で対象API(以下ではsleep)がrejectする場合のメモ
書き方は、まだまだどちらがよいかわからないところがある
前回
配列が1つの場合
全部同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Either } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Error produced when the simulated API rejects immediately, without waiting.
 * Instances are identified by the `_tag` discriminator rather than `instanceof`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard keyed on `_tag`.
   * Safe for any input: `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for this failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Error produced when the simulated API rejects after its wait has elapsed.
 * Instances are identified by the `_tag` discriminator rather than `instanceof`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  /** Wait value handed to the constructor; echoed in the log message. */
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard keyed on `_tag`.
   * Safe for any input: `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for this failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Successful result of the simulated API call, carrying the wait that was used. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard: true only for real OkData instances. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the final character of `site` and converts it to a number. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}
/**
 * Simulated API call. With s = wait / 1000:
 *  - s divisible by 3: rejects immediately with SleepNGBeforeError
 *  - otherwise s even: rejects with SleepNGAfterError after `wait` ms
 *  - otherwise: resolves with OkData after `wait` ms
 * Effect.tryPromise is called without a `catch`, so rejections surface wrapped
 * in UnknownException.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000
    return new Promise<OkData>((resolve, reject) => {
      if (waitSeconds % 3 === 0) {
        reject(new SleepNGBeforeError())
        return
      }
      // Decide the outcome up front; the timer only delays it.
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })
/** Builds an effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const now = new Date().toISOString()
    console.log(`time : ${now} - text: ${text}`)
  })
/** Options object passed to each `test` call; sets the timeout to 1000 * 100. */
const testOptions = {
  timeout: 100 * 1000
}
/** Converts an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  const perIndex = 1000
  return index * perIndex
}
/**
 * Unwraps the domain errors that Effect.tryPromise wrapped in UnknownException.
 *
 * If the wrapped cause is a SleepNGBeforeError or SleepNGAfterError it becomes
 * the failure directly; anything else stays an UnknownException and is logged.
 * The log runs through Effect.tapError, inside the same fiber, rather than via
 * a detached Effect.runPromise from within the pure mapping callback.
 *
 * @param site          kept for signature compatibility; not used here
 * @param effectAndThen effect whose failure is an UnknownException
 * @returns the same effect failing with SleepNGBeforeError | SleepNGAfterError | UnknownException
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  const unwrapped = Effect.mapError(effectAndThen, (err) => {
    const cause = err.error
    if (SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)) {
      return cause
    }
    return err
  })
  // Only the unrecognised case is logged; tapError leaves the failure unchanged.
  return Effect.tapError(unwrapped, (err) =>
    err instanceof UnknownException
      ? checkLog(`mapError_sleepNg : UnknownException : ${err}`)
      : Effect.void
  )
}
/**
 * Curried builder: given a site and then a wait, returns an Effect.try whose
 * success value is itself the sleep effect (with errors unwrapped by
 * mapError_sleepNg). Callers therefore have to run two layers.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Runs the simulated API call for one site and logs its outcome.
 *
 * The wait is derived from the site's last character (index * 1000). A sleep
 * failure is captured as an Either instead of failing the effect. Log lines are
 * yielded inside the generator so they run in this fiber, in order, rather than
 * being launched as detached Effect.runPromise calls.
 *
 * @param site URL whose last character is the numeric index
 * @returns effect yielding Either: Right = OkData, Left = the sleep failure
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    // do_sleep1 yields an effect, so it is unwrapped first and then run via Effect.either.
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left
      // An UnknownException is not logged here.
      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }
    return either
  })
// Single array of sites, all processed concurrently.
describe('effect-ts 非同期処理1つの配列', () => {
  test('使い方1ー全部同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5'
    ]
    // Await the start log so no promise is left floating.
    await Effect.runPromise(checkLog('[start]'))
    // Effect.all only describes the work (it is not a promise); runPromise executes it.
    const program = Effect.all(sites.map(mapper1_gen), {
      concurrency: sites.length
    })
    const eithers = await Effect.runPromise(program)
    console.log(eithers)
  })
})
実行結果
time : 2025-02-24T11:24:36.510Z - text: [start]
time : 2025-02-24T11:24:36.523Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-02-24T11:24:37.526Z - text: https://test/1 - ok - 1000
time : 2025-02-24T11:24:38.538Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:24:40.525Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:24:41.525Z - text: https://test/5 - ok - 5000
[
{ _id: 'Either', _tag: 'Right', right: OkData { wait: 1000 } },
{
_id: 'Either',
_tag: 'Left',
left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 2000 }
},
{
_id: 'Either',
_tag: 'Left',
left: SleepNGBeforeError { _tag: 'Sleep.Ng.Before' }
},
{
_id: 'Either',
_tag: 'Left',
left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 4000 }
},
{ _id: 'Either', _tag: 'Right', right: OkData { wait: 5000 } }
]
最大2個まで同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Either } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Error produced when the simulated API rejects immediately, without waiting.
 * Instances are identified by the `_tag` discriminator rather than `instanceof`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard keyed on `_tag`.
   * Safe for any input: `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for this failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Error produced when the simulated API rejects after its wait has elapsed.
 * Instances are identified by the `_tag` discriminator rather than `instanceof`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  /** Wait value handed to the constructor; echoed in the log message. */
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard keyed on `_tag`.
   * Safe for any input: `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for this failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Successful result of the simulated API call, carrying the wait that was used. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard: true only for real OkData instances. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the final character of `site` and converts it to a number. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}
/**
 * Simulated API call. With s = wait / 1000:
 *  - s divisible by 3: rejects immediately with SleepNGBeforeError
 *  - otherwise s even: rejects with SleepNGAfterError after `wait` ms
 *  - otherwise: resolves with OkData after `wait` ms
 * Effect.tryPromise is called without a `catch`, so rejections surface wrapped
 * in UnknownException.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000
    return new Promise<OkData>((resolve, reject) => {
      if (waitSeconds % 3 === 0) {
        reject(new SleepNGBeforeError())
        return
      }
      // Decide the outcome up front; the timer only delays it.
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })
/** Builds an effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const now = new Date().toISOString()
    console.log(`time : ${now} - text: ${text}`)
  })
/** Options object passed to each `test` call; sets the timeout to 1000 * 100. */
const testOptions = {
  timeout: 100 * 1000
}
/** Converts an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  const perIndex = 1000
  return index * perIndex
}
/**
 * Unwraps the domain errors that Effect.tryPromise wrapped in UnknownException.
 *
 * If the wrapped cause is a SleepNGBeforeError or SleepNGAfterError it becomes
 * the failure directly; anything else stays an UnknownException and is logged.
 * The log runs through Effect.tapError, inside the same fiber, rather than via
 * a detached Effect.runPromise from within the pure mapping callback.
 *
 * @param site          kept for signature compatibility; not used here
 * @param effectAndThen effect whose failure is an UnknownException
 * @returns the same effect failing with SleepNGBeforeError | SleepNGAfterError | UnknownException
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  const unwrapped = Effect.mapError(effectAndThen, (err) => {
    const cause = err.error
    if (SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)) {
      return cause
    }
    return err
  })
  // Only the unrecognised case is logged; tapError leaves the failure unchanged.
  return Effect.tapError(unwrapped, (err) =>
    err instanceof UnknownException
      ? checkLog(`mapError_sleepNg : UnknownException : ${err}`)
      : Effect.void
  )
}
/**
 * Curried builder: given a site and then a wait, returns an Effect.try whose
 * success value is itself the sleep effect (with errors unwrapped by
 * mapError_sleepNg). Callers therefore have to run two layers.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Runs the simulated API call for one site and logs its outcome.
 *
 * The wait is derived from the site's last character (index * 1000). A sleep
 * failure is captured as an Either instead of failing the effect. Log lines are
 * yielded inside the generator so they run in this fiber, in order, rather than
 * being launched as detached Effect.runPromise calls.
 *
 * @param site URL whose last character is the numeric index
 * @returns effect yielding Either: Right = OkData, Left = the sleep failure
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    // do_sleep1 yields an effect, so it is unwrapped first and then run via Effect.either.
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left
      // An UnknownException is not logged here.
      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }
    return either
  })
// Single array of sites, at most two processed at a time.
describe('effect-ts 非同期処理1つの配列', () => {
  test('使い方2ー最大2個まで同時実行', testOptions, async () => {
    const sites = [
      'https://test/1',
      'https://test/2',
      'https://test/3',
      'https://test/4',
      'https://test/5'
    ]
    // Await the start log so no promise is left floating.
    await Effect.runPromise(checkLog('[start]'))
    // Effect.all only describes the work (it is not a promise); runPromise executes it.
    const program = Effect.all(sites.map(mapper1_gen), {
      concurrency: 2
    })
    const eithers = await Effect.runPromise(program)
    console.log(eithers)
  })
})
実行結果
time : 2025-02-24T11:30:04.377Z - text: [start]
time : 2025-02-24T11:30:05.391Z - text: https://test/1 - ok - 1000
time : 2025-02-24T11:30:05.400Z - text: https://test/3 - ng - Sleep NG before wait
time : 2025-02-24T11:30:06.401Z - text: https://test/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:30:09.406Z - text: https://test/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:30:11.407Z - text: https://test/5 - ok - 5000
[
{ _id: 'Either', _tag: 'Right', right: OkData { wait: 1000 } },
{
_id: 'Either',
_tag: 'Left',
left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 2000 }
},
{
_id: 'Either',
_tag: 'Left',
left: SleepNGBeforeError { _tag: 'Sleep.Ng.Before' }
},
{
_id: 'Either',
_tag: 'Left',
left: SleepNGAfterError { _tag: 'Sleep.Ng.After', wait: 4000 }
},
{ _id: 'Either', _tag: 'Right', right: OkData { wait: 5000 } }
]
配列が2つの場合(シンプル)
全部同時実行からの全部同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Error produced when the simulated API rejects immediately, without waiting.
 * Instances are identified by the `_tag` discriminator rather than `instanceof`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard keyed on `_tag`.
   * Safe for any input: `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for this failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Error produced when the simulated API rejects after its wait has elapsed.
 * Instances are identified by the `_tag` discriminator rather than `instanceof`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  /** Wait value handed to the constructor; echoed in the log message. */
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard keyed on `_tag`.
   * Safe for any input: `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for this failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Every error shape a TopSleepNGError can wrap. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the group-level (first-stage) sleep and formats the
 * messages reported for the whole group.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  /** The underlying failure being wrapped. */
  ngData: AllNgData

  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard keyed on this class's own `_tag`.
   * The earlier version compared against SleepNGAfterError's tag, so it never
   * matched a TopSleepNGError. Null and undefined yield `false`.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return test != null && (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /** Message suffix describing the wrapped failure (built with an empty site). */
  getLastMessage() {
    if (this.ngData instanceof UnknownException) {
      return ' - ng - UnknownException'
    }
    return this.ngData.getMessage('')
  }

  /** Log line naming the group index parsed from `site`, followed by the failure suffix. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result message: `site` with its last two characters removed, followed by the failure suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Successful result of the group-level (first-stage) sleep: remembers the
 * site it was created for and the wait taken from the OkData.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
  }

  /** Type guard: true only for real TopOkData instances. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Converts the third-from-last character of `site` to a number. */
  static getTopIndex(site: string) {
    const digit = site.slice(-3, -2)
    return Number(digit)
  }

  /** Log line naming the group index parsed from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /** Result message: the stored site minus its last two characters, plus the summed wait. */
  getTopMessage(opWait: number) {
    const topSite = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${topSite} - ok - ${total}`
  }
}
/** Successful result of the simulated API call, carrying the wait that was used. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard: true only for real OkData instances. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the final character of `site` and converts it to a number. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}
/**
 * Simulated API call. With s = wait / 1000:
 *  - s divisible by 3: rejects immediately with SleepNGBeforeError
 *  - otherwise s even: rejects with SleepNGAfterError after `wait` ms
 *  - otherwise: resolves with OkData after `wait` ms
 * Effect.tryPromise is called without a `catch`, so rejections surface wrapped
 * in UnknownException.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000
    return new Promise<OkData>((resolve, reject) => {
      if (waitSeconds % 3 === 0) {
        reject(new SleepNGBeforeError())
        return
      }
      // Decide the outcome up front; the timer only delays it.
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })
/** Builds an effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const now = new Date().toISOString()
    console.log(`time : ${now} - text: ${text}`)
  })
/** Options object passed to each `test` call; sets the timeout to 1000 * 100. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Reducer factory used to estimate the total wait when at most `chunkSize`
 * items run at once.
 *
 * The accumulator holds one running total per slot. While it has not reached
 * `chunkSize` entries, each incoming number opens a new slot; once it has, the
 * number is added to the slot with the smallest total.
 *
 * The accumulator passed in is copied, so the caller's array is never mutated.
 *
 * @param chunkSize number of slots
 * @returns reducer `(a, b)` giving the slot totals after adding every number in `b` to `a`
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const slots = [...a]
  for (const num of b) {
    if (slots.length === chunkSize) {
      // All slots in use: add to the one with the smallest total.
      const minIndex = slots.indexOf(Math.min(...slots))
      slots[minIndex] += num
    } else {
      slots.push(num)
    }
  }
  return slots
}
/** Converts an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  const perIndex = 1000
  return index * perIndex
}
/**
 * Unwraps the domain errors that Effect.tryPromise wrapped in UnknownException.
 *
 * If the wrapped cause is a SleepNGBeforeError or SleepNGAfterError it becomes
 * the failure directly; anything else stays an UnknownException and is logged.
 * The log runs through Effect.tapError, inside the same fiber, rather than via
 * a detached Effect.runPromise from within the pure mapping callback.
 *
 * @param site          kept for signature compatibility; not used here
 * @param effectAndThen effect whose failure is an UnknownException
 * @returns the same effect failing with SleepNGBeforeError | SleepNGAfterError | UnknownException
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  const unwrapped = Effect.mapError(effectAndThen, (err) => {
    const cause = err.error
    if (SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)) {
      return cause
    }
    return err
  })
  // Only the unrecognised case is logged; tapError leaves the failure unchanged.
  return Effect.tapError(unwrapped, (err) =>
    err instanceof UnknownException
      ? checkLog(`mapError_sleepNg : UnknownException : ${err}`)
      : Effect.void
  )
}
/**
 * Collapses an Either into a single number (the wait it represents).
 *
 * Left side: OkData or SleepNGAfterError give their `wait`; SleepNGBeforeError
 * and anything else give 0. Right side: OkData gives its `wait`, anything else 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  const waitOfLeft = (left: U): number => {
    if (OkData.isClass(left)) return left.wait
    if (SleepNGBeforeError.isClass(left)) return 0
    return SleepNGAfterError.isClass(left) ? left.wait : 0
  }
  const waitOfRight = (right: T): number => (OkData.isClass(right) ? right.wait : 0)

  const mapped = Either.mapBoth(data, { onLeft: waitOfLeft, onRight: waitOfRight })
  // Both sides are numbers now, so return whichever one is present.
  if (Either.isLeft(mapped)) {
    return mapped.left
  }
  return mapped.right
}
/**
 * Curried builder: given a site and then a wait, returns an Effect.try whose
 * success value is itself the sleep effect (with errors unwrapped by
 * mapError_sleepNg). Callers therefore have to run two layers.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Like do_sleep1, but a successful sleep is converted into a TopOkData bound
 * to `site` before the errors are unwrapped. The result is again an effect
 * that yields the effect to run.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = (okData: OkData) => new TopOkData(site, okData)
    const sleepThenWrap = Effect.andThen(sleep(wait), toTopOkData)
    return mapError_sleepNg(site, sleepThenWrap)
  })
/**
 * Runs the simulated API call for one site and logs its outcome.
 *
 * The wait is derived from the site's last character (index * 1000). A sleep
 * failure is captured as an Either instead of failing the effect. Log lines are
 * yielded inside the generator so they run in this fiber, in order, rather than
 * being launched as detached Effect.runPromise calls.
 *
 * @param site URL whose last character is the numeric index
 * @returns effect yielding Either: Right = OkData, Left = the sleep failure
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    // do_sleep1 yields an effect, so it is unwrapped first and then run via Effect.either.
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left
      // An UnknownException is not logged here.
      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }
    return either
  })
/**
 * Two-stage mapper for one group of sites (curried: options first, then the group).
 *
 * Stage 1 sleeps once for the group, using the index parsed from the first
 * site. If that fails, the failure is logged and a Left message is returned
 * without running stage 2. Otherwise every site goes through mapper1_gen with
 * the given concurrency, and the Right message reports the stage-1 wait plus
 * an estimate of the stage-2 wait (slot totals folded by sumNumber, maximum taken).
 *
 * Logging is yielded inside the generator instead of being launched through
 * detached Effect.runPromise calls.
 *
 * @param options concurrency passed to Effect.all for stage 2 and used for the estimate
 * @param sites   group of URLs; `sites[0]` identifies the group, so the array must be non-empty
 * @returns effect yielding Either: Left = failure message, Right = success message
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    const site = sites[0]
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)

    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))
      const failed: Either.Either<string, string> = Either.left(topNgData.getTopMessage(site))
      return failed
    }

    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))
    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)
    const concurrency = options.concurrency
    // Replay the waits in chunks of `concurrency` to estimate how long stage 2 took.
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
    const succeeded: Either.Either<string, string> = Either.right(
      topOkData.getTopMessage(Math.max(...waitNumber))
    )
    return succeeded
  })
// Array of site groups: groups run concurrently, and so do the sites inside each group.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方1ー全部同時実行からの全部同時実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]
    // Await the start log so no promise is left floating.
    await Effect.runPromise(checkLog('[start]'))
    // Effect.all only describes the work (it is not a promise); runPromise executes it.
    const program = Effect.all(sites.map(mapper2_gen({ concurrency: Number.MAX_VALUE })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(program)
    console.log(result)
  })
})
実行結果
time : 2025-02-24T11:32:12.775Z - text: [start]
time : 2025-02-24T11:32:17.820Z - text: mapper2-index = 5
time : 2025-02-24T11:32:17.833Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:32:18.839Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:32:19.837Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:32:21.829Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:32:22.837Z - text: https://test/5/5 - ok - 5000
[
{
_id: 'Either',
_tag: 'Right',
right: 'https://test/5 - ok - 10000'
}
]
全部同時実行からの直列実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Error produced when the simulated API rejects immediately, without waiting.
 * Instances are identified by the `_tag` discriminator rather than `instanceof`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard keyed on `_tag`.
   * Safe for any input: `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for this failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Error produced when the simulated API rejects after its wait has elapsed.
 * Instances are identified by the `_tag` discriminator rather than `instanceof`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  /** Wait value handed to the constructor; echoed in the log message. */
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard keyed on `_tag`.
   * Safe for any input: `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for this failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Every error shape a TopSleepNGError can wrap. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the group-level (first-stage) sleep and formats the
 * messages reported for the whole group.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  /** The underlying failure being wrapped. */
  ngData: AllNgData

  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard keyed on this class's own `_tag`.
   * The earlier version compared against SleepNGAfterError's tag, so it never
   * matched a TopSleepNGError. Null and undefined yield `false`.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return test != null && (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /** Message suffix describing the wrapped failure (built with an empty site). */
  getLastMessage() {
    if (this.ngData instanceof UnknownException) {
      return ' - ng - UnknownException'
    }
    return this.ngData.getMessage('')
  }

  /** Log line naming the group index parsed from `site`, followed by the failure suffix. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result message: `site` with its last two characters removed, followed by the failure suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Successful result of the group-level (first-stage) sleep: remembers the
 * site it was created for and the wait taken from the OkData.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
  }

  /** Type guard: true only for real TopOkData instances. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Converts the third-from-last character of `site` to a number. */
  static getTopIndex(site: string) {
    const digit = site.slice(-3, -2)
    return Number(digit)
  }

  /** Log line naming the group index parsed from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /** Result message: the stored site minus its last two characters, plus the summed wait. */
  getTopMessage(opWait: number) {
    const topSite = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${topSite} - ok - ${total}`
  }
}
/** Successful result of the simulated API call, carrying the wait that was used. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard: true only for real OkData instances. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the final character of `site` and converts it to a number. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}
/**
 * Simulated API call. With s = wait / 1000:
 *  - s divisible by 3: rejects immediately with SleepNGBeforeError
 *  - otherwise s even: rejects with SleepNGAfterError after `wait` ms
 *  - otherwise: resolves with OkData after `wait` ms
 * Effect.tryPromise is called without a `catch`, so rejections surface wrapped
 * in UnknownException.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000
    return new Promise<OkData>((resolve, reject) => {
      if (waitSeconds % 3 === 0) {
        reject(new SleepNGBeforeError())
        return
      }
      // Decide the outcome up front; the timer only delays it.
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })
/** Builds an effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const now = new Date().toISOString()
    console.log(`time : ${now} - text: ${text}`)
  })
/** Options object passed to each `test` call; sets the timeout to 1000 * 100. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Reducer factory used to estimate the total wait when at most `chunkSize`
 * items run at once.
 *
 * The accumulator holds one running total per slot. While it has not reached
 * `chunkSize` entries, each incoming number opens a new slot; once it has, the
 * number is added to the slot with the smallest total.
 *
 * The accumulator passed in is copied, so the caller's array is never mutated.
 *
 * @param chunkSize number of slots
 * @returns reducer `(a, b)` giving the slot totals after adding every number in `b` to `a`
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const slots = [...a]
  for (const num of b) {
    if (slots.length === chunkSize) {
      // All slots in use: add to the one with the smallest total.
      const minIndex = slots.indexOf(Math.min(...slots))
      slots[minIndex] += num
    } else {
      slots.push(num)
    }
  }
  return slots
}
/** Converts an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  const perIndex = 1000
  return index * perIndex
}
/**
 * Unwraps the domain errors that Effect.tryPromise wrapped in UnknownException.
 *
 * If the wrapped cause is a SleepNGBeforeError or SleepNGAfterError it becomes
 * the failure directly; anything else stays an UnknownException and is logged.
 * The log runs through Effect.tapError, inside the same fiber, rather than via
 * a detached Effect.runPromise from within the pure mapping callback.
 *
 * @param site          kept for signature compatibility; not used here
 * @param effectAndThen effect whose failure is an UnknownException
 * @returns the same effect failing with SleepNGBeforeError | SleepNGAfterError | UnknownException
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  const unwrapped = Effect.mapError(effectAndThen, (err) => {
    const cause = err.error
    if (SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)) {
      return cause
    }
    return err
  })
  // Only the unrecognised case is logged; tapError leaves the failure unchanged.
  return Effect.tapError(unwrapped, (err) =>
    err instanceof UnknownException
      ? checkLog(`mapError_sleepNg : UnknownException : ${err}`)
      : Effect.void
  )
}
/**
 * Collapses an Either into a single number (the wait it represents).
 *
 * Left side: OkData or SleepNGAfterError give their `wait`; SleepNGBeforeError
 * and anything else give 0. Right side: OkData gives its `wait`, anything else 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  const waitOfLeft = (left: U): number => {
    if (OkData.isClass(left)) return left.wait
    if (SleepNGBeforeError.isClass(left)) return 0
    return SleepNGAfterError.isClass(left) ? left.wait : 0
  }
  const waitOfRight = (right: T): number => (OkData.isClass(right) ? right.wait : 0)

  const mapped = Either.mapBoth(data, { onLeft: waitOfLeft, onRight: waitOfRight })
  // Both sides are numbers now, so return whichever one is present.
  if (Either.isLeft(mapped)) {
    return mapped.left
  }
  return mapped.right
}
/**
 * Curried builder: given a site and then a wait, returns an Effect.try whose
 * success value is itself the sleep effect (with errors unwrapped by
 * mapError_sleepNg). Callers therefore have to run two layers.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Like do_sleep1, but a successful sleep is converted into a TopOkData bound
 * to `site` before the errors are unwrapped. The result is again an effect
 * that yields the effect to run.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = (okData: OkData) => new TopOkData(site, okData)
    const sleepThenWrap = Effect.andThen(sleep(wait), toTopOkData)
    return mapError_sleepNg(site, sleepThenWrap)
  })
/**
 * Runs the simulated API call for one site and logs its outcome.
 *
 * The wait is derived from the site's last character (index * 1000). A sleep
 * failure is captured as an Either instead of failing the effect. Log lines are
 * yielded inside the generator so they run in this fiber, in order, rather than
 * being launched as detached Effect.runPromise calls.
 *
 * @param site URL whose last character is the numeric index
 * @returns effect yielding Either: Right = OkData, Left = the sleep failure
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    // do_sleep1 yields an effect, so it is unwrapped first and then run via Effect.either.
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left
      // An UnknownException is not logged here.
      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }
    return either
  })
/**
 * Two-stage mapper for one group of sites (curried: options first, then the group).
 *
 * Stage 1 sleeps once for the group, using the index parsed from the first
 * site. If that fails, the failure is logged and a Left message is returned
 * without running stage 2. Otherwise every site goes through mapper1_gen with
 * the given concurrency, and the Right message reports the stage-1 wait plus
 * an estimate of the stage-2 wait (slot totals folded by sumNumber, maximum taken).
 *
 * Logging is yielded inside the generator instead of being launched through
 * detached Effect.runPromise calls.
 *
 * @param options concurrency passed to Effect.all for stage 2 and used for the estimate
 * @param sites   group of URLs; `sites[0]` identifies the group, so the array must be non-empty
 * @returns effect yielding Either: Left = failure message, Right = success message
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    const site = sites[0]
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)

    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))
      const failed: Either.Either<string, string> = Either.left(topNgData.getTopMessage(site))
      return failed
    }

    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))
    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)
    const concurrency = options.concurrency
    // Replay the waits in chunks of `concurrency` to estimate how long stage 2 took.
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
    const succeeded: Either.Either<string, string> = Either.right(
      topOkData.getTopMessage(Math.max(...waitNumber))
    )
    return succeeded
  })
// Array of site groups: groups run concurrently, sites inside each group run one at a time.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方3ー全部同時実行からの直列実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]
    // Await the start log so no promise is left floating.
    await Effect.runPromise(checkLog('[start]'))
    // Effect.all only describes the work (it is not a promise); runPromise executes it.
    const program = Effect.all(sites.map(mapper2_gen({ concurrency: 1 })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(program)
    console.log(result)
  })
})
実行結果
time : 2025-02-24T11:33:38.612Z - text: [start]
time : 2025-02-24T11:33:43.637Z - text: mapper2-index = 5
time : 2025-02-24T11:33:44.649Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:33:46.670Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:33:46.671Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:33:50.678Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:33:55.683Z - text: https://test/5/5 - ok - 5000
[
{
_id: 'Either',
_tag: 'Right',
right: 'https://test/5 - ok - 17000'
}
]
全部同時実行からの最大2個まで同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Error produced when the simulated API rejects immediately, without waiting.
 * Instances are identified by the `_tag` discriminator rather than `instanceof`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard keyed on `_tag`.
   * Safe for any input: `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for this failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Error produced when the simulated API rejects after its wait has elapsed.
 * Instances are identified by the `_tag` discriminator rather than `instanceof`.
 */
class SleepNGAfterError {
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  /** Wait value handed to the constructor; echoed in the log message. */
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard keyed on `_tag`.
   * Safe for any input: `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for this failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Every error shape a TopSleepNGError can wrap. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the group-level (first-stage) sleep and formats the
 * messages reported for the whole group.
 */
class TopSleepNGError {
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  /** The underlying failure being wrapped. */
  ngData: AllNgData

  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard keyed on this class's own `_tag`.
   * The earlier version compared against SleepNGAfterError's tag, so it never
   * matched a TopSleepNGError. Null and undefined yield `false`.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    return test != null && (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /** Message suffix describing the wrapped failure (built with an empty site). */
  getLastMessage() {
    if (this.ngData instanceof UnknownException) {
      return ' - ng - UnknownException'
    }
    return this.ngData.getMessage('')
  }

  /** Log line naming the group index parsed from `site`, followed by the failure suffix. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Result message: `site` with its last two characters removed, followed by the failure suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Successful result of the group-level (first-stage) sleep: remembers the
 * site it was created for and the wait taken from the OkData.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
  }

  /** Type guard: true only for real TopOkData instances. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /** Converts the third-from-last character of `site` to a number. */
  static getTopIndex(site: string) {
    const digit = site.slice(-3, -2)
    return Number(digit)
  }

  /** Log line naming the group index parsed from `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /** Result message: the stored site minus its last two characters, plus the summed wait. */
  getTopMessage(opWait: number) {
    const topSite = this.site.slice(0, -2)
    const total = this.wait + opWait
    return `${topSite} - ok - ${total}`
  }
}
/** Successful result of the simulated API call, carrying the wait that was used. */
class OkData {
  constructor(public wait: number) {}

  /** Type guard: true only for real OkData instances. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /** Reads the final character of `site` and converts it to a number. */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the success log line for `site`. */
  getMessage(site: string) {
    const { wait } = this
    return `${site} - ok - ${wait}`
  }
}
/**
 * Simulated API call. With s = wait / 1000:
 *  - s divisible by 3: rejects immediately with SleepNGBeforeError
 *  - otherwise s even: rejects with SleepNGAfterError after `wait` ms
 *  - otherwise: resolves with OkData after `wait` ms
 * Effect.tryPromise is called without a `catch`, so rejections surface wrapped
 * in UnknownException.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const waitSeconds = wait / 1000
    return new Promise<OkData>((resolve, reject) => {
      if (waitSeconds % 3 === 0) {
        reject(new SleepNGBeforeError())
        return
      }
      // Decide the outcome up front; the timer only delays it.
      const settle =
        waitSeconds % 2 === 0
          ? () => reject(new SleepNGAfterError(wait))
          : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })
/** Builds an effect that prints `text` to the console, prefixed with the current ISO timestamp. */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const now = new Date().toISOString()
    console.log(`time : ${now} - text: ${text}`)
  })
/** Options object passed to each `test` call; sets the timeout to 1000 * 100. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Reducer factory used to estimate the total wait when at most `chunkSize`
 * items run at once.
 *
 * The accumulator holds one running total per slot. While it has not reached
 * `chunkSize` entries, each incoming number opens a new slot; once it has, the
 * number is added to the slot with the smallest total.
 *
 * The accumulator passed in is copied, so the caller's array is never mutated.
 *
 * @param chunkSize number of slots
 * @returns reducer `(a, b)` giving the slot totals after adding every number in `b` to `a`
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const slots = [...a]
  for (const num of b) {
    if (slots.length === chunkSize) {
      // All slots in use: add to the one with the smallest total.
      const minIndex = slots.indexOf(Math.min(...slots))
      slots[minIndex] += num
    } else {
      slots.push(num)
    }
  }
  return slots
}
/** Converts an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  const perIndex = 1000
  return index * perIndex
}
/**
 * Unwraps the domain errors that Effect.tryPromise wrapped in UnknownException.
 *
 * If the wrapped cause is a SleepNGBeforeError or SleepNGAfterError it becomes
 * the failure directly; anything else stays an UnknownException and is logged.
 * The log runs through Effect.tapError, inside the same fiber, rather than via
 * a detached Effect.runPromise from within the pure mapping callback.
 *
 * @param site          kept for signature compatibility; not used here
 * @param effectAndThen effect whose failure is an UnknownException
 * @returns the same effect failing with SleepNGBeforeError | SleepNGAfterError | UnknownException
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  const unwrapped = Effect.mapError(effectAndThen, (err) => {
    const cause = err.error
    if (SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)) {
      return cause
    }
    return err
  })
  // Only the unrecognised case is logged; tapError leaves the failure unchanged.
  return Effect.tapError(unwrapped, (err) =>
    err instanceof UnknownException
      ? checkLog(`mapError_sleepNg : UnknownException : ${err}`)
      : Effect.void
  )
}
/**
 * Collapses an Either into a single number (the wait it represents).
 *
 * Left side: OkData or SleepNGAfterError give their `wait`; SleepNGBeforeError
 * and anything else give 0. Right side: OkData gives its `wait`, anything else 0.
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  const waitOfLeft = (left: U): number => {
    if (OkData.isClass(left)) return left.wait
    if (SleepNGBeforeError.isClass(left)) return 0
    return SleepNGAfterError.isClass(left) ? left.wait : 0
  }
  const waitOfRight = (right: T): number => (OkData.isClass(right) ? right.wait : 0)

  const mapped = Either.mapBoth(data, { onLeft: waitOfLeft, onRight: waitOfRight })
  // Both sides are numbers now, so return whichever one is present.
  if (Either.isLeft(mapped)) {
    return mapped.left
  }
  return mapped.right
}
/**
 * Curried builder: given a site and then a wait, returns an Effect.try whose
 * success value is itself the sleep effect (with errors unwrapped by
 * mapError_sleepNg). Callers therefore have to run two layers.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Like do_sleep1, but a successful sleep is converted into a TopOkData bound
 * to `site` before the errors are unwrapped. The result is again an effect
 * that yields the effect to run.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() => {
    const toTopOkData = (okData: OkData) => new TopOkData(site, okData)
    const sleepThenWrap = Effect.andThen(sleep(wait), toTopOkData)
    return mapError_sleepNg(site, sleepThenWrap)
  })
/**
 * Runs the simulated API call for one site and logs its outcome.
 *
 * The wait is derived from the site's last character (index * 1000). A sleep
 * failure is captured as an Either instead of failing the effect. Log lines are
 * yielded inside the generator so they run in this fiber, in order, rather than
 * being launched as detached Effect.runPromise calls.
 *
 * @param site URL whose last character is the numeric index
 * @returns effect yielding Either: Right = OkData, Left = the sleep failure
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    // do_sleep1 yields an effect, so it is unwrapped first and then run via Effect.either.
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left
      // An UnknownException is not logged here.
      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }
    return either
  })
/**
 * Two-stage mapper for one group of sites (curried: options first, then the group).
 *
 * Stage 1 sleeps once for the group, using the index parsed from the first
 * site. If that fails, the failure is logged and a Left message is returned
 * without running stage 2. Otherwise every site goes through mapper1_gen with
 * the given concurrency, and the Right message reports the stage-1 wait plus
 * an estimate of the stage-2 wait (slot totals folded by sumNumber, maximum taken).
 *
 * Logging is yielded inside the generator instead of being launched through
 * detached Effect.runPromise calls.
 *
 * @param options concurrency passed to Effect.all for stage 2 and used for the estimate
 * @param sites   group of URLs; `sites[0]` identifies the group, so the array must be non-empty
 * @returns effect yielding Either: Left = failure message, Right = success message
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    const site = sites[0]
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)

    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))
      const failed: Either.Either<string, string> = Either.left(topNgData.getTopMessage(site))
      return failed
    }

    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))
    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)
    const concurrency = options.concurrency
    // Replay the waits in chunks of `concurrency` to estimate how long stage 2 took.
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
    const succeeded: Either.Either<string, string> = Either.right(
      topOkData.getTopMessage(Math.max(...waitNumber))
    )
    return succeeded
  })
// Array of site groups: groups run concurrently, at most two sites at a time inside each group.
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方5ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    const sites = [
      // ['https://test/1/1'],
      // ['https://test/2/1', 'https://test/2/2']
      // ['https://test/3/1', 'https://test/3/2', 'https://test/3/3']
      // ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]
    // Await the start log so no promise is left floating.
    await Effect.runPromise(checkLog('[start]'))
    // Effect.all only describes the work (it is not a promise); runPromise executes it.
    const program = Effect.all(sites.map(mapper2_gen({ concurrency: 2 })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(program)
    console.log(result)
  })
})
実行結果
time : 2025-02-24T11:35:37.523Z - text: [start]
time : 2025-02-24T11:35:42.542Z - text: mapper2-index = 5
time : 2025-02-24T11:35:43.562Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:35:43.576Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:35:44.561Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:35:47.587Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:35:49.566Z - text: https://test/5/5 - ok - 5000
[
{
_id: 'Either',
_tag: 'Right',
right: 'https://test/5 - ok - 12000'
}
]
配列が2つの場合(複雑)
全部同時実行からの全部同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Error produced when the simulated API rejects immediately, without waiting.
 * Instances are identified by the `_tag` discriminator rather than `instanceof`.
 */
class SleepNGBeforeError {
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard keyed on `_tag`.
   * Safe for any input: `null` and `undefined` yield `false` instead of throwing.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    return test != null && (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for this failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Failure value for a sleep that is rejected after its wait has elapsed.
 *
 * Instances are recognised by their `_tag` field, not by `instanceof`.
 */
class SleepNGAfterError {
  /** Tag string carried by every instance. */
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  /** The wait value this error was constructed with. */
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard: true when `test` carries this error's `_tag`.
   *
   * Returns false for `null`, `undefined` and primitives instead of throwing,
   * because the value being tested may be an arbitrary rejection reason.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    // `== null` covers both null and undefined before the property read.
    if (test == null) {
      return false
    }
    return (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for a failure at `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Every failure value that `TopSleepNGError` can wrap. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the group-level sleep and formats it for logging and for
 * the final per-group result.
 */
class TopSleepNGError {
  /** Tag string carried by every instance. */
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  /** The wrapped failure. */
  ngData: AllNgData

  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard: true when `test` carries this class's own `_tag`.
   *
   * Returns false for `null`, `undefined` and primitives instead of throwing.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    if (test == null) {
      return false
    }
    return (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /**
   * Returns the message suffix for the wrapped failure: a fixed text for an
   * `UnknownException`, otherwise the wrapped error's own message built with
   * an empty site.
   */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('')
    return lastMessage
  }

  /** Builds the log line, prefixed with the group index taken from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Builds the result text: `site` without its last two characters, plus the suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Successful result of the group-level sleep: the site it ran for plus the
 * wait copied from the underlying `OkData`.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
  }

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /**
   * Reads the group index from `site`: the single character third from the
   * end, converted with `Number` (so only one-character indices are read).
   */
  static getTopIndex(site: string) {
    const indexChar = site.charAt(site.length - 3)
    return Number(indexChar)
  }

  /** Builds the log line announcing the group index of `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * Builds the result text: the site without its last two characters, and the
   * sum of this wait and `opWait`.
   */
  getTopMessage(opWait: number) {
    const groupSite = this.site.slice(0, -2)
    const totalWait = this.wait + opWait
    return `${groupSite} - ok - ${totalWait}`
  }
}
/** Successful result of a sleep, carrying the wait it was created with. */
class OkData {
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /**
   * Reads the item index from `site`: its last character converted with
   * `Number` (so only one-character indices are read).
   */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the log line for a success at `site`. */
  getMessage(site: string) {
    const parts = [site, 'ok', this.wait]
    return parts.join(' - ')
  }
}
/**
 * Simulated asynchronous call wrapped with `Effect.tryPromise`.
 *
 * With `seconds = wait / 1000`:
 * - `seconds % 3 === 0`: rejects at once with `SleepNGBeforeError`;
 * - otherwise `seconds % 2 === 0`: rejects with `SleepNGAfterError` after `wait` ms;
 * - otherwise: resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const seconds = wait / 1000
    const failsBeforeWait = seconds % 3 === 0
    const failsAfterWait = seconds % 2 === 0
    return new Promise((resolve, reject) => {
      if (failsBeforeWait) {
        reject(new SleepNGBeforeError())
        return
      }
      // Both remaining outcomes are delivered only once the timer fires.
      const settle = failsAfterWait
        ? () => reject(new SleepNGAfterError(wait))
        : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })
/**
 * Builds an effect that, when run, prints `text` to the console prefixed with
 * the ISO timestamp taken at that moment.
 */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })
/** Options passed to each `test(...)` call: a timeout of 100 * 1000. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Creates a reducer that distributes the numbers of `b` over at most
 * `chunkSize` running totals.
 *
 * While there are fewer than `chunkSize` totals, each number opens a new
 * total; once `chunkSize` totals exist, each number is added to the currently
 * smallest total (the first one on ties).
 *
 * The accumulator `a` is copied first, so the caller's array is never
 * modified.
 *
 * @param chunkSize number of running totals to fill before numbers are added
 *                  to existing totals
 * @returns a reducer `(a, b) => totals` returning a new array
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const totals = [...a]
  for (const num of b) {
    if (totals.length === chunkSize) {
      const smallestIndex = totals.indexOf(Math.min(...totals))
      totals[smallestIndex] += num
    } else {
      totals.push(num)
    }
  }
  return totals
}
/** Converts an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  const factor = 1000
  return index * factor
}
/**
 * Replaces the `UnknownException` error of `effectAndThen` with the value
 * stored in its `error` field when that value is a `SleepNGBeforeError` or a
 * `SleepNGAfterError`.
 *
 * Any other exception is logged through `checkLog` and passed on unchanged.
 *
 * @param site not used by this function
 * @param effectAndThen the effect whose error channel is remapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const cause = err.error
    if (SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)) {
      return cause
    }
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}
/**
 * Reduces an `Either` to a single number.
 *
 * - Right side: the `wait` of an `OkData`, otherwise 0.
 * - Left side: the `wait` of an `OkData` or `SleepNGAfterError`, otherwise 0
 *   (a `SleepNGBeforeError` therefore counts as 0).
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isRight(data)) {
    const success = data.right
    return OkData.isClass(success) ? success.wait : 0
  }
  const failure = data.left
  if (OkData.isClass(failure) || SleepNGAfterError.isClass(failure)) {
    return failure.wait
  }
  return 0
}
/**
 * Builds, inside `Effect.try`, the sleep effect for `wait` with its error
 * channel remapped by `mapError_sleepNg`.
 *
 * The outer effect succeeds with that inner effect, so callers have to run
 * the result twice.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Like `do_sleep1`, but a successful `OkData` is first converted into a
 * `TopOkData` for `site` before the error channel is remapped.
 *
 * The outer `Effect.try` succeeds with the inner effect, so callers have to
 * run the result twice.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() =>
    mapError_sleepNg(
      site,
      Effect.andThen(sleep(wait), (okData) => new TopOkData(site, okData))
    )
  )
/**
 * Runs the sleep for one `site` and returns its outcome as an `Either`
 * instead of failing.
 *
 * The wait is derived from the last character of `site`. A success, a
 * `SleepNGBeforeError` and a `SleepNGAfterError` are each logged; any other
 * failure is returned without a log line here.
 *
 * Logging is sequenced with `yield*` so it runs as part of this effect rather
 * than as a separately started, un-awaited promise.
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    // do_sleep1 yields an effect wrapped in an effect: unwrap, then run.
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left
      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }
    return either
  })
/**
 * Processes one group of sites.
 *
 * First runs the group-level sleep, whose wait is derived from the first
 * site of the group.
 * - On failure: logs it and returns a Left holding the failure text.
 * - On success: logs the group index, runs `mapper1_gen` for every site with
 *   the given `options`, estimates the time spent with `sumNumber`, and
 *   returns a Right holding the success text with the combined wait.
 *
 * Logging is sequenced with `yield*` so it runs as part of this effect rather
 * than as a separately started, un-awaited promise.
 *
 * @param options concurrency used for the sites inside the group
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    const site = sites[0]
    // do_sleep2 yields an effect wrapped in an effect: unwrap, then run.
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)
    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))
      return Either.left(topNgData.getTopMessage(site))
    }
    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))
    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)
    const concurrency = options.concurrency
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    // The largest running total is taken as the wait of the whole group.
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
    return Either.right(topOkData.getTopMessage(Math.max(...waitNumber)))
  })
/**
 * Runs every group in `sites` concurrently, and every site inside a group
 * concurrently as well.
 */
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方2ー全部同時実行からの全部同時実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]
    // Await the start log so no promise is left floating.
    await Effect.runPromise(checkLog('[start]'))
    // Effect.all only builds a description; it is not a Promise, so it is not awaited.
    const effect = Effect.all(sites.map(mapper2_gen({ concurrency: Number.MAX_VALUE })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(effect)
    console.log(result)
  })
})
実行結果
time : 2025-02-24T11:37:05.712Z - text: [start]
time : 2025-02-24T11:37:05.725Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-24T11:37:06.736Z - text: mapper2-index = 1
time : 2025-02-24T11:37:07.739Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:37:07.744Z - text: https://test/1/1 - ok - 1000
time : 2025-02-24T11:37:09.729Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:37:10.730Z - text: mapper2-index = 5
time : 2025-02-24T11:37:10.741Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:37:11.747Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:37:12.750Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:37:14.753Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:37:15.755Z - text: https://test/5/5 - ok - 5000
[
{ _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/3 - ng - Sleep NG before wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
},
{
_id: 'Either',
_tag: 'Right',
right: 'https://test/5 - ok - 10000'
}
]
全部同時実行からの直列実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Failure value for a sleep that is rejected before any waiting happens.
 *
 * Instances are recognised by their `_tag` field, not by `instanceof`.
 */
class SleepNGBeforeError {
  /** Tag string carried by every instance. */
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard: true when `test` carries this error's `_tag`.
   *
   * Returns false for `null`, `undefined` and primitives instead of throwing,
   * because the value being tested may be an arbitrary rejection reason.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    // `== null` covers both null and undefined before the property read.
    if (test == null) {
      return false
    }
    return (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for a failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Failure value for a sleep that is rejected after its wait has elapsed.
 *
 * Instances are recognised by their `_tag` field, not by `instanceof`.
 */
class SleepNGAfterError {
  /** Tag string carried by every instance. */
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  /** The wait value this error was constructed with. */
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard: true when `test` carries this error's `_tag`.
   *
   * Returns false for `null`, `undefined` and primitives instead of throwing,
   * because the value being tested may be an arbitrary rejection reason.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    // `== null` covers both null and undefined before the property read.
    if (test == null) {
      return false
    }
    return (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for a failure at `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Every failure value that `TopSleepNGError` can wrap. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the group-level sleep and formats it for logging and for
 * the final per-group result.
 */
class TopSleepNGError {
  /** Tag string carried by every instance. */
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  /** The wrapped failure. */
  ngData: AllNgData

  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard: true when `test` carries this class's own `_tag`.
   *
   * Returns false for `null`, `undefined` and primitives instead of throwing.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    if (test == null) {
      return false
    }
    return (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /**
   * Returns the message suffix for the wrapped failure: a fixed text for an
   * `UnknownException`, otherwise the wrapped error's own message built with
   * an empty site.
   */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('')
    return lastMessage
  }

  /** Builds the log line, prefixed with the group index taken from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Builds the result text: `site` without its last two characters, plus the suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Successful result of the group-level sleep: the site it ran for plus the
 * wait copied from the underlying `OkData`.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
  }

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /**
   * Reads the group index from `site`: the single character third from the
   * end, converted with `Number` (so only one-character indices are read).
   */
  static getTopIndex(site: string) {
    const indexChar = site.charAt(site.length - 3)
    return Number(indexChar)
  }

  /** Builds the log line announcing the group index of `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * Builds the result text: the site without its last two characters, and the
   * sum of this wait and `opWait`.
   */
  getTopMessage(opWait: number) {
    const groupSite = this.site.slice(0, -2)
    const totalWait = this.wait + opWait
    return `${groupSite} - ok - ${totalWait}`
  }
}
/** Successful result of a sleep, carrying the wait it was created with. */
class OkData {
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /**
   * Reads the item index from `site`: its last character converted with
   * `Number` (so only one-character indices are read).
   */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the log line for a success at `site`. */
  getMessage(site: string) {
    const parts = [site, 'ok', this.wait]
    return parts.join(' - ')
  }
}
/**
 * Simulated asynchronous call wrapped with `Effect.tryPromise`.
 *
 * With `seconds = wait / 1000`:
 * - `seconds % 3 === 0`: rejects at once with `SleepNGBeforeError`;
 * - otherwise `seconds % 2 === 0`: rejects with `SleepNGAfterError` after `wait` ms;
 * - otherwise: resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const seconds = wait / 1000
    const failsBeforeWait = seconds % 3 === 0
    const failsAfterWait = seconds % 2 === 0
    return new Promise((resolve, reject) => {
      if (failsBeforeWait) {
        reject(new SleepNGBeforeError())
        return
      }
      // Both remaining outcomes are delivered only once the timer fires.
      const settle = failsAfterWait
        ? () => reject(new SleepNGAfterError(wait))
        : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })
/**
 * Builds an effect that, when run, prints `text` to the console prefixed with
 * the ISO timestamp taken at that moment.
 */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })
/** Options passed to each `test(...)` call: a timeout of 100 * 1000. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Creates a reducer that distributes the numbers of `b` over at most
 * `chunkSize` running totals.
 *
 * While there are fewer than `chunkSize` totals, each number opens a new
 * total; once `chunkSize` totals exist, each number is added to the currently
 * smallest total (the first one on ties).
 *
 * The accumulator `a` is copied first, so the caller's array is never
 * modified.
 *
 * @param chunkSize number of running totals to fill before numbers are added
 *                  to existing totals
 * @returns a reducer `(a, b) => totals` returning a new array
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const totals = [...a]
  for (const num of b) {
    if (totals.length === chunkSize) {
      const smallestIndex = totals.indexOf(Math.min(...totals))
      totals[smallestIndex] += num
    } else {
      totals.push(num)
    }
  }
  return totals
}
/** Converts an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  const factor = 1000
  return index * factor
}
/**
 * Replaces the `UnknownException` error of `effectAndThen` with the value
 * stored in its `error` field when that value is a `SleepNGBeforeError` or a
 * `SleepNGAfterError`.
 *
 * Any other exception is logged through `checkLog` and passed on unchanged.
 *
 * @param site not used by this function
 * @param effectAndThen the effect whose error channel is remapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const cause = err.error
    if (SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)) {
      return cause
    }
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}
/**
 * Reduces an `Either` to a single number.
 *
 * - Right side: the `wait` of an `OkData`, otherwise 0.
 * - Left side: the `wait` of an `OkData` or `SleepNGAfterError`, otherwise 0
 *   (a `SleepNGBeforeError` therefore counts as 0).
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isRight(data)) {
    const success = data.right
    return OkData.isClass(success) ? success.wait : 0
  }
  const failure = data.left
  if (OkData.isClass(failure) || SleepNGAfterError.isClass(failure)) {
    return failure.wait
  }
  return 0
}
/**
 * Builds, inside `Effect.try`, the sleep effect for `wait` with its error
 * channel remapped by `mapError_sleepNg`.
 *
 * The outer effect succeeds with that inner effect, so callers have to run
 * the result twice.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Like `do_sleep1`, but a successful `OkData` is first converted into a
 * `TopOkData` for `site` before the error channel is remapped.
 *
 * The outer `Effect.try` succeeds with the inner effect, so callers have to
 * run the result twice.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() =>
    mapError_sleepNg(
      site,
      Effect.andThen(sleep(wait), (okData) => new TopOkData(site, okData))
    )
  )
/**
 * Runs the sleep for one `site` and returns its outcome as an `Either`
 * instead of failing.
 *
 * The wait is derived from the last character of `site`. A success, a
 * `SleepNGBeforeError` and a `SleepNGAfterError` are each logged; any other
 * failure is returned without a log line here.
 *
 * Logging is sequenced with `yield*` so it runs as part of this effect rather
 * than as a separately started, un-awaited promise.
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    // do_sleep1 yields an effect wrapped in an effect: unwrap, then run.
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left
      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }
    return either
  })
/**
 * Processes one group of sites.
 *
 * First runs the group-level sleep, whose wait is derived from the first
 * site of the group.
 * - On failure: logs it and returns a Left holding the failure text.
 * - On success: logs the group index, runs `mapper1_gen` for every site with
 *   the given `options`, estimates the time spent with `sumNumber`, and
 *   returns a Right holding the success text with the combined wait.
 *
 * Logging is sequenced with `yield*` so it runs as part of this effect rather
 * than as a separately started, un-awaited promise.
 *
 * @param options concurrency used for the sites inside the group
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    const site = sites[0]
    // do_sleep2 yields an effect wrapped in an effect: unwrap, then run.
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)
    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))
      return Either.left(topNgData.getTopMessage(site))
    }
    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))
    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)
    const concurrency = options.concurrency
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    // The largest running total is taken as the wait of the whole group.
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
    return Either.right(topOkData.getTopMessage(Math.max(...waitNumber)))
  })
/**
 * Runs every group in `sites` concurrently, while the sites inside a group
 * run one at a time.
 */
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方4ー全部同時実行からの直列実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]
    // Await the start log so no promise is left floating.
    await Effect.runPromise(checkLog('[start]'))
    // Effect.all only builds a description; it is not a Promise, so it is not awaited.
    const effect = Effect.all(sites.map(mapper2_gen({ concurrency: 1 })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(effect)
    console.log(result)
  })
})
実行結果
time : 2025-02-24T11:38:48.551Z - text: [start]
time : 2025-02-24T11:38:48.566Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-24T11:38:49.565Z - text: mapper2-index = 1
time : 2025-02-24T11:38:50.565Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:38:50.565Z - text: https://test/1/1 - ok - 1000
time : 2025-02-24T11:38:52.576Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:38:53.569Z - text: mapper2-index = 5
time : 2025-02-24T11:38:54.587Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:38:56.595Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:38:56.596Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:39:00.600Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:39:05.606Z - text: https://test/5/5 - ok - 5000
[
{ _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/3 - ng - Sleep NG before wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
},
{
_id: 'Either',
_tag: 'Right',
right: 'https://test/5 - ok - 17000'
}
]
全部同時実行からの最大2個まで同時実行
import { describe, test } from 'vitest'
import { Effect, pipe, Array, Either } from 'effect'
import { UnknownException } from 'effect/Cause'
/**
 * Failure value for a sleep that is rejected before any waiting happens.
 *
 * Instances are recognised by their `_tag` field, not by `instanceof`.
 */
class SleepNGBeforeError {
  /** Tag string carried by every instance. */
  static tagName = 'Sleep.Ng.Before'
  readonly _tag = SleepNGBeforeError.tagName

  /**
   * Type guard: true when `test` carries this error's `_tag`.
   *
   * Returns false for `null`, `undefined` and primitives instead of throwing,
   * because the value being tested may be an arbitrary rejection reason.
   */
  static isClass(test: unknown): test is SleepNGBeforeError {
    // `== null` covers both null and undefined before the property read.
    if (test == null) {
      return false
    }
    return (test as { _tag?: unknown })._tag === SleepNGBeforeError.tagName
  }

  /** Builds the log line for a failure at `site`. */
  getMessage(site: string) {
    return `${site} - ng - Sleep NG before wait`
  }
}
/**
 * Failure value for a sleep that is rejected after its wait has elapsed.
 *
 * Instances are recognised by their `_tag` field, not by `instanceof`.
 */
class SleepNGAfterError {
  /** Tag string carried by every instance. */
  static tagName = 'Sleep.Ng.After'
  readonly _tag = SleepNGAfterError.tagName
  /** The wait value this error was constructed with. */
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /**
   * Type guard: true when `test` carries this error's `_tag`.
   *
   * Returns false for `null`, `undefined` and primitives instead of throwing,
   * because the value being tested may be an arbitrary rejection reason.
   */
  static isClass(test: unknown): test is SleepNGAfterError {
    // `== null` covers both null and undefined before the property read.
    if (test == null) {
      return false
    }
    return (test as { _tag?: unknown })._tag === SleepNGAfterError.tagName
  }

  /** Builds the log line for a failure at `site`, including the wait value. */
  getMessage(site: string) {
    return `${site} - ng - ${this.wait} - Sleep NG after wait`
  }
}
/** Every failure value that `TopSleepNGError` can wrap. */
type AllNgData = SleepNGBeforeError | SleepNGAfterError | UnknownException

/**
 * Wraps a failure of the group-level sleep and formats it for logging and for
 * the final per-group result.
 */
class TopSleepNGError {
  /** Tag string carried by every instance. */
  static tagName = 'Top.Sleep.Ng'
  readonly _tag = TopSleepNGError.tagName
  /** The wrapped failure. */
  ngData: AllNgData

  constructor(ng: AllNgData) {
    this.ngData = ng
  }

  /**
   * Type guard: true when `test` carries this class's own `_tag`.
   *
   * Returns false for `null`, `undefined` and primitives instead of throwing.
   */
  static isClass(test: unknown): test is TopSleepNGError {
    if (test == null) {
      return false
    }
    return (test as { _tag?: unknown })._tag === TopSleepNGError.tagName
  }

  /**
   * Returns the message suffix for the wrapped failure: a fixed text for an
   * `UnknownException`, otherwise the wrapped error's own message built with
   * an empty site.
   */
  getLastMessage() {
    const lastMessage =
      this.ngData instanceof UnknownException
        ? ' - ng - UnknownException'
        : this.ngData.getMessage('')
    return lastMessage
  }

  /** Builds the log line, prefixed with the group index taken from `site`. */
  getMessage(site: string) {
    const index = TopOkData.getTopIndex(site)
    const lastMessage = this.getLastMessage()
    return `mapper2-index = ${index}${lastMessage}`
  }

  /** Builds the result text: `site` without its last two characters, plus the suffix. */
  getTopMessage(site: string) {
    const lastMessage = this.getLastMessage()
    return `${site.substring(0, site.length - 2)}${lastMessage}`
  }
}
/**
 * Successful result of the group-level sleep: the site it ran for plus the
 * wait copied from the underlying `OkData`.
 */
class TopOkData {
  site: string
  wait: number

  constructor(s: string, ok: OkData) {
    this.site = s
    this.wait = ok.wait
  }

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is TopOkData {
    return test instanceof TopOkData
  }

  /**
   * Reads the group index from `site`: the single character third from the
   * end, converted with `Number` (so only one-character indices are read).
   */
  static getTopIndex(site: string) {
    const indexChar = site.charAt(site.length - 3)
    return Number(indexChar)
  }

  /** Builds the log line announcing the group index of `site`. */
  static getMessage(site: string) {
    return `mapper2-index = ${TopOkData.getTopIndex(site)}`
  }

  /**
   * Builds the result text: the site without its last two characters, and the
   * sum of this wait and `opWait`.
   */
  getTopMessage(opWait: number) {
    const groupSite = this.site.slice(0, -2)
    const totalWait = this.wait + opWait
    return `${groupSite} - ok - ${totalWait}`
  }
}
/** Successful result of a sleep, carrying the wait it was created with. */
class OkData {
  wait: number

  constructor(w: number) {
    this.wait = w
  }

  /** Type guard based on `instanceof`. */
  static isClass(test: any): test is OkData {
    return test instanceof OkData
  }

  /**
   * Reads the item index from `site`: its last character converted with
   * `Number` (so only one-character indices are read).
   */
  static getLastIndex(site: string) {
    const lastChar = site.charAt(site.length - 1)
    return Number(lastChar)
  }

  /** Builds the log line for a success at `site`. */
  getMessage(site: string) {
    const parts = [site, 'ok', this.wait]
    return parts.join(' - ')
  }
}
/**
 * Simulated asynchronous call wrapped with `Effect.tryPromise`.
 *
 * With `seconds = wait / 1000`:
 * - `seconds % 3 === 0`: rejects at once with `SleepNGBeforeError`;
 * - otherwise `seconds % 2 === 0`: rejects with `SleepNGAfterError` after `wait` ms;
 * - otherwise: resolves with `OkData` after `wait` ms.
 */
const sleep = (wait: number) =>
  Effect.tryPromise<OkData>(() => {
    const seconds = wait / 1000
    const failsBeforeWait = seconds % 3 === 0
    const failsAfterWait = seconds % 2 === 0
    return new Promise((resolve, reject) => {
      if (failsBeforeWait) {
        reject(new SleepNGBeforeError())
        return
      }
      // Both remaining outcomes are delivered only once the timer fires.
      const settle = failsAfterWait
        ? () => reject(new SleepNGAfterError(wait))
        : () => resolve(new OkData(wait))
      setTimeout(settle, wait)
    })
  })
/**
 * Builds an effect that, when run, prints `text` to the console prefixed with
 * the ISO timestamp taken at that moment.
 */
const checkLog = (text: string) =>
  Effect.sync(() => {
    const timestamp = new Date().toISOString()
    console.log(`time : ${timestamp} - text: ${text}`)
  })
/** Options passed to each `test(...)` call: a timeout of 100 * 1000. */
const testOptions = {
  timeout: 100 * 1000
}
/**
 * Creates a reducer that distributes the numbers of `b` over at most
 * `chunkSize` running totals.
 *
 * While there are fewer than `chunkSize` totals, each number opens a new
 * total; once `chunkSize` totals exist, each number is added to the currently
 * smallest total (the first one on ties).
 *
 * The accumulator `a` is copied first, so the caller's array is never
 * modified.
 *
 * @param chunkSize number of running totals to fill before numbers are added
 *                  to existing totals
 * @returns a reducer `(a, b) => totals` returning a new array
 */
const sumNumber = (chunkSize: number) => (a: number[], b: number[]) => {
  const totals = [...a]
  for (const num of b) {
    if (totals.length === chunkSize) {
      const smallestIndex = totals.indexOf(Math.min(...totals))
      totals[smallestIndex] += num
    } else {
      totals.push(num)
    }
  }
  return totals
}
/** Converts an index into a wait value by multiplying it by 1000. */
function getWaitNumber(index: number) {
  const factor = 1000
  return index * factor
}
/**
 * Replaces the `UnknownException` error of `effectAndThen` with the value
 * stored in its `error` field when that value is a `SleepNGBeforeError` or a
 * `SleepNGAfterError`.
 *
 * Any other exception is logged through `checkLog` and passed on unchanged.
 *
 * @param site not used by this function
 * @param effectAndThen the effect whose error channel is remapped
 */
function mapError_sleepNg<T>(
  site: string,
  effectAndThen: Effect.Effect<T, UnknownException, never>
) {
  return Effect.mapError(effectAndThen, (err) => {
    const cause = err.error
    if (SleepNGBeforeError.isClass(cause) || SleepNGAfterError.isClass(cause)) {
      return cause
    }
    Effect.runPromise(checkLog(`mapError_sleepNg : UnknownException : ${err}`))
    return err
  })
}
/**
 * Reduces an `Either` to a single number.
 *
 * - Right side: the `wait` of an `OkData`, otherwise 0.
 * - Left side: the `wait` of an `OkData` or `SleepNGAfterError`, otherwise 0
 *   (a `SleepNGBeforeError` therefore counts as 0).
 */
function mapNumber_fromData<T, U>(data: Either.Either<T, U>) {
  if (Either.isRight(data)) {
    const success = data.right
    return OkData.isClass(success) ? success.wait : 0
  }
  const failure = data.left
  if (OkData.isClass(failure) || SleepNGAfterError.isClass(failure)) {
    return failure.wait
  }
  return 0
}
/**
 * Builds, inside `Effect.try`, the sleep effect for `wait` with its error
 * channel remapped by `mapError_sleepNg`.
 *
 * The outer effect succeeds with that inner effect, so callers have to run
 * the result twice.
 */
const do_sleep1 = (site: string) => (wait: number) =>
  Effect.try(() => mapError_sleepNg(site, sleep(wait)))
/**
 * Like `do_sleep1`, but a successful `OkData` is first converted into a
 * `TopOkData` for `site` before the error channel is remapped.
 *
 * The outer `Effect.try` succeeds with the inner effect, so callers have to
 * run the result twice.
 */
const do_sleep2 = (site: string) => (wait: number) =>
  Effect.try(() =>
    mapError_sleepNg(
      site,
      Effect.andThen(sleep(wait), (okData) => new TopOkData(site, okData))
    )
  )
/**
 * Runs the sleep for one `site` and returns its outcome as an `Either`
 * instead of failing.
 *
 * The wait is derived from the last character of `site`. A success, a
 * `SleepNGBeforeError` and a `SleepNGAfterError` are each logged; any other
 * failure is returned without a log line here.
 *
 * Logging is sequenced with `yield*` so it runs as part of this effect rather
 * than as a separately started, un-awaited promise.
 */
const mapper1_gen = (site: string) =>
  Effect.gen(function* () {
    // do_sleep1 yields an effect wrapped in an effect: unwrap, then run.
    const effect = yield* pipe(site, OkData.getLastIndex, getWaitNumber, do_sleep1(site))
    const either = yield* Effect.either(effect)
    if (Either.isRight(either)) {
      yield* checkLog(either.right.getMessage(site))
    } else {
      const ngData = either.left
      if (SleepNGBeforeError.isClass(ngData) || SleepNGAfterError.isClass(ngData)) {
        yield* checkLog(ngData.getMessage(site))
      }
    }
    return either
  })
/**
 * Processes one group of sites.
 *
 * First runs the group-level sleep, whose wait is derived from the first
 * site of the group.
 * - On failure: logs it and returns a Left holding the failure text.
 * - On success: logs the group index, runs `mapper1_gen` for every site with
 *   the given `options`, estimates the time spent with `sumNumber`, and
 *   returns a Right holding the success text with the combined wait.
 *
 * Logging is sequenced with `yield*` so it runs as part of this effect rather
 * than as a separately started, un-awaited promise.
 *
 * @param options concurrency used for the sites inside the group
 */
const mapper2_gen = (options: { concurrency: number }) => (sites: string[]) =>
  Effect.gen(function* () {
    const site = sites[0]
    // do_sleep2 yields an effect wrapped in an effect: unwrap, then run.
    const effect2 = yield* pipe(site, TopOkData.getTopIndex, getWaitNumber, do_sleep2(site))
    const either2 = yield* Effect.either(effect2)
    if (Either.isLeft(either2)) {
      const topNgData = new TopSleepNGError(either2.left)
      yield* checkLog(topNgData.getMessage(site))
      return Either.left(topNgData.getTopMessage(site))
    }
    const topOkData = either2.right
    yield* checkLog(TopOkData.getMessage(site))
    const either1 = yield* Effect.all(sites.map(mapper1_gen), options)
    const arrayEither1 = either1.map(mapNumber_fromData)
    const concurrency = options.concurrency
    const chunkArray = Array.chunksOf(arrayEither1, concurrency)
    // The largest running total is taken as the wait of the whole group.
    const waitNumber = Array.reduce(chunkArray, [0], sumNumber(concurrency))
    return Either.right(topOkData.getTopMessage(Math.max(...waitNumber)))
  })
/**
 * Runs every group in `sites` concurrently; inside a group at most two
 * `mapper1_gen` calls run at the same time.
 */
describe('effect-ts 非同期処理2つの配列', () => {
  test('使い方6ー全部同時実行からの最大2個まで同時実行', testOptions, async () => {
    const sites = [
      ['https://test/1/1'],
      ['https://test/2/1', 'https://test/2/2'],
      ['https://test/3/1', 'https://test/3/2', 'https://test/3/3'],
      ['https://test/4/1', 'https://test/4/2', 'https://test/4/3', 'https://test/4/4'],
      [
        'https://test/5/1',
        'https://test/5/2',
        'https://test/5/3',
        'https://test/5/4',
        'https://test/5/5'
      ]
    ]
    // Await the start log so no promise is left floating.
    await Effect.runPromise(checkLog('[start]'))
    // Effect.all only builds a description; it is not a Promise, so it is not awaited.
    const effect = Effect.all(sites.map(mapper2_gen({ concurrency: 2 })), {
      concurrency: Number.MAX_VALUE
    })
    const result = await Effect.runPromise(effect)
    console.log(result)
  })
})
実行結果
time : 2025-02-24T11:40:45.713Z - text: [start]
time : 2025-02-24T11:40:45.726Z - text: mapper2-index = 3 - ng - Sleep NG before wait
time : 2025-02-24T11:40:46.726Z - text: mapper2-index = 1
time : 2025-02-24T11:40:47.729Z - text: mapper2-index = 2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:40:47.732Z - text: https://test/1/1 - ok - 1000
time : 2025-02-24T11:40:49.734Z - text: mapper2-index = 4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:40:50.739Z - text: mapper2-index = 5
time : 2025-02-24T11:40:51.754Z - text: https://test/5/1 - ok - 1000
time : 2025-02-24T11:40:51.758Z - text: https://test/5/3 - ng - Sleep NG before wait
time : 2025-02-24T11:40:52.757Z - text: https://test/5/2 - ng - 2000 - Sleep NG after wait
time : 2025-02-24T11:40:55.763Z - text: https://test/5/4 - ng - 4000 - Sleep NG after wait
time : 2025-02-24T11:40:57.767Z - text: https://test/5/5 - ok - 5000
[
{ _id: 'Either', _tag: 'Right', right: 'https://test/1 - ok - 2000' },
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/2 - ng - 2000 - Sleep NG after wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/3 - ng - Sleep NG before wait'
},
{
_id: 'Either',
_tag: 'Left',
left: 'https://test/4 - ng - 4000 - Sleep NG after wait'
},
{
_id: 'Either',
_tag: 'Right',
right: 'https://test/5 - ok - 12000'
}
]