はじめに
自分はミロゴスに今年(2023年4月)入社した新卒エンジニアで、TypeScriptを使って開発を行っています。
複数のプロセスを非同期で同時に走らせたいという要求を実現するのに、苦戦をしたのでその実装方法をここで紹介したいと思います。
想定する読者
- 複数のプロセスを同時に走らせる非同期処理の実装方法を知りたい方
- 非同期処理に対する例外処理の実装方法が分からない方
実行環境
- AWS Lambda
- ランタイム
- Node.js 20.x
- アーキテクチャ
- x86_64
- ランタイム
今回実装する要求・要件
要求
5個のプロセスを同時に走らせたい
要件
- 5個のプロセスを同時に走らせる。
- 途中で止まるプロセスがあっても、正常なプロセスは最後まで実行させる。
- 5個全てのプロセスの終了が確認できたとき、途中でエラーが発生したプロセスがあった場合は、どのプロセスが誤りだったかの情報を含め例外を発生させる。
実装方法
全体のコード
Lambdaを起動すると、関数(handler)が動きます。
export const handler = async (event) => {
const errorItems = [];
const counts = await Promise.all(
[1, 2, 3, 4, 5].map(async(num) => {
try {
return await childProcess(num);
} catch (e) {
errorItems.push(num);
console.log(e);
}
})
);
console.log(counts)
if (errorItems.length >= 1) {
throw Error(`error exist: ${errorItems.join(',')}`)
}
const response = {
statusCode: 200,
body: JSON.stringify('Hello from Lambda!'),
};
return response;
};
const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms));
const childProcess = async (num) => {
switch (num) {
case 1:
await delay(7000);
console.log('Finish process 1');
return 100;
case 2:
await delay(2000);
console.log('This is 2. Error will occur.');
await new Promise((resolve, reject) => {
setTimeout((resolve) => {
try{
throw Error('error')
} catch(e) {
reject(e);
}
}, 100);
});
return 200;
case 3:
await delay(3000);
console.log('Finish process 3');
return 3;
case 4:
await delay(3000);
throw Error('Error occur at 4');
case 5:
await delay(3000);
console.log('Finish process 5');
return 5;
default:
throw Error('Invalid Num');
}
}
実行結果は下記のようになっており、要件通り動いていることが確認できます。
Test Event Name
test
Response
{
"errorType": "Error",
"errorMessage": "error exist: 2,4",
"trace": [
"Error: error exist: 2,4",
" at Runtime.handler (file:///var/task/index.mjs:17:11)"
]
}
Function Logs
START RequestId: 287b3bfc-ec88-4a75-b45e-c17372e6249e Version: $LATEST
2023-12-01T03:03:52.574Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO This is 2. Error Will occur.
2023-12-01T03:03:52.674Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO Error: error
at Timeout._onTimeout (file:///var/task/index.mjs:42:19)
at listOnTimeout (node:internal/timers:573:17)
at process.processTimers (node:internal/timers:514:7)
2023-12-01T03:03:53.572Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO Finish process 3
2023-12-01T03:03:53.573Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO Error: Error occur at 4
at childProcess (file:///var/task/index.mjs:55:13)
at runNextTicks (node:internal/process/task_queues:60:5)
at listOnTimeout (node:internal/timers:540:9)
at process.processTimers (node:internal/timers:514:7)
at async file:///var/task/index.mjs:6:18
at async Promise.all (index 3)
at async Runtime.handler (file:///var/task/index.mjs:3:18)
2023-12-01T03:03:53.573Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO Finish process 5
2023-12-01T03:03:57.576Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO Finish process 1
2023-12-01T03:03:57.577Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO [ 100, undefined, 3, undefined, 5 ]
2023-12-01T03:03:57.577Z 287b3bfc-ec88-4a75-b45e-c17372e6249e ERROR Invoke Error {"errorType":"Error","errorMessage":"error exist: 2,4","stack":["Error: error exist: 2,4"," at Runtime.handler (file:///var/task/index.mjs:17:11)"]}
END RequestId: 287b3bfc-ec88-4a75-b45e-c17372e6249e
REPORT RequestId: 287b3bfc-ec88-4a75-b45e-c17372e6249e Duration: 7056.60 ms Billed Duration: 7057 ms Memory Size: 128 MB Max Memory Used: 66 MB
コードの解説
全体の流れ
このLambda関数では、5つの非同期プロセスを同時に実行し、各プロセスの結果を配列に格納します。途中でエラーが発生した場合は、エラーが起きたプロセスの番号をエラーメッセージに含めて例外を発生させます。最終的に、全てのプロセスが完了した場合には成功のレスポンスを返します。
handler 関数
Promise.allを使用して、5つの非同期プロセスを同時に実行しています。各プロセスの結果とエラー処理のための例外が捕捉された場合、結果はcountsに格納され、エラーがある場合はerrorItemsにエラーが発生したプロセスの番号が追加されます。
全てのプロセスが完了した後に、エラーがあればそれを含めて例外を発生させ、なければ成功のレスポンスを返します。
map関数内のコールバック関数を非同期関数(aync)にしたうえで、childProccessの処理を待つように(await)することがポイントです。
export const handler = async (event) => {
const errorItems = [];
const counts = await Promise.all(
[1, 2, 3, 4, 5].map(async(num) => {
try {
return await childProcess(num);
} catch (e) {
errorItems.push(num);
console.log(e);
}
})
);
console.log(counts)
if (errorItems.length >= 1) {
throw Error(`error exist: ${errorItems.join(',')}`)
}
const response = {
statusCode: 200,
body: JSON.stringify('Hello from Lambda!'),
};
return response;
};
childProcess 関数
各非同期プロセスの処理が実装されています。ここでは、指定された番号に基づいて非同期処理を行います。例えば、delay関数を使用して一定時間待機し、その後に成功またはエラーを返すような処理を記述しています。
const childProcess = async (num) => {
switch (num) {
case 1:
await delay(7000);
console.log('Finish process 1');
return 100;
case 2:
await delay(2000);
console.log('This is 2. Error Will occur.');
await new Promise((resolve, reject) => {
setTimeout((resolve) => {
try{
throw Error('error')
} catch(e) {
reject(e);
}
}, 100);
});
return 200;
case 3:
await delay(3000);
console.log('Finish process 3');
return 3;
case 4:
await delay(3000);
console.log('Finish process 4');
return 4;
case 5:
await delay(3000);
console.log('Finish process 5');
return 5;
default:
throw Error('Invalid Num');
}
}
rejectを使ったエラーハンドリング
2つ目のプロセスのエラーハンドリングは少し特殊です。setTimeoutのようなPromiseを返さない非同期関数のエラーハンドリングでは、Promiseを明示的に作成した上で、rejectする必要があります。
rejectせずに例外を発生させると、他のchildProccessも止めてしまうことになります。
case 2:
await delay(2000);
console.log('This is 2. Error Will occur.');
await new Promise((resolve, reject) => {
setTimeout((resolve) => {
try{
throw Error('error')
} catch(e) {
reject(e);
}
}, 100);
});
return 200;
実行結果の解説
実行結果では、プロセス2,4でエラーが発生しているため、errorItemsに2,4が追加され、最終的に例外が発生しています。全体の処理中にエラーが発生しても、他のプロセスは最後まで実行されることが確認できます。
Test Event Name
test
Response
{
"errorType": "Error",
"errorMessage": "error exist: 2,4",
"trace": [
"Error: error exist: 2,4",
" at Runtime.handler (file:///var/task/index.mjs:17:11)"
]
}
Function Logs
START RequestId: 287b3bfc-ec88-4a75-b45e-c17372e6249e Version: $LATEST
2023-12-01T03:03:52.574Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO This is 2. Error Will occur.
2023-12-01T03:03:52.674Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO Error: error
at Timeout._onTimeout (file:///var/task/index.mjs:42:19)
at listOnTimeout (node:internal/timers:573:17)
at process.processTimers (node:internal/timers:514:7)
2023-12-01T03:03:53.572Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO Finish process 3
2023-12-01T03:03:53.573Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO Error: Error occur at 4
at childProcess (file:///var/task/index.mjs:55:13)
at runNextTicks (node:internal/process/task_queues:60:5)
at listOnTimeout (node:internal/timers:540:9)
at process.processTimers (node:internal/timers:514:7)
at async file:///var/task/index.mjs:6:18
at async Promise.all (index 3)
at async Runtime.handler (file:///var/task/index.mjs:3:18)
2023-12-01T03:03:53.573Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO Finish process 5
2023-12-01T03:03:57.576Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO Finish process 1
2023-12-01T03:03:57.577Z 287b3bfc-ec88-4a75-b45e-c17372e6249e INFO [ 100, undefined, 3, undefined, 5 ]
2023-12-01T03:03:57.577Z 287b3bfc-ec88-4a75-b45e-c17372e6249e ERROR Invoke Error {"errorType":"Error","errorMessage":"error exist: 2,4","stack":["Error: error exist: 2,4"," at Runtime.handler (file:///var/task/index.mjs:17:11)"]}
END RequestId: 287b3bfc-ec88-4a75-b45e-c17372e6249e
REPORT RequestId: 287b3bfc-ec88-4a75-b45e-c17372e6249e Duration: 7056.60 ms Billed Duration: 7057 ms Memory Size: 128 MB Max Memory Used: 66 MB
まとめ
このLambda関数では、非同期プロセスの同時実行とエラー処理を行えるように実装しました。Promise.allを使って簡潔に複数の非同期処理を同時に実行する方法や、エラーが発生した場合の例外処理の考え方が分からない方の一助になればと思っています。