概要
少し前にリリースされたStep FunctionsのSDK Service Integrationによって、Step Functionsのステートマシンから直接呼び出せるAWSサービスが劇的に増えました。
また、Workflow StudioによってGUIでフローを組み立てられるのも便利そう、ということで、
CloudWatch Logs Insightを利用したモニタリングを定期実行して監視するステートマシンを作ってみました。
参考
- StepFunctionsのSDK統合: https://aws.amazon.com/jp/about-aws/whats-new/2021/09/aws-step-functions-200-aws-sdk-integration/
- Workflow Studio: https://aws.amazon.com/jp/blogs/news/new-aws-step-functions-workflow-studio-a-low-code-visual-tool-for-building-state-machines/
やりたいこと
- Lambdaのログを監視する
- Node.jsのLambdaコード内で
console.error()
によってエラーログを出している - CloudWatch Logs Insightでロググループに対して、エラーを検出するクエリを定期的に実行する
- クエリ結果をチェックし、エラーを検出した場合にSNSに通知する
- Node.jsのLambdaコード内で
Step Functions
今回作成したステートマシンは以下のような形になります
このステートマシンをEventBridgeのスケジュール実行で呼び出すのがゴール。
CloudWatch Logs Insight
CloudWatch Logs InsightはCloudWatch Logsのロググループに対してクエリを実行することでログ分析が行える機能です。
APIは非同期APIとなっており、StartQuery
でクエリ処理を開始してGetQueryResults
でクエリの実行ステータスと実行結果を取得します。
StartQuery
StartQuery
APIは対象ロググループ、検索する時間範囲を指定してクエリを実行します。
{
"StartTime.$": "$.StartTime",
"EndTime.$": "$.EndTime",
"LogGroupName": "/aws/lambda/targetFunction",
"QueryString": "filter !ispresent(@type) | parse @message /^(?<date>\\S+)\\s+(?<requestId>\\S+)\\s+(?<logLevel>\\S+)\\s(?<logMessage>\\S+)$/ | filter logLevel = \"ERROR\" | stats count() as errorCount"
}
QueryString詳細
参考: Insightで実行可能なクエリはこちら。
Lambdaが組み込みで出力しているログについては@type
を始めとしていくつかのフィールドが自動的に認識できる状態になっています。
それを利用してfilter !ispresent(@type)
とすることで組み込みログを除外して、コード内から出しているログのみに絞り込んでいます。
Node.jsのコード内からconsole.log()
で出力したログは以下のようなフォーマットでCloudWatch Logsに出力されます。
2021-12-05T05:40:42.094Z 3c83a643-f1fc-439f-9767-45c8357adee9 INFO messageBody
これをquery内のパイプで正規表現を利用してdate,requestId,logLevel,logMessageの4カラムに分解します。
parse @message /^(?<date>\\S+)\\s+(?<requestId>\\S+)\\s+(?<logLevel>\\S+)\\s(?<logMessage>\\S+)$/
正規表現でマッチしたカラムは以降のパイプで活用できるので、filter logLevel = \"ERROR\"
でERRORログのみを絞り込みます。
最後にstats count()
でマッチした行数をカウントします。
このサンプルではERRORログの件数をカウントしているだけなのでメトリクスフィルターを使う方が素直な感じですが、指定範囲のログに対して任意のクエリを実行して統計関数を実行できるのは色々と幅が広がりそうです。
Build Search Time Range Lambda
検索範囲を指定するStartTimeとEndTimeはlong型のunix timeで指定する必要があり、EventBridgeのinput.jsonやStateMachineのExecutionContextからはうまく取り出せないので、前段にStartTimeとEndTimeを構築するLambdaを挟んでいます。
exports.handler = async (event) => {
const executeTime = Date.parse(event.Input.time); // EventBridgeの実行開始時間
const delay = 300; // CloudWatch Logs Insightでログが検索できるようになるまでタイムラグがあるので、検索対象時間をずらす
const range = 3600;
const executeSec = Math.floor(executeTime / 1000);
const response = {
StartTime: executeSec - range - delay,
EndTime: executeSec - delay,
};
return response;
};
このLambda Functionは事前に作成しておき、Lambda Invoke APIを組み込みます。
このLambdaのレスポンスがStartQuery
ステートへのInputになります。
GetQueryResults
StartQuery
のレスポンスとしてQueryId
が返ってくるため、このQueryIdを利用してGetQueryResultsを実行します。
タイミングによってはクエリがまだ実行中の可能性があるので、ステータスをチェックしてクエリが未完了であればWaitへ戻ってループさせます。
この時の注意として、GetQueryResultsのレスポンスにはQueryIdが含まれないため、ループを回って戻る時のためにQueryIdをきちんと持ち回る必要があります。
このためには、状態の「出力」タブで"ResultPathを使用して元の入力を出力に追加"してあげます。
この設定を加えることでInputPathの一部にレスポンスを組み込んだ値をレスポンスとして次の状態に渡すことができます。
{
"QueryId": "18351df6-b756-4ee3-8c48-0a88a469029f",
"Result": {
"Results": [
[
{
"Field": "errorCount",
"Value": "1"
}
]
],
"Statistics": {
"BytesScanned": 487,
"RecordsMatched": 1,
"RecordsScanned": 4
},
"Status": "Complete"
}
}
CheckQueryCompleted
GetQueryResultsの実行結果からステータスをチェックして、完了していれば次へ進むように分岐していきます。
CheckErrorCount
最後にクエリの実行結果からエラーが検出されていればSNSで通知して完了です。
今回のクエリ的にはERROR
という文字列でfilterした結果が0件だとResultsのArrayが空配列として返ってくるため、$.Result.Results[0] is present
という条件で、要素が存在した場合にはエラーが1件以上あったとして分岐しています。
ここはAND条件などでもう少し複雑なRuleも定義できるので通知条件によって調整が可能そうです。
SNS Publish
送信するtopicを指定してメッセージを送信します。
InputPathを利用して送信先topicを切り替えることもできるようです。
EventBridge
最後にEventBridgeからCron式で1時間毎にステートマシンを実行してあげれば完成です。
まとめ
Workflow Studioの具体的な使い方については以下の記事が参考になりました。
https://qiita.com/imai_amazia/items/45484d73fdcc5d188ada
操作はかなり直感的に使えるのでAPIの使い方さえ押さえておけばそんなにハマリポイントはなさそうです。
StepFunctions特有のInputPath/OutputPathの癖はありますが、ガイドラインが親切なので直接書いていた頃よりはずっと快適。
このサンプルで実装したコードは検索範囲を指定するためのLambdaと、CloudWatch Logs Insightのクエリくらいでした。
かなりの部分ノーコードでワークフローが組み立てられるので応用の幅はすごく広そうです。
今回のステートマシンではサボりましたが、GetQueryResultsのステータスが"Failed"になるなど予期しないエラーが発生した場合にまた別のSNSに通知するといったエラー制御も簡単にできるのがとてもよいです。
動的に入力パラメータを調整するあたりはまだどうしても難しい部分なので、このあたりをもっと簡単に組めると嬉しいですね。
CDKのaws-cdk_aws-stepfunctions-tasksに組み込まれているEvaluateExpression的なものをWorkflow Studioから組み込めると幸せそうな予感。