はじめに
AWS Step FunctionsはAWSサービスを利用したワークフローを作成できるサービスです。
Athenaで定期的にクエリを実行したい、という際に使えるようだったので、以下試した結果を記載します。
1 今回の構成
今回は以下の構成・ワークフローで、定期的にクエリを実行させます。
上記のワークフローの中で、Athenaからクエリ結果を取得するまでは以下の流れになっています。
1.「Athena StartQueryExecution」でAthenaでクエリを実行する(クエリ実行IDを取得)
2.「Wait」で一定時間待機する(クエリ結果取得を待つ)
3.「Athena GetQueryExecution」で、クエリ実行IDを元にAthenaのクエリ実行結果を取得する
4.「Choice」でクエリ実行が完了していない場合は「Wait」に戻って待機、そうでなければ次へ、と分岐させる
5.「Athena GetQueryResults」で、クエリ実行IDを元にクエリの実行結果を取得する
2 クエリを実行するステートマシンを作成する
クエリを実際に実行させるステートマシンを作成します。
※今回、Athenaの設定は省略しています。
2-1 ステートマシンの作成
1.Step Functionsのマネジメントコンソールで「ステートマシンの作成」をクリックします。
2.今回は作成方法を「ワークフローを視覚的に設計」、タイプを「標準」として「次へ」をクリックします。
3.次の通り、ワークフローを作成していきます。
3.1.「Athena StartQueryExecution」を追加し、以下の通り設定します。
3.1.1. 設定タブのAPIパラメータを以下の通り記載します。
{
"QueryString.$":"$.query",
"WorkGroup": "AthenaのWorkGroup"
}
$.query
は、ステートマシン実行時query
を参照し、この値を実行するクエリとして扱います。
また、参照を行う場合はキーの末尾を.$
にします。
3.1.2.出力タブで「ResultPath を使用して元の入力を出力に追加」にチェックし、以下の通り設定します。
プルダウン | 入力欄 |
---|---|
Combine original input with result | $.startAthenaQueryExecutionResult |
これによって、「Athena StartQueryExecution」のreturnをstartAthenaQueryExecutionResult
として次のステップに追加で渡すことができるようになります。
3.2.「Wait」を追加します。
今回はデフォルト設定の5秒待機のままとします。
3.3.「Athena GetQueryExecution」を追加します。
3.3.1.設定タブのAPIパラメータを以下の通り記載します。
{
"QueryExecutionId.$": "$.startAthenaQueryExecutionResult.QueryExecutionId"
}
ここで「Athena StartQueryExecution」の実行結果として取得したクエリ実行IDを参照しています。
3.3.2.出力タブで「ResultSelector を使用して結果を変換」にチェックし、以下の通り設定します。
{
"queryState.$": "$.QueryExecution.Status.State"
}
これによって、「Athena GetQueryExecution」のreturnをフィルタリングして取得することができます。
ここでは、return中のQueryExecution.Status.State
をqueryState
として取得しています。
3.3.3.出力タブで「ResultPath を使用して元の入力を出力に追加」にチェックし、以下の通り設定します。
プルダウン | 入力欄 |
---|---|
Combine original input with result | $.GetQueryExecutionResult |
手順3.3.2で設定したqueryState
(クエリの実行状態)を「GetQueryExecutionResult」として次のステップに渡す出力に追加しています。
3.4.「Athena GetQueryResults」を追加します。
3.4.1.設定タブのAPIパラメータを以下の通り記載します。
{
"MaxResults": 10,
"QueryExecutionId.$": "$.startAthenaQueryExecutionResult.QueryExecutionId"
}
ここで再び「Athena StartQueryExecution」の実行結果として取得したクエリ実行IDを参照しています。
3.4.2.出力タブで「ResultPath を使用して元の入力を出力に追加」にチェックし、以下の通り設定します。
プルダウン | 入力欄 |
---|---|
Combine original input with result | $.getAthenaQueryResults |
クエリの実行結果を出力に渡しています。
3.5.「Choice」を追加します。
3.5.1.設定タブのChoice Rulesで、Default RuleのDefault Stateを「GetQueryResults」にします。
次に設定するRuleに合致しない場合は3.4で設定した「Athena GetQueryResults」に進みます。
3.5.2.設定タブのRule #1で「Edit conditions」をクリックし、以下のOR条件を追加します。
Not | Variable | Operator | Value |
---|---|---|---|
$.GetQueryExecutionResult.queryState | is equal to | String constant / QUEUED | |
$.GetQueryExecutionResult.queryState | is equal to | String constant / RUNNING |
クエリの実行が待機、実行中の場合を拾う条件です。
3.5.3.設定タブのRule #1のプルダウン「Then next state is:」で「Wait」を選択します。
3.6.「次へ」をクリックします。
4.生成されたコードを確認画面で、「次へ」をクリックします。
5.ステートマシン名を入力し、その他はデフォルトのママ「ステートマシンの作成」をクリックします。
2-2 動作確認
作成したステートマシンが正常に動作するか確認します。
1.ステートマシンの詳細画面で「実行の開始」をクリックします。
2.入力を以下の通り設定して「実行の開始」をクリックします。
{
"query": "※実行するクエリ"
}
3.実行の詳細画面で最後のステップまで成功することを確認します。
4.「Athena GetQueryResults」をクリックし、入力と出力タブで「出力」のgetAthenaQueryResults.ResultSet.Rows
にクエリ結果が出力されていることを確認します。
3 定期的にステートマシンを実行する
今回、ステートマシンの起動はEventBridgeを利用して行います。
1.EventBridgeのマネジメントコンソールで バス > ルールの順にクリックします。
2.「ルールを作成」をクリックします。
3.ルールの名前を入力し、ルールタイプを「スケジュール」にして「次へ」をクリックします。
4.今回は毎日9時に実行する想定で、以下の通りスケジュールパターンを設定します。
なお、cron式で曜日と日のフィールドは同時に指定できないので、一方を指定する場合は他方には?
を入れます。
5.ターゲットを選択画面で以下の通り選択します。
設定項目 | 設定値 |
---|---|
ターゲットタイプ | AWSのサービス |
ターゲットを選択 | Step Functionsステートマシン |
ステートマシン | ※作成したステートマシン |
実行ロール | この特定のリソースについて新しいロールを作成 |
6.ターゲットを選択の追加設定でターゲット入力設定の「定数(JSONテキスト)」を選択し、ステートマシンに渡すJSONを定義し、「次へ」をクリックします。
{
"query": "※実行するクエリ"
}
7.タグを設定画面で「次へ」をクリックします。
8.レビューと作成画面で「ルールの作成」をクリックします。
おわりに
Step Functionsを使うことで、コードをほぼ書かずに定期的にクエリを実行する仕組みを用意することできました。
今回の記事ではEventBridgeから固定のクエリ文を渡していましたが、EventBridgeから起動したLambdaで生成したコードを都度ステートマシンに渡したり、クエリの実行結果をLambdaで取得してほかのサービスに連携する、といったことも可能です。