Step Functionsの組み込み関数
今年の8月頃にStep Functionsに新しい組み込み関数が多数追加されました。
昨年のSDK統合と合わせ、これらの組み込み関数を用いることで柔軟にパラメータを組み立ててAWS APIを実行できるようになっています。
この組み込み関数の活用例として、Athenaの日次パーティション追加処理をStep Functionsで実装していきます。
お題
ここでは
- 日付に基づいたHive形式のパーティションが定義されたAthenaテーブルがある
-
ALTER TABLE ADD PARTITION
を用いてパーティションを追加する - このパーティション追加をStep Functionsを利用して、Event Bridgeのルールなどから日次実行する
ケースについて書いていきたいと思います。
対象となるパーティションはyear
,month
,day
の3つのキーを持ち、
s3://example-bucket/data/year=2022/month=12/day=02/
のようなHive形式でS3にデータが保管されています。
この場合、以下のようなクエリを構築してStartQueryExecution APIを実行することで指定の日付のパーティションを追加できます。
ALTER TABLE target_table ADD PARTITION (year = '2022', month = '12', day = '02')
このクエリを利用して、日付パラメータを動的に構築してタスク実行日のパーティションを追加する日次処理をStep Functionsを用いて作成していきます。
ステートマシン定義の作成
組み込み関数を利用して日付パラメータを動的に構築する
上記クエリをStep Functionsで日次実行する上でポイントとなるのは、クエリの日付パラメータの動的な組み立てです。
まずは現在時刻を把握したいためコンテキストから実行開始時刻を取得します。
"StartTime.$": "$$.Execution.StartTime"
//=> "2022-12-02T02:49:37.140Z"
コンテキストはタスク実行時のメタ情報で、$$
に格納されています。
コンテキストの詳細についてはこちらを参照してください。
これを組み込み関数States.StringSplit
を用いて日付と時刻に分離します。
"StartTimeArray.$": "States.StringSplit($.StartTime, 'T')"
//=> ["2022-12-02", "02:49:37.140Z"]
T
で分割した結果が配列で返ってくるため、States.ArrayGetItem
によって日付部分を取り出します。
"StartDate.$": "States.ArrayGetItem($.StartTimeArray, 0)"
//=> "2022-12-02"
同様の処理を日付に対して実行することで、年、月、日をそれぞれ取り出します。
{
"Year.$": "States.ArrayGetItem(States.StringSplit($.StartDate, '-'), 0)",
"Month.$": "States.ArrayGetItem(States.StringSplit($.StartDate, '-'), 1)",
"Day.$": "States.ArrayGetItem(States.StringSplit($.StartDate, '-'), 2)"
}
余談ですが、同じ要領でExecution.Idに対して分割抽出処理を行うことで実行環境のAWS AccoutIDを取得することも出来ます。AccountIDを使いたい場面は偶にあるので便利です。
{
"AccountId.$": "States.ArrayGetItem(States.StringSplit($$.Execution.Id, ':'), 4)"
}
Passタスク
ここまでの日付パラメータの構築を最初にPass
タスクで定義して、次のStartQueryExecution
を実行するタスクへのインプットとします。
"Extract Parameters": {
"Type": "Pass",
"Next": "Add Partition",
"Parameters": {
"Database.$": "$.Database",
"Table.$": "$.Table",
"WorkGroup.$": "$.WorkGroup",
"Year.$": "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 0)",
"Month.$": "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 1)",
"Day.$": "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 2)"
}
}
ここでは抽出した日付パラメータの他にDatabase
、Table
、WorkGroup
をタスク実行時の入力として受け取っています。
Pass
タスクの詳細についてはこちらを参照してください。
StartQueryExecutionを実行する
上記のインプットを受け取って StartQueryExecutionを実行するタスクを定義します。
StartQueryExecution
はAthenaに対して指定したクエリを実行してくれるAPIです。
今回実行したいQueryStringは以下のような記述になります。
"QueryString.$": "States.Format('ALTER TABLE {} ADD IF NOT EXISTS PARTITION (year = \\'{}\\', month = \\'{}\\', day = \\'{}\\')', $.Table, $.Year, $.Month, $.Day)"
統合タイプ
StartQueryExecution APIはOptimized Integrationsに対応しているため、Optimized IntegrationsとSDK Integrationsのどちらかを選ぶことができます。
Optimized Integrationsの場合、Run a Job(.sync)
によるジョブ実行モードにすることでタスク自身が非同期APIの結果を待って同期的にタスクを動作させることができます。
https://docs.aws.amazon.com/step-functions/latest/dg/connect-supported-services.html
1つのタスク内でエラーのリトライやタイムアウトを定義できるので、自前でループを回して結果を確認するようなループステップを省略する事ができ、状態定義がシンプルになります。
タスク定義
ADD PARTITIONを実行するタスク定義は以下のようになります。
"Add Partition": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Parameters": {
"QueryString.$": "States.Format('ALTER TABLE {} ADD IF NOT EXISTS PARTITION (year = \\'{}\\', month = \\'{}\\', day = \\'{}\\')', $.Table, $.Year, $.Month, $.Day)",
"WorkGroup.$": "$.WorkGroup",
"QueryExecutionContext": {
"Database.$": "$.Database"
}
},
"TimeoutSeconds": 120,
"Next": "Success",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Notification",
"Comment": "Fail"
}
]
}
Run a Jobで動かすためにResourceの末尾に.sync
がついています。
他には、エラーをキャッチした場合はエラー通知を行うタスクに遷移するような定義がしてあります。
ステートマシンを定義する
これらをまとめると定義グラフの完成形は以下のような図になります。
状態定義のJSON全体はこちら
{
"StartAt": "Extract Parameters",
"States": {
"Extract Parameters": {
"Type": "Pass",
"Next": "Add Partition",
"Parameters": {
"Database.$": "$.Database",
"Table.$": "$.Table",
"WorkGroup.$": "$.WorkGroup",
"Year.$": "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 0)",
"Month.$": "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 1)",
"Day.$": "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 2)"
}
},
"Add Partition": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Parameters": {
"QueryString.$": "States.Format('ALTER TABLE {} ADD IF NOT EXISTS PARTITION (year = \\'{}\\', month = \\'{}\\', day = \\'{}\\')', $.Table, $.Year, $.Month, $.Day)",
"WorkGroup.$": "$.WorkGroup",
"QueryExecutionContext": {
"Database.$": "$.Database"
}
},
"TimeoutSeconds": 120,
"Next": "Success",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Notification",
"Comment": "Fail"
}
]
},
"Notification": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"Message.$": "$",
"TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:errorNotify"
},
"Next": "Fail"
},
"Success": {
"Type": "Succeed"
},
"Fail": {
"Type": "Fail"
}
},
"TimeoutSeconds": 300
}
あとはこのステートマシンをEventBridgeのルールターゲットに指定するなどして日次で実行すると、実行時間に対応したパーティションを毎日追加することが出来ます。
まとめ
組み込み関数が増えたことでAPIパラメータを調整するだけのLambda関数などがかなり減らせるようになり、とても便利ですね。
StringSplit
とArrayGetItem
の組み合わせは汎用性が高く、様々な場面で活用できそうです。
個人的にはあとはDateのParse/Format系が増えてくれるととても嬉しいなと思っています。
コンテキストのExecution.StartTime
をタイムスタンプに変換、APIパラメータに合わせてフォーマットしたり、今回のようなケースでタイムゾーンを意識した変換ができると嬉しいケースがありそうです。
補足
以下は補足や余談など。
組み込み関数のスタックサイズ
今回$.Year
, $.Month
, $.Day
を抽出するためにPassタスクを利用しましたが、ステップ数を節約したい場合はQueryString.$
のStates.Format()
内にすべて記載して展開することも理論上可能です。
ですが、現時点では1つの組み込み関数内にネストできる関数スタック上限が10件となっているようで、関数スタックがそれ以上になるとエラーになります。
例えば極端な例としては以下の記述は文法エラー扱いとなって定義を保存出来ません。
"Test.$": "States.MathAdd(States.MathAdd(States.MathAdd(States.MathAdd(States.MathAdd(States.MathAdd(States.MathAdd(States.MathAdd(States.MathAdd(States.MathAdd(States.MathAdd(1, 1), 1), 1), 1), 1), 1), 1), 1), 1), 1), 1)"
この制限を回避するよう工夫をした下記のようなQueryStringを定義することでPassタスクを利用しなくても今回の機能は実現可能です。
"QueryString.$": "States.Format('ALTER TABLE {} ADD IF NOT EXISTS PARTITION (year = \\'{}\\', month = \\'{}\\', day = \\'{}\\')', $.Table, States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, '-'), 0), States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, '-'), 1), States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 2))"
ですがややトリッキーな回避をしていることもあり可読性も低いので、適宜Passタスクを利用して見通しをよく保つほうがほとんどの場合で望ましいでしょう。
Optimized Integrationsの挙動
Run a Jobでタスクを同期実行した場合、結果を待つためにタスクが最低1分弱waitします。
そのためタスクのタイムアウト時間を20秒など短めに設定すると、クエリ自体はミリ秒単位で成功していてもタスクはタイムアウトエラーになります。
高速に処理したい場合などは注意が必要です。
ちなみに下記ドキュメントによるとOptimizedがサポートしているパラメータが明示されているようにも読めますが、今回試してみた範囲だとドキュメントに記載のなかったExecutionParametersを用いたParameterizedQueryも実行することができました。
https://docs.aws.amazon.com/step-functions/latest/dg/connect-athena.html
ドキュメントに記載が無い以上動作確認は必要ですが、API実行の挙動そのものにはSDKと違いはなく、同期実行(Run a Job)が出来るかどうかのみがSDK Integrationsとの大きな違いと考えてもよさそうです。