1
Help us understand the problem. What are the problem?

posted at

Step FunctionsがAWS Serviceを直接実行できるようになったのでステートマシンでCloudWatch Logs Insightによるログ監視を実行してみる

概要

少し前にリリースされたStep FunctionsのSDK Service Integrationによって、Step Functionsのステートマシンから直接呼び出せるAWSサービスが劇的に増えました。
また、Workflow StudioによってGUIでフローを組み立てられるのも便利そう、ということで、
CloudWatch Logs Insightを利用したモニタリングを定期実行して監視するステートマシンを作ってみました。

参考

やりたいこと

  • Lambdaのログを監視する
    • Node.jsのLambdaコード内でconsole.error()によってエラーログを出している
    • CloudWatch Logs Insightでロググループに対して、エラーを検出するクエリを定期的に実行する
    • クエリ結果をチェックし、エラーを検出した場合にSNSに通知する

Step Functions

今回作成したステートマシンは以下のような形になります

image.png

このステートマシンをEventBridgeのスケジュール実行で呼び出すのがゴール。

CloudWatch Logs Insight

CloudWatch Logs InsightはCloudWatch Logsのロググループに対してクエリを実行することでログ分析が行える機能です。

APIは非同期APIとなっており、StartQueryでクエリ処理を開始してGetQueryResultsでクエリの実行ステータスと実行結果を取得します。

StartQuery

StartQueryAPIは対象ロググループ、検索する時間範囲を指定してクエリを実行します。

image.png

APIパラメータ.json
{
  "StartTime.$": "$.StartTime",
  "EndTime.$": "$.EndTime",
  "LogGroupName": "/aws/lambda/targetFunction",
  "QueryString": "filter !ispresent(@type) | parse @message /^(?<date>\\S+)\\s+(?<requestId>\\S+)\\s+(?<logLevel>\\S+)\\s(?<logMessage>\\S+)$/ | filter logLevel = \"ERROR\" | stats count() as errorCount"
}

QueryString詳細

参考: Insightで実行可能なクエリはこちら

Lambdaが組み込みで出力しているログについては@typeを始めとしていくつかのフィールドが自動的に認識できる状態になっています。
それを利用してfilter !ispresent(@type)とすることで組み込みログを除外して、コード内から出しているログのみに絞り込んでいます。

Node.jsのコード内からconsole.log()で出力したログは以下のようなフォーマットでCloudWatch Logsに出力されます。

2021-12-05T05:40:42.094Z 3c83a643-f1fc-439f-9767-45c8357adee9 INFO messageBody

これをquery内のパイプで正規表現を利用してdate,requestId,logLevel,logMessageの4カラムに分解します。
parse @message /^(?<date>\\S+)\\s+(?<requestId>\\S+)\\s+(?<logLevel>\\S+)\\s(?<logMessage>\\S+)$/

正規表現でマッチしたカラムは以降のパイプで活用できるので、filter logLevel = \"ERROR\"でERRORログのみを絞り込みます。

最後にstats count()でマッチした行数をカウントします。

このサンプルではERRORログの件数をカウントしているだけなのでメトリクスフィルターを使う方が素直な感じですが、指定範囲のログに対して任意のクエリを実行して統計関数を実行できるのは色々と幅が広がりそうです。

Build Search Time Range Lambda

検索範囲を指定するStartTimeとEndTimeはlong型のunix timeで指定する必要があり、EventBridgeのinput.jsonやStateMachineのExecutionContextからはうまく取り出せないので、前段にStartTimeとEndTimeを構築するLambdaを挟んでいます。

index.js
exports.handler = async (event) => {
    const executeTime = Date.parse(event.Input.time); // EventBridgeの実行開始時間
    const delay = 300; // CloudWatch Logs Insightでログが検索できるようになるまでタイムラグがあるので、検索対象時間をずらす
    const range = 3600;
    const executeSec = Math.floor(executeTime / 1000);
    const response = {
        StartTime: executeSec - range - delay,
        EndTime: executeSec - delay,
    };
    return response;
};

このLambda Functionは事前に作成しておき、Lambda Invoke APIを組み込みます。
image.png

このLambdaのレスポンスがStartQueryステートへのInputになります。

GetQueryResults

StartQueryのレスポンスとしてQueryIdが返ってくるため、このQueryIdを利用してGetQueryResultsを実行します。

image.png

タイミングによってはクエリがまだ実行中の可能性があるので、ステータスをチェックしてクエリが未完了であればWaitへ戻ってループさせます。
この時の注意として、GetQueryResultsのレスポンスにはQueryIdが含まれないため、ループを回って戻る時のためにQueryIdをきちんと持ち回る必要があります。

このためには、状態の「出力」タブで"ResultPathを使用して元の入力を出力に追加"してあげます。
image.png
この設定を加えることでInputPathの一部にレスポンスを組み込んだ値をレスポンスとして次の状態に渡すことができます。

response.json
{
  "QueryId": "18351df6-b756-4ee3-8c48-0a88a469029f",
  "Result": {
    "Results": [
      [
        {
          "Field": "errorCount",
          "Value": "1"
        }
      ]
    ],
    "Statistics": {
      "BytesScanned": 487,
      "RecordsMatched": 1,
      "RecordsScanned": 4
    },
    "Status": "Complete"
  }
}

CheckQueryCompleted

GetQueryResultsの実行結果からステータスをチェックして、完了していれば次へ進むように分岐していきます。

image.png
image.png

CheckErrorCount

最後にクエリの実行結果からエラーが検出されていればSNSで通知して完了です。

image.png
image.png

今回のクエリ的にはERRORという文字列でfilterした結果が0件だとResultsのArrayが空配列として返ってくるため、$.Result.Results[0] is presentという条件で、要素が存在した場合にはエラーが1件以上あったとして分岐しています。
ここはAND条件などでもう少し複雑なRuleも定義できるので通知条件によって調整が可能そうです。

SNS Publish

image.png

送信するtopicを指定してメッセージを送信します。
InputPathを利用して送信先topicを切り替えることもできるようです。

EventBridge

最後にEventBridgeからCron式で1時間毎にステートマシンを実行してあげれば完成です。
image.png

まとめ

Workflow Studioの具体的な使い方については以下の記事が参考になりました。
https://qiita.com/imai_amazia/items/45484d73fdcc5d188ada

操作はかなり直感的に使えるのでAPIの使い方さえ押さえておけばそんなにハマリポイントはなさそうです。
StepFunctions特有のInputPath/OutputPathの癖はありますが、ガイドラインが親切なので直接書いていた頃よりはずっと快適。

このサンプルで実装したコードは検索範囲を指定するためのLambdaと、CloudWatch Logs Insightのクエリくらいでした。
かなりの部分ノーコードでワークフローが組み立てられるので応用の幅はすごく広そうです。
今回のステートマシンではサボりましたが、GetQueryResultsのステータスが"Failed"になるなど予期しないエラーが発生した場合にまた別のSNSに通知するといったエラー制御も簡単にできるのがとてもよいです。

動的に入力パラメータを調整するあたりはまだどうしても難しい部分なので、このあたりをもっと簡単に組めると嬉しいですね。
CDKのaws-cdk_aws-stepfunctions-tasksに組み込まれているEvaluateExpression的なものをWorkflow Studioから組み込めると幸せそうな予感。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
1
Help us understand the problem. What are the problem?