きっかけ
AWS CloudWatchLogs ロググループに保存されたログファイルを任意の時間でS3に出力したかったのですがLambdaを使用すると言語の選択や運用管理までのコストが大きいと思い
言語に左右されない方法としてStep FunctionsとAthenaで実装してみました。
最終的な必要情報
AWS Step Functionsでロググループのログエクスポートを実行する際は以下のような情報が必要になります。
{
"destination": "string",
"destinationPrefix": "string",
"from": number,
"logGroupName": "string",
"to": number
}
上記のfrom
とto
を変更させることで任意の時間のみの出力を行うことができます。
今回は実行時点から最新までをエクスポートするので、fromのみを動的に変更できるよう進めます。
構成
Athena実行用のワークグループの作成
Athenaのログ保管用のS3作成
以下のように、名前以外デフォルトのバケットを作成してください。
検証用なのでデフォルトの状態ですが、
実際は組織のログ保管要件に従って、ライフサイクルの設定を行なってください。
- 作成したバケットに対して、CloudWatch Logsからの通信を許可するためのバケットポリシーを設定します
以下のような設定を入れてください。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "logs.us-east-1.amazonaws.com"
},
"Action": [
"s3:PutObject",
"s3:GetBucketAcl"
],
"Resource": [
"arn:aws:s3:::[作成したバケット名]",
"arn:aws:s3:::[作成したバケット名]/*"
]
}
]
}
Athena実行用のワークグループ作成
Athenaログ保管用のS3バケットの作成が完了したので、Athenaワークグループの作成を行います。
以下のように、ワークグループ名を入れます。
また、クエリ結果の設定も必要になるので、
先ほど作成したバケットを参照して、入力してあげてください。
※ここで設定しなくても、クエリを実行するときに設定が必要になります。
ワークフローの作成
コードでの作成
まず先にコードを置いておきます。
実際にStep Functionsに入れてみてください。
※EventBridgeで実行できるようにtimestampに対応済みのコードです。
ワークグループ名と出力先バケット、出力元ロググループについては各々設定をしてください。
Step Functions用JSONPath
{
"Comment": "logexport-sfn",
"QueryLanguage": "JSONPath",
"StartAt": "Date Split",
"States": {
"Athena Exec Calc Result": {
"Next": "Calc Result Check",
"Parameters": {
"QueryExecutionId.$": "$.startAthenaQueryExecutionResult.QueryExecutionId"
},
"Resource": "arn:aws:states:::athena:getQueryExecution",
"ResultSelector": {
"queryState.$": "$.QueryExecution.Status.State"
},
"Type": "Task",
"ResultPath": "$.GetQueryExecutionResult"
},
"Athena Exec Calc Wait": {
"Next": "Athena Exec Calc Result",
"Seconds": 1,
"Type": "Wait"
},
"Athena Exec Date Calc": {
"Next": "Athena Exec Calc Wait",
"Parameters": {
"QueryString.$": "States.Format('SELECT timestamp \\'{}\\' - interval \\'60\\' minute', $.date)",
"WorkGroup": "[作成したAthenaのワークグループ名]"
},
"Resource": "arn:aws:states:::athena:startQueryExecution",
"ResultPath": "$.startAthenaQueryExecutionResult",
"Type": "Task"
},
"Athena Exec Unixtime": {
"Next": "Athena Exec Unixtime wait",
"Parameters": {
"QueryString.$": "States.Format('SELECT cast(to_unixtime(timestamp \\'{}\\') as integer)',$.GetAthenaQueryResults.ResultSet.Rows[1].Data[0].VarCharValue)",
"WorkGroup": "[作成したAthenaのワークグループ名]"
},
"Resource": "arn:aws:states:::athena:startQueryExecution",
"ResultPath": "$.startAthenaQueryExecutionResult",
"Type": "Task"
},
"Athena Exec Unixtime Result": {
"Next": "Unixtime Result Check",
"Parameters": {
"QueryExecutionId.$": "$.startAthenaQueryExecutionResult.QueryExecutionId"
},
"Resource": "arn:aws:states:::athena:getQueryExecution",
"ResultPath": "$.GetQueryExecutionResult",
"ResultSelector": {
"queryState.$": "$.QueryExecution.Status.State"
},
"Type": "Task"
},
"Athena Exec Unixtime wait": {
"Next": "Athena Exec Unixtime Result",
"Seconds": 1,
"Type": "Wait"
},
"Calc Result": {
"Next": "Athena Exec Unixtime",
"Parameters": {
"MaxResults": 10,
"QueryExecutionId.$": "$.startAthenaQueryExecutionResult.QueryExecutionId"
},
"Resource": "arn:aws:states:::athena:getQueryResults",
"ResultPath": "$.GetAthenaQueryResults",
"Type": "Task"
},
"Calc Result Check": {
"Choices": [
{
"Next": "Athena Exec Calc Wait",
"Or": [
{
"StringEquals": "QUEUED",
"Variable": "$.GetQueryExecutionResult.queryState"
},
{
"StringEquals": "RUNNING",
"Variable": "$.GetQueryExecutionResult.queryState"
}
]
}
],
"Default": "Calc Result",
"Type": "Choice"
},
"CreateExportTask": {
"Next": "Logs Export Task Wait",
"Parameters": {
"Destination": "[出力先バケット名]",
"DestinationPrefix.$": "States.Format('{}',$.date)",
"From.$": "States.StringToJson($.GetAthenaQueryResults.queryresult)",
"LogGroupName": "[出力元ロググループ名]",
"To": 7258118400
},
"Resource": "arn:aws:states:::aws-sdk:cloudwatchlogs:createExportTask",
"ResultPath": "$.createExportTaskResult",
"Type": "Task"
},
"Date Binding": {
"Next": "Athena Exec Date Calc",
"Parameters": {
"date.$": "States.Format('{} {}', $.SplittedString[0],$.SplittedString[1])"
},
"Type": "Pass"
},
"Date Split": {
"Next": "Date Binding",
"Parameters": {
"SplittedString.$": "States.StringSplit($.date, 'TZ')"
},
"Type": "Pass"
},
"DescribeExportTasks": {
"Next": "ExportTaskResult",
"Parameters": {
"TaskId.$": "$.createExportTaskResult.TaskId"
},
"Resource": "arn:aws:states:::aws-sdk:cloudwatchlogs:describeExportTasks",
"ResultPath": "$.getExportTaskResults",
"Type": "Task"
},
"ExportTaskResult": {
"Choices": [
{
"Next": "Logs Export Task Wait",
"Or": [
{
"StringEquals": "RUNNING",
"Variable": "$.getExportTaskResults.ExportTasks[0].Status.Code"
},
{
"StringEquals": "PENDING",
"Variable": "$.getExportTaskResults.ExportTasks[0].Status.Code"
}
]
}
],
"Default": "成功",
"Type": "Choice"
},
"Logs Export Task Wait": {
"Next": "DescribeExportTasks",
"Seconds": 5,
"Type": "Wait"
},
"Unixtime Result Check": {
"Choices": [
{
"Next": "Athena Exec Unixtime wait",
"Or": [
{
"StringEquals": "QUEUED",
"Variable": "$.GetQueryExecutionResult.queryState"
},
{
"StringEquals": "RUNNING",
"Variable": "$.GetQueryExecutionResult.queryState"
}
]
}
],
"Default": "Unixtime Results",
"Type": "Choice"
},
"Unixtime Results": {
"Next": "CreateExportTask",
"Parameters": {
"MaxResults": 10,
"QueryExecutionId.$": "$.startAthenaQueryExecutionResult.QueryExecutionId"
},
"Resource": "arn:aws:states:::athena:getQueryResults",
"ResultPath": "$.GetAthenaQueryResults",
"ResultSelector": {
"queryresult.$": "$.ResultSet.Rows[1].Data[0].VarCharValue"
},
"Type": "Task"
},
"成功": {
"Type": "Succeed"
}
}
}
マネジメントコンソール側での作成方法
JSONNataではなくJSONPathで作成を進めます。
1. Athenaクエリの実行用ステップの追加(日付の計算)
startの直下に「Athena StartQueryExecution」で始まるステップを追加します。
右側のペイン→設定→APIパラメータの以下コードを変更します。
{
"QueryString": "myQueryString",
"WorkGroup": "primary"
}
下記に変更してあげてください。
{
"QueryString.$": "States.Format('SELECT timestamp \\'{}\\' - interval \\'60\\' minute', $.date)",
"WorkGroup": "[作成したAthenaのワークグループ名]"
}
ここでは、dateという引数に入った日時から何分間引くかを計算しています。
※hourにすることで時でも計算可能ですが、細かく設定できるので分で引いています。
また、右側ペイン→出力→[ResultPath を使用して元の入力を出力に追加]を選択し、以下を設定します。
元の入力と結果を結合
$.startAthenaQueryExecutionResult
このようにすることで、実行されたときの返り値がstartAthenaQueryExecutionResultに保存されます。
2.クエリの待機時間の追加
Athenaクエリの実行の完了を待つための待機時間を追加します。
wait stateを1のステップ後に追加してください。
右側ペイン→設定→秒を任意の秒に設定します。
ここでは、爆速で完了するとして1秒で設定しています。
3.クエリ実行ステータスの確認の追加
1(StartQueryExecution)で実行したクエリの実行ステータスを取得するためにステップを追加します。
2(wait)の後に「Athena GetQueryExecution」を追加してください。
右側のペイン→設定→APIパラメータの以下コードを変更します。
{
"QueryExecutionId": "queryExecutionId"
}
下記に変更してあげてください。
{
"QueryExecutionId.$": "$.startAthenaQueryExecutionResult.QueryExecutionId"
}
ここでは、1の実行したクエリIDを使用しステータスを取得しています。
また、右側ペイン→出力→[ResultPath を使用して元の入力を出力に追加]を選択し、以下を設定します。
元の入力と結果を結合
$.GetQueryExecutionResult
このようにすることで、実行されたときの返り値がGetQueryExecutionResultに保存されます。
追加で、[ResultSelector を使用して結果を変換]についても✓を入れ、以下を設定します。
{
"queryState.$": "$.QueryExecution.Status.State"
}
4.実行ステータスの確認
Athena環境側の問題などにより、クエリが止まっていることやエラーを起こしている可能性があるため、ここで確認するためのステップを追加します。
3(Athena GetQueryExecution)の後に「Choice」を追加してください。
Rule1には、以下のようにorで作成し2(Wait)に繋いでください。
$.GetQueryExecutionResult.queryState == "QUEUED" or $.GetQueryExecutionResult.queryState == "RUNNING"
DefaultRuleについては、この後追加します
5.Athenaクエリ結果の取得
Athenaが実行完了している状態なので、結果を取得します。
4(choice)のDefaultに[Athena GetQueryResults]を追加します。
右側のペイン→設定→APIパラメータの以下コードを変更します。
{
"MaxResults": 10,
"NextToken": "nextToken",
"QueryExecutionId": "queryExecutionId"
}
下記に変更してあげてください。
{
"MaxResults": 10,
"QueryExecutionId.$": "$.startAthenaQueryExecutionResult.QueryExecutionId"
}
ここでは、1の実行したクエリIDを使用しクエリ結果を取得しています。
また、右側ペイン→出力→[ResultPath を使用して元の入力を出力に追加]を選択し、以下を設定します。
元の入力と結果を結合
$.GetAthenaQueryResults
6. Unixtime変換をAthenaで行う
ここまででAthenaクエリの実行がStep Functionsで可能になりました。
同様にUnixtimeへの変換をAthenaで実装します。
1~5を再度追加するので変更箇所だけ記載します。
6-1. Athena StartQueryExecutionの設定
使用するクエリは以下です。
{
"QueryString.$": "States.Format('SELECT cast(to_unixtime(timestamp \\'{}\\') as integer)',$.GetAthenaQueryResults.ResultSet.Rows[1].Data[0]F.VarCharValue)",
"WorkGroup": "[作成したAthenaのワークグループ名]"
}
残りの設定は、1と同一です。
6-2. 2~5を複製し同様に接続する。
6-1を追加後は、2~5(Athena StartQueryExecution ~ Athena GetQueryResults)の内容を同様に追加していきます。
追加すると以下のような、ステップになります。
7. createExportTaskの追加
本題のcreateExportTaskを追加します。
6(Athena GetQueryResults)の後に[CreateExportTask]を追加します。
右側のペイン→設定→APIパラメータの以下コードを変更します。
{
"Destination": "MyData",
"From": 1234,
"LogGroupName": "MyData",
"To": 1234
}
下記に変更してあげてください。
{
"Destination": "[出力先バケット名]",
"DestinationPrefix.$": "States.Format('{}',$.date)",
"From.$": "States.StringToJson($.GetAthenaQueryResults.queryresult)",
"LogGroupName": "[出力元ロググループ名]",
"To": 7258118400
}
Toは実行時間までの最新でよいので、ベタ打ちです。
ここでは2200-01-01 00:00:00を表しています。
ここまでで実行可能な状態です。
以下については、createExportTaskの完了を確認&待機するためのステップを追加していきます。
以降に使用するので、設定を追加します。
右側ペイン→出力→[ResultPath を使用して元の入力を出力に追加]を選択し、以下を設定します。
元の入力と結果を結合
$.createExportTaskResult
8. exportTaskが完了するまで待機させる(appendix)
waitとDescribeExportTasksを追加します。
waitについては、待機時間5秒としています。
DescribeExportTaskの設定については、以下に記載していきます。
- 右側のペイン→設定→APIパラメータを以下コードに変更します
{
"TaskId.$": "$.createExportTaskResult.TaskId"
}
- 右側ペイン→出力→[ResultPath を使用して元の入力を出力に追加]を選択し、以下を設定します
元の入力と結果を結合
$.getExportTaskResults
9.createExportTaskのステータス確認(appendix)
createExportTaskのステータスを[DescribeExportTask]で取得したので、waitに戻すかのchoiceを追加します。
Choiceのルール1の設定は以下です。
$.getExportTaskResults.ExportTasks[0].Status.Code == "RUNNING" or $.getExportTaskResults.ExportTasks[0].Status.Code == "PENDING"
RUNNINGとPENDING状態の時に、Waitに戻してあげたいので、Waitにつなげます。
ここまででExportTaskの処理が完了しているので、Defaultには成功のステップを追加します。
IAM周りの設定
Step Functionsの用意は完了しました。
[作成]ボタンを押すと以下のようなIAM周りの設定が自動作成するかの画面が表示されます。
そのまま作成でも良いですしX-Ray周りは過剰なので、別途作成でも問題ないです。
CloudWatch Logsはポリシーを手動で追加する必要があるので、以下の権限をStep Functionsの実行ロールに追加してください。
"logs:CreateExportTask",
"logs:DescribeExportTasks"
動作確認
Step Functions側
以下のような値をStep Functionsに渡してあげると、実行可能になるかと思います。
{
"date": "2025-01-13 18:08:36"
}
実際はEventBridgeなどで実行することになり、
受け渡される値が"2025-01-13T18:08:36Z"になりTZが不要なので、TZを削除するstepをstartの後に追加してあげれば、問題なく動作します。
※コード上は追加済みです。
S3バケット側
CloudWatchロググループに実ログを出力していないので、ログは表示されないですが、
ログ出力前の書き込みテストが完了し、出力されていることが確認できます。
まとめ
Athenaを計算場所として、Step Functionsを作成してみました。
問題なく動作することができたので、Lambdaを使いたくないが簡易的な計算を要する時は検討してみると良さそうだと思いました。
今回は旧式のJSONPathでの作成でありAWS推奨であるJSONNataではないので時が来れば変更する必要があると思います。
今後、JSONNataでの実装をやってみたいなと思いました。
引き続きやっていきます。
参考