0. まとめ
- S3にあるIceberg テーブルをUPSERT(更新+挿入)するワークフロー(ステートマシン)を AWS Step Functions で作った。
- ステートマシンの単体テストを、AWS上にデプロイせず、外部呼出もモックして実施したい。
- そこで TestState API を使い、単体テストしてみた。
1. 前提知識: Step Functions とは
AWS Step Functions は、複数の処理を「順番」「分岐」「リトライ」といったフローチャートとして組み立てて実行するサービスです。
- フローチャートの 1 つ 1 つの箱を ステート(state) と呼びます。
- フロー全体の設計図を ステートマシン定義 と呼び、JSONataまたはJSONPath で書きます。
よく使うステートの種類は次のとおりです。
| ステート | 役割 | 説明 |
|---|---|---|
| Choice | 条件分岐 | プログラムの if。条件で次の行き先を変える |
| Pass | 入力の加工 | 計算や文字列組み立て。次のステートへ渡す |
| Task | 処理本体 | Lambda・Athena など AWS サービスを実際に呼ぶ |
| Map | 繰り返し | 配列の各要素に同じ処理を適用する(for ループ) |
| Parallel | 並列実行 | 複数のブランチを同時に走らせる |
| Wait | 待機 | 指定時間・指定時刻まで待つ |
| Succeed / Fail | 終了 | 成功/失敗で終わる |
本記事ではAWSでも推奨されている、JSONata({% ... %} で書く式言語)を使っています。
詳しく知りたい方は、AWSのBlackBeltや以下記事などが参考になります。
2. 前提知識: Apache Iceberg とは
CSVやParquetファイルは、「特定の1行だけ書き換える(UPDATE)」「1行だけ消す(DELETE)」といった操作が苦手です。ファイルのどこに何があるかを管理していないからです。
Apache Iceberg は、Parquet などのファイル群を「データベースのテーブルのように」扱えるようにするテーブルフォーマットです。
データ本体(Parquet ファイル)とは別に、「今このテーブルを構成しているファイルはどれか」を記録したメタデータを持ちます。
| できること | 説明 |
|---|---|
| UPDATE / DELETE / MERGE | 行単位の更新・削除・UPSERT ができる |
| ACID トランザクション | 書き込み途中の中途半端な状態が見えない |
| スキーマ変更 | 列の追加・改名などをテーブルを作り直さずに行える |
| タイムトラベル | 「過去のある時点のテーブル」を読める |
| エンジンへの非依存 | Athena、Spark、Trinoなど、複数のクエリエンジンから同一テーブルを安全に操作可能 |
詳しく知りたい方は、以下記事などが参考になります。
3. テスト対象のステートマシン
作成したステートマシン(今回の単体テストの対象)のワークフローを簡単に紹介します。
TestState APIの使い方だけ知りたい方は、この章は読み飛ばしてください。

更新対象テーブルと元テーブルの情報をインプットとして、
①入力チェック ⇒ ②SQL(Merge)文作成 ⇒ ③Merge文をAthenaで同期実行 という簡単なフローです。
3.1. ① 入力チェックステート(ValidateInput)
ワークフローに渡された入力に、必要な項目(対象 DB・対象テーブル・結合キーなど 7 つ)がそろっているかを確認します。
Choiceステートで実現しています。
1 つでも欠けていたらエラーハンドリングするようにしています。
定義(抜粋);必須 7 項目を $exists() で AND 連結し、1 つでも欠ければエラー時のステート(ValidationFailed) へ分岐します。
"ValidateInput": {
"Type": "Choice",
"Choices": [
{
"Condition": "{% $not($exists($states.input.target_database) and $exists($states.input.target_table) and … and $exists($states.input.insert_columns)) %}",
"Next": "ValidationFailed"
}
],
"Default": "BuildMergeSql"
}
3.2. ② SQLの組み立てステート(BuildMergeSql)
入力でもらった「結合キー」「更新する列」「挿入する列」から、
MERGE 文の SQL 文字列を JSONata で組み立てます。
Passステートで実現しています。
たとえば次の入力:
{
"target_database": "sales_db",
"target_table": "customer_master",
"source_database": "staging_db",
"source_table": "customer_master_delta",
"merge_keys": ["id"],
"update_columns": ["name", "email"],
"insert_columns": ["id", "name", "email"]
}
から、こういう SQL を作ります:
MERGE INTO sales_db.customer_master t USING staging_db.customer_master_delta s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET name = s.name, email = s.email
WHEN NOT MATCHED THEN INSERT (id, name, email) VALUES (s.id, s.name, s.email)
定義(抜粋);JSONataでは、Assign フィールドに変数を格納できます。
merge_sql が JSONata の文字列結合で MERGE 文を組み立てている部分です。
"BuildMergeSql": {
"Type": "Pass",
"Assign": {
"merge_sql": "{% 'MERGE INTO ' & $states.input.target_database & '.' & $states.input.target_table & ' t USING ' & $states.input.source_database & '.' & $states.input.source_table & ' s ON ' & $join($map($states.input.merge_keys, function($k){ 't.' & $k & ' = s.' & $k }), ' AND ') & ' WHEN MATCHED THEN UPDATE SET ' & $join($map($states.input.update_columns, function($c){ $c & ' = s.' & $c }), ', ') & ' WHEN NOT MATCHED THEN INSERT (' & $join($states.input.insert_columns, ', ') & ') VALUES (' & $join($map($states.input.insert_columns, function($c){ 's.' & $c }), ', ') & ')' %}",
"target_database": "{% $states.input.target_database %}",
"target_table": "{% $states.input.target_database & '.' & $states.input.target_table %}"
},
"Next": "ExecuteAthenaUpsert"
}
3.3. ③ Athena でクエリを同期実行(ExecuteAthenaUpsert)
組み立てた SQL を Amazon Athena に投げて実行します。
Taskステートで実現しています。
.sync によりAthenaへの同期実行を行っています。
すなわち、クエリが終わるまで待ち、失敗したらこのステートも失敗します。
定義(抜粋);Resource 末尾の .sync が「完了まで待つ」指定です。QueryString に前ステートの $merge_sql を渡し、スロットリング等の一過性エラーだけ Retry します。
"ExecuteAthenaUpsert": {
"Type": "Task",
"Resource": "arn:aws:states:::athena:startQueryExecution.sync",
"Arguments": {
"QueryString": "{% $merge_sql %}",
"QueryExecutionContext": { "Database": "{% $target_database %}" },
"ResultConfiguration": { "OutputLocation": "s3://iceberg-demo/athena-results/" }
},
"Retry": [
{
"ErrorEquals": ["Athena.TooManyRequestsException", "Athena.SdkClientException"],
"IntervalSeconds": 10, "MaxAttempts": 3, "BackoffRate": 2
}
],
"End": true
}
4. TestState API - 概要
このステートマシンをテストしようとすると、AWSへデプロイして、実際にIcebergテーブルやテストデータを用意する必要があります。
Lambdaを呼び出すステートマシンの場合は、Lambdaのデプロイも必要です。
実行することで多少の課金も発生しますし、そもそもこれって単体テストなのかという話にもなります。
そこで登場するのが TestState API です。
ステートマシン全体をデプロイ・実行しなくても、ステートを 1 つだけ取り出して動かせる API(
aws stepfunctions test-state)。
ポイントは 3 つ。
- 1 ステート単位で試せる。
-
モック(
--mock)が使える。
Athena 呼び出しを「本物を呼ばずに、結果を差し込む」形でテストできる。 -
何が起きたかを細かく見られる。
--inspection-levelをDEBUGにすると、途中の計算結果(組み立てた SQL など)まで返ってきます。
5. TestState API - 試してみる
5.0 前提
- AWS CLI v2 が使えること。
- 実行するIAM 権限に
states:TestStateがあること。
注意:
日本語 Windows の場合: 定義 JSON に日本語コメントが含まれると、AWS CLI v2 は file:// をシステムロケール(CP932)で開こうとし、UTF-8 ファイルだと text contents could not be decoded エラーになります。
⇒ AWS_CLI_FILE_ENCODING=utf-8 を設定したら回避できました(bash: export AWS_CLI_FILE_ENCODING=utf-8 / PowerShell: $env:AWS_CLI_FILE_ENCODING="utf-8")。
今回は以下のようなフォルダ構成にしています。
作業フォルダ
├── step_functions_definition.json # ステートマシンの定義
└── tests/
├── inputs/ # ステートマシンに与える入力イベント
│ ├── valid.json # 必須項目が全部そろっている(正常系)
│ ├── invalid_missing_param.json # insert_columns 欠落(異常系)
│ └── multi_key.json # 複合キー・複数列(SQL組み立ての確認用)
├── variables/
│ └── execute_vars.json # ③ が参照する変数(②が Assign した値を手で再現)
└── mocks/ # ③ に差し込むモック応答
├── athena_succeeded.json # クエリ成功
├── athena_failed.json # クエリ失敗(States.TaskFailed)
└── athena_throttled.json # スロットリング(Retry 確認用)
test-state でよく使うオプションは次の通りです。
| オプション | 役割 | 必須 |
|---|---|---|
--definition |
テスト対象の定義 | 必須 |
--state-name |
定義の中のどのステートを試すか | フル定義を渡すなら必須 |
--input |
ステートマシンへの入力 | 入力をそのまま参照するステートの場合必須 |
--variables |
前段のステートが Assign した変数を手で渡す |
変数を参照するステートの場合必須 |
--mock |
Taskステートで呼び出した結果・エラーを差し込む | 任意 |
--inspection-level |
返す情報量(INFO/DEBUG/TRACE)を指定 |
任意 |
--role-arn |
実行ロール | 任意(mock なしで実リソースを呼ぶときだけ必須) |
--region |
リージョン(ap-northeast-1) |
必須 |
5.1 ①の分岐をテストする(Choice)
ValidateInput(Choiceステート)を --state-name で指定し、入力を与えて「どちらへ分岐したか」を nextState で確認します。
5.1.1 正常系: 必須項目がそろっている
入力 — tests/inputs/valid.json
{
"target_database": "sales_db",
"target_table": "customer_master",
"source_database": "staging_db",
"source_table": "customer_master_delta",
"merge_keys": ["id"],
"update_columns": ["name", "email"],
"insert_columns": ["id", "name", "email"]
}
コマンド
aws stepfunctions test-state --region ap-northeast-1 \
--definition file://step_functions_definition.json \
--state-name ValidateInput \
--input file://tests/inputs/valid.json
出力
{
"output": "{ ...入力がそのまま... }",
"nextState": "BuildMergeSql",
"status": "SUCCEEDED"
}
nextState が BuildMergeSql なので、欠落なしと判定して次へ進んだと分かります。
5.1.2 異常系: insert_columns が欠落
入力 — tests/inputs/invalid_missing_param.json(insert_columns が無い)
{
"target_database": "sales_db",
"target_table": "customer_master",
"source_database": "staging_db",
"source_table": "customer_master_delta",
"merge_keys": ["id"],
"update_columns": ["name", "email"]
}
コマンド
aws stepfunctions test-state --region ap-northeast-1 \
--definition file://step_functions_definition.json \
--state-name ValidateInput \
--input file://tests/inputs/invalid_missing_param.json
出力
{
"output": "{ ...入力がそのまま... }",
"nextState": "ValidationFailed",
"status": "SUCCEEDED"
}
nextState が ValidationFailed に変わり、必須チェックの分岐ロジックが効いていると確認できます。
5.2 ②の SQL 組み立てをテストする(Pass)
BuildMergeSql(Passステート)を DEBUG で動かし、Assign した merge_sql を inspectionData.variables で覗きます。中身はエスケープされた JSON 文字列なので | jq 'fromjson' で展開します。
5.2.1 単一キー
入力 — tests/inputs/valid.json(5.1 と同じ)
{
"target_database": "sales_db",
"target_table": "customer_master",
"source_database": "staging_db",
"source_table": "customer_master_delta",
"merge_keys": ["id"],
"update_columns": ["name", "email"],
"insert_columns": ["id", "name", "email"]
}
コマンド
aws stepfunctions test-state --region ap-northeast-1 \
--definition file://step_functions_definition.json \
--state-name BuildMergeSql \
--input file://tests/inputs/valid.json \
--inspection-level DEBUG \
--query 'inspectionData.variables' | jq 'fromjson'
出力(jq 'fromjson' で展開後)
{
"target_database": "sales_db",
"merge_sql": "MERGE INTO sales_db.customer_master t USING staging_db.customer_master_delta s ON t.id = s.id WHEN MATCHED THEN UPDATE SET name = s.name, email = s.email WHEN NOT MATCHED THEN INSERT (id, name, email) VALUES (s.id, s.name, s.email)",
"target_table": "sales_db.customer_master"
}
期待通りのMERGE文が作成されています。
5.2.2 複合キー
入力 — tests/inputs/multi_key.json(結合キー 2 列・更新/挿入列も複数)
{
"target_database": "sales_db",
"target_table": "orders",
"source_database": "staging_db",
"source_table": "orders_delta",
"merge_keys": ["order_id", "line_no"],
"update_columns": ["qty", "amount"],
"insert_columns": ["order_id", "line_no", "qty", "amount"]
}
コマンド
aws stepfunctions test-state --region ap-northeast-1 \
--definition file://step_functions_definition.json \
--state-name BuildMergeSql \
--input file://tests/inputs/multi_key.json \
--inspection-level DEBUG \
--query 'inspectionData.variables' | jq 'fromjson'
出力(jq 'fromjson' で展開後)
{
"target_database": "sales_db",
"merge_sql": "MERGE INTO sales_db.orders t USING staging_db.orders_delta s ON t.order_id = s.order_id AND t.line_no = s.line_no WHEN MATCHED THEN UPDATE SET qty = s.qty, amount = s.amount WHEN NOT MATCHED THEN INSERT (order_id, line_no, qty, amount) VALUES (s.order_id, s.line_no, s.qty, s.amount)",
"target_table": "sales_db.orders"
}
ON 句が AND で連結され、複合キー版の SQL になっています。
Athenaを呼ばずに、SQL生成ロジックの正しさを確認できました。
inspectionDataの各フィールド(variables/afterArguments/input/result)は、中身がエスケープされた JSON 文字列で返ります。
5.3 ③の Athena 呼び出しをモックでテストする
--mock で結果を差し込みます。
参照する変数は --variables で手渡しします。
共通の入力① イベント — tests/inputs/valid.json(5.1と同じなので省略)
共通の入力② 変数 — tests/variables/execute_vars.json(5.2が Assign した値を手で再現)
{
"merge_sql": "MERGE INTO sales_db.customer_master t USING staging_db.customer_master_delta s ON t.id = s.id WHEN MATCHED THEN UPDATE SET name = s.name, email = s.email WHEN NOT MATCHED THEN INSERT (id, name, email) VALUES (s.id, s.name, s.email)",
"target_database": "sales_db",
"target_table": "sales_db.customer_master"
}
5.3.1 成功(クエリ完了)
入力③ モック — tests/mocks/athena_succeeded.json
{
"result": "{\"QueryExecution\":{\"QueryExecutionId\":\"q-0123456789abcdef\",\"Status\":{\"State\":\"SUCCEEDED\"}}}"
}
コマンド
aws stepfunctions test-state --region ap-northeast-1 \
--definition file://step_functions_definition.json \
--state-name ExecuteAthenaUpsert \
--input file://tests/inputs/valid.json \
--variables file://tests/variables/execute_vars.json \
--mock file://tests/mocks/athena_succeeded.json \
--inspection-level DEBUG
出力(抜粋)
{
"output": "{\"query_execution_id\":\"q-0123456789abcdef\",\"state\":\"SUCCEEDED\",\"target_table\":\"sales_db.customer_master\",\"executed_sql\":\"MERGE INTO sales_db.customer_master ...\"}",
"inspectionData": {
"afterArguments": "{\"QueryString\":\"MERGE INTO sales_db.customer_master ...\",\"QueryExecutionContext\":{\"Database\":\"sales_db\"},\"ResultConfiguration\":{\"OutputLocation\":\"s3://iceberg-demo/athena-results/\"}}",
"result": "{\"QueryExecution\":{\"QueryExecutionId\":\"q-0123456789abcdef\",\"Status\":{\"State\":\"SUCCEEDED\"}}}",
"variables": "{ ...merge_sql など... }"
},
"status": "SUCCEEDED"
}
output に実行結果(query_execution_id / state / executed_sql)が入ります。
モックを使うと本物のデータを触らずに成功時の出力を確認できます。
5.3.2 失敗(クエリエラー)
入力③ モック — tests/mocks/athena_failed.json
{
"errorOutput": {
"error": "States.TaskFailed",
"cause": "Athena query FAILED: SYNTAX_ERROR: line 1:8 Table 'sales_db.customer_master' does not exist"
}
}
コマンド(--mock だけ差し替え)
aws stepfunctions test-state --region ap-northeast-1 \
--definition file://step_functions_definition.json \
--state-name ExecuteAthenaUpsert \
--input file://tests/inputs/valid.json \
--variables file://tests/variables/execute_vars.json \
--mock file://tests/mocks/athena_failed.json \
--inspection-level DEBUG
出力(抜粋)
{
"error": "States.TaskFailed",
"cause": "Athena query FAILED: SYNTAX_ERROR: line 1:8 Table 'sales_db.customer_master' does not exist",
"inspectionData": {
"afterArguments": "{ ...解決済み引数... }",
"variables": "{ ... }"
},
"status": "FAILED"
}
Retry の ErrorEquals に当たらないエラーなので status: FAILED。
5.3.3 スロットリング(Retry にマッチ)
入力③ モック — tests/mocks/athena_throttled.json
{
"errorOutput": {
"error": "Athena.TooManyRequestsException",
"cause": "Rate exceeded"
}
}
コマンド(--state-configuration で試行回数を与える)
aws stepfunctions test-state --region ap-northeast-1 \
--definition file://step_functions_definition.json \
--state-name ExecuteAthenaUpsert \
--input file://tests/inputs/valid.json \
--variables file://tests/variables/execute_vars.json \
--mock file://tests/mocks/athena_throttled.json \
--state-configuration '{"retrierRetryCount": 1}' \
--inspection-level DEBUG
出力(抜粋)
{
"error": "Athena.TooManyRequestsException",
"cause": "Rate exceeded",
"inspectionData": {
"errorDetails": { "retryIndex": 0, "retryBackoffIntervalSeconds": 20 }
},
"status": "RETRIABLE"
}
status: RETRIABLE となり、スロットリングが Retry にマッチして再試行対象になると確認できます(retryBackoffIntervalSeconds は次回までのバックオフ秒数)。
5.4 --inspection-level で出力がどう変わるか試す
同じ BuildMergeSql を同じ入力で INFO と DEBUG で叩き、出力の情報量だけを比べます。
入力 — tests/inputs/valid.json(5.1 と同じ)
5.4.1 INFO(既定)
コマンド
aws stepfunctions test-state --region ap-northeast-1 \
--definition file://step_functions_definition.json --state-name BuildMergeSql \
--input file://tests/inputs/valid.json --inspection-level INFO
出力(output / nextState / status の 3 つだけ)
{
"output": "{ ...入力がそのまま... }",
"nextState": "ExecuteAthenaUpsert",
"status": "SUCCEEDED"
}
5.4.2 DEBUG
コマンド
aws stepfunctions test-state --region ap-northeast-1 \
--definition file://step_functions_definition.json --state-name BuildMergeSql \
--input file://tests/inputs/valid.json --inspection-level DEBUG
出力(INFO に inspectionData が増える。Assign した merge_sql はここ)
{
"output": "{ ...入力がそのまま... }",
"inspectionData": {
"input": "{ ...生の入力... }",
"variables": "{\"target_database\":\"sales_db\",\"merge_sql\":\"MERGE INTO ...\",\"target_table\":\"sales_db.customer_master\"}"
},
"nextState": "ExecuteAthenaUpsert",
"status": "SUCCEEDED"
}
レベルごとの返却内容:
| レベル | 返るもの | 使いどころ |
|---|---|---|
| INFO(既定) |
output / nextState / status(失敗時は error / cause) |
分岐先や成否だけ見たいとき |
| DEBUG | INFO + inspectionData(input / afterArguments/ result/ variables) |
生成 SQL・引数の中身を確認したいとき |
| TRACE | DEBUG 相当 + HTTP Task のときだけ HTTP リクエスト/レスポンス生データ | HTTP Task のデバッグ |