2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Step Functions 単体テスト — TestState API を試してみた

2
Last updated at Posted at 2026-06-26

0. まとめ

  • S3にあるIceberg テーブルをUPSERT(更新+挿入)するワークフロー(ステートマシン)を AWS Step Functions で作った。
  • ステートマシンの単体テストを、AWS上にデプロイせず、外部呼出もモックして実施したい。
  • そこで TestState API を使い、単体テストしてみた。

1. 前提知識: Step Functions とは

AWS Step Functions は、複数の処理を「順番」「分岐」「リトライ」といったフローチャートとして組み立てて実行するサービスです。

  • フローチャートの 1 つ 1 つの箱を ステート(state) と呼びます。
  • フロー全体の設計図を ステートマシン定義 と呼び、JSONataまたはJSONPath で書きます。

よく使うステートの種類は次のとおりです。

ステート 役割 説明
Choice 条件分岐 プログラムの if。条件で次の行き先を変える
Pass 入力の加工 計算や文字列組み立て。次のステートへ渡す
Task 処理本体 Lambda・Athena など AWS サービスを実際に呼ぶ
Map 繰り返し 配列の各要素に同じ処理を適用する(for ループ)
Parallel 並列実行 複数のブランチを同時に走らせる
Wait 待機 指定時間・指定時刻まで待つ
Succeed / Fail 終了 成功/失敗で終わる

本記事ではAWSでも推奨されている、JSONata({% ... %} で書く式言語)を使っています。

詳しく知りたい方は、AWSのBlackBeltや以下記事などが参考になります。


2. 前提知識: Apache Iceberg とは

CSVやParquetファイルは、「特定の1行だけ書き換える(UPDATE)」「1行だけ消す(DELETE)」といった操作が苦手です。ファイルのどこに何があるかを管理していないからです。

Apache Iceberg は、Parquet などのファイル群を「データベースのテーブルのように」扱えるようにするテーブルフォーマットです。
データ本体(Parquet ファイル)とは別に、「今このテーブルを構成しているファイルはどれか」を記録したメタデータを持ちます。

できること 説明
UPDATE / DELETE / MERGE 行単位の更新・削除・UPSERT ができる
ACID トランザクション 書き込み途中の中途半端な状態が見えない
スキーマ変更 列の追加・改名などをテーブルを作り直さずに行える
タイムトラベル 「過去のある時点のテーブル」を読める
エンジンへの非依存 Athena、Spark、Trinoなど、複数のクエリエンジンから同一テーブルを安全に操作可能

詳しく知りたい方は、以下記事などが参考になります。


3. テスト対象のステートマシン

作成したステートマシン(今回の単体テストの対象)のワークフローを簡単に紹介します。
TestState APIの使い方だけ知りたい方は、この章は読み飛ばしてください。
image.png

更新対象テーブルと元テーブルの情報をインプットとして、  
①入力チェック ⇒ ②SQL(Merge)文作成 ⇒ ③Merge文をAthenaで同期実行 という簡単なフローです。

3.1. ① 入力チェックステート(ValidateInput

ワークフローに渡された入力に、必要な項目(対象 DB・対象テーブル・結合キーなど 7 つ)がそろっているかを確認します。
Choiceステートで実現しています。
1 つでも欠けていたらエラーハンドリングするようにしています。

定義(抜粋);必須 7 項目を $exists() で AND 連結し、1 つでも欠ければエラー時のステート(ValidationFailed) へ分岐します。

"ValidateInput": {
  "Type": "Choice",
  "Choices": [
    {
      "Condition": "{% $not($exists($states.input.target_database) and $exists($states.input.target_table) and … and $exists($states.input.insert_columns)) %}",
      "Next": "ValidationFailed"
    }
  ],
  "Default": "BuildMergeSql"
}

3.2. ② SQLの組み立てステート(BuildMergeSql

入力でもらった「結合キー」「更新する列」「挿入する列」から、
MERGE 文の SQL 文字列を JSONata で組み立てます。
Passステートで実現しています。
たとえば次の入力:

{
  "target_database": "sales_db",
  "target_table": "customer_master",
  "source_database": "staging_db",
  "source_table": "customer_master_delta",
  "merge_keys": ["id"],
  "update_columns": ["name", "email"],
  "insert_columns": ["id", "name", "email"]
}

から、こういう SQL を作ります:

MERGE INTO sales_db.customer_master t USING staging_db.customer_master_delta s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET name = s.name, email = s.email
WHEN NOT MATCHED THEN INSERT (id, name, email) VALUES (s.id, s.name, s.email)

定義(抜粋);JSONataでは、Assign フィールドに変数を格納できます。
merge_sql が JSONata の文字列結合で MERGE 文を組み立てている部分です。

"BuildMergeSql": {
  "Type": "Pass",
  "Assign": {
    "merge_sql": "{% 'MERGE INTO ' & $states.input.target_database & '.' & $states.input.target_table & ' t USING ' & $states.input.source_database & '.' & $states.input.source_table & ' s ON ' & $join($map($states.input.merge_keys, function($k){ 't.' & $k & ' = s.' & $k }), ' AND ') & ' WHEN MATCHED THEN UPDATE SET ' & $join($map($states.input.update_columns, function($c){ $c & ' = s.' & $c }), ', ') & ' WHEN NOT MATCHED THEN INSERT (' & $join($states.input.insert_columns, ', ') & ') VALUES (' & $join($map($states.input.insert_columns, function($c){ 's.' & $c }), ', ') & ')' %}",
    "target_database": "{% $states.input.target_database %}",
    "target_table": "{% $states.input.target_database & '.' & $states.input.target_table %}"
  },
  "Next": "ExecuteAthenaUpsert"
}

3.3. ③ Athena でクエリを同期実行(ExecuteAthenaUpsert

組み立てた SQL を Amazon Athena に投げて実行します。
Taskステートで実現しています。
.sync によりAthenaへの同期実行を行っています。
すなわち、クエリが終わるまで待ち、失敗したらこのステートも失敗します。

定義(抜粋);Resource 末尾の .sync が「完了まで待つ」指定です。QueryString に前ステートの $merge_sql を渡し、スロットリング等の一過性エラーだけ Retry します。

"ExecuteAthenaUpsert": {
  "Type": "Task",
  "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
  "Arguments": {
    "QueryString": "{% $merge_sql %}",
    "QueryExecutionContext": { "Database": "{% $target_database %}" },
    "ResultConfiguration": { "OutputLocation": "s3://iceberg-demo/athena-results/" }
  },
  "Retry": [
    {
      "ErrorEquals": ["Athena.TooManyRequestsException", "Athena.SdkClientException"],
      "IntervalSeconds": 10, "MaxAttempts": 3, "BackoffRate": 2
    }
  ],
  "End": true
}

4. TestState API - 概要

このステートマシンをテストしようとすると、AWSへデプロイして、実際にIcebergテーブルやテストデータを用意する必要があります。
Lambdaを呼び出すステートマシンの場合は、Lambdaのデプロイも必要です。

実行することで多少の課金も発生しますし、そもそもこれって単体テストなのかという話にもなります。

そこで登場するのが TestState API です。

ステートマシン全体をデプロイ・実行しなくても、ステートを 1 つだけ取り出して動かせる APIaws stepfunctions test-state)。

ポイントは 3 つ。

  1. 1 ステート単位で試せる。
  2. モック(--mock)が使える
    Athena 呼び出しを「本物を呼ばずに、結果を差し込む」形でテストできる。
  3. 何が起きたかを細かく見られる--inspection-levelDEBUG にすると、途中の計算結果(組み立てた SQL など)まで返ってきます。

5. TestState API - 試してみる

5.0 前提

  • AWS CLI v2 が使えること。
  • 実行するIAM 権限に states:TestState があること。

注意
日本語 Windows の場合: 定義 JSON に日本語コメントが含まれると、AWS CLI v2 は file:// をシステムロケール(CP932)で開こうとし、UTF-8 ファイルだと text contents could not be decoded エラーになります。
AWS_CLI_FILE_ENCODING=utf-8 を設定したら回避できました(bash: export AWS_CLI_FILE_ENCODING=utf-8 / PowerShell: $env:AWS_CLI_FILE_ENCODING="utf-8")。

今回は以下のようなフォルダ構成にしています。

作業フォルダ
├── step_functions_definition.json   # ステートマシンの定義
└── tests/
    ├── inputs/                        # ステートマシンに与える入力イベント
    │   ├── valid.json                 #  必須項目が全部そろっている(正常系)
    │   ├── invalid_missing_param.json #  insert_columns 欠落(異常系)
    │   └── multi_key.json             #  複合キー・複数列(SQL組み立ての確認用)
    ├── variables/
    │   └── execute_vars.json          #  ③ が参照する変数(②が Assign した値を手で再現)
    └── mocks/                         # ③ に差し込むモック応答
        ├── athena_succeeded.json      #  クエリ成功
        ├── athena_failed.json         #  クエリ失敗(States.TaskFailed)
        └── athena_throttled.json      #  スロットリング(Retry 確認用)

test-state でよく使うオプションは次の通りです。

オプション 役割 必須
--definition テスト対象の定義 必須
--state-name 定義の中のどのステートを試すか フル定義を渡すなら必須
--input ステートマシンへの入力 入力をそのまま参照するステートの場合必須
--variables 前段のステートが Assign した変数を手で渡す 変数を参照するステートの場合必須
--mock Taskステートで呼び出した結果・エラーを差し込む 任意
--inspection-level 返す情報量(INFO/DEBUG/TRACE)を指定 任意
--role-arn 実行ロール 任意(mock なしで実リソースを呼ぶときだけ必須)
--region リージョン(ap-northeast-1 必須

5.1 ①の分岐をテストする(Choice)

ValidateInputChoiceステート)を --state-name で指定し、入力を与えて「どちらへ分岐したか」を nextState で確認します。

5.1.1 正常系: 必須項目がそろっている

入力tests/inputs/valid.json

{
  "target_database": "sales_db",
  "target_table": "customer_master",
  "source_database": "staging_db",
  "source_table": "customer_master_delta",
  "merge_keys": ["id"],
  "update_columns": ["name", "email"],
  "insert_columns": ["id", "name", "email"]
}

コマンド

aws stepfunctions test-state --region ap-northeast-1 \
  --definition file://step_functions_definition.json \
  --state-name ValidateInput \
  --input file://tests/inputs/valid.json

出力

{
  "output": "{ ...入力がそのまま... }",
  "nextState": "BuildMergeSql",
  "status": "SUCCEEDED"
}

nextStateBuildMergeSql なので、欠落なしと判定して次へ進んだと分かります。

5.1.2 異常系: insert_columns が欠落

入力tests/inputs/invalid_missing_param.jsoninsert_columns が無い)

{
  "target_database": "sales_db",
  "target_table": "customer_master",
  "source_database": "staging_db",
  "source_table": "customer_master_delta",
  "merge_keys": ["id"],
  "update_columns": ["name", "email"]
}

コマンド

aws stepfunctions test-state --region ap-northeast-1 \
  --definition file://step_functions_definition.json \
  --state-name ValidateInput \
  --input file://tests/inputs/invalid_missing_param.json

出力

{
  "output": "{ ...入力がそのまま... }",
  "nextState": "ValidationFailed",
  "status": "SUCCEEDED"
}

nextStateValidationFailed に変わり、必須チェックの分岐ロジックが効いていると確認できます。

5.2 ②の SQL 組み立てをテストする(Pass)

BuildMergeSqlPassステート)を DEBUG で動かし、Assign した merge_sqlinspectionData.variables で覗きます。中身はエスケープされた JSON 文字列なので | jq 'fromjson' で展開します。

5.2.1 単一キー

入力tests/inputs/valid.json(5.1 と同じ)

{
  "target_database": "sales_db",
  "target_table": "customer_master",
  "source_database": "staging_db",
  "source_table": "customer_master_delta",
  "merge_keys": ["id"],
  "update_columns": ["name", "email"],
  "insert_columns": ["id", "name", "email"]
}

コマンド

aws stepfunctions test-state --region ap-northeast-1 \
  --definition file://step_functions_definition.json \
  --state-name BuildMergeSql \
  --input file://tests/inputs/valid.json \
  --inspection-level DEBUG \
  --query 'inspectionData.variables' | jq 'fromjson'

出力jq 'fromjson' で展開後)

{
  "target_database": "sales_db",
  "merge_sql": "MERGE INTO sales_db.customer_master t USING staging_db.customer_master_delta s ON t.id = s.id WHEN MATCHED THEN UPDATE SET name = s.name, email = s.email WHEN NOT MATCHED THEN INSERT (id, name, email) VALUES (s.id, s.name, s.email)",
  "target_table": "sales_db.customer_master"
}

期待通りのMERGE文が作成されています。

5.2.2 複合キー

入力tests/inputs/multi_key.json(結合キー 2 列・更新/挿入列も複数)

{
  "target_database": "sales_db",
  "target_table": "orders",
  "source_database": "staging_db",
  "source_table": "orders_delta",
  "merge_keys": ["order_id", "line_no"],
  "update_columns": ["qty", "amount"],
  "insert_columns": ["order_id", "line_no", "qty", "amount"]
}

コマンド

aws stepfunctions test-state --region ap-northeast-1 \
  --definition file://step_functions_definition.json \
  --state-name BuildMergeSql \
  --input file://tests/inputs/multi_key.json \
  --inspection-level DEBUG \
  --query 'inspectionData.variables' | jq 'fromjson'

出力jq 'fromjson' で展開後)

{
  "target_database": "sales_db",
  "merge_sql": "MERGE INTO sales_db.orders t USING staging_db.orders_delta s ON t.order_id = s.order_id AND t.line_no = s.line_no WHEN MATCHED THEN UPDATE SET qty = s.qty, amount = s.amount WHEN NOT MATCHED THEN INSERT (order_id, line_no, qty, amount) VALUES (s.order_id, s.line_no, s.qty, s.amount)",
  "target_table": "sales_db.orders"
}

ON 句が AND で連結され、複合キー版の SQL になっています。
Athenaを呼ばずに、SQL生成ロジックの正しさを確認できました。

inspectionData の各フィールド(variables / afterArguments / input / result)は、中身がエスケープされた JSON 文字列で返ります。

5.3 ③の Athena 呼び出しをモックでテストする

--mock で結果を差し込みます。
参照する変数は --variables で手渡しします。

共通の入力① イベントtests/inputs/valid.json(5.1と同じなので省略)

共通の入力② 変数tests/variables/execute_vars.json(5.2が Assign した値を手で再現)

{
  "merge_sql": "MERGE INTO sales_db.customer_master t USING staging_db.customer_master_delta s ON t.id = s.id WHEN MATCHED THEN UPDATE SET name = s.name, email = s.email WHEN NOT MATCHED THEN INSERT (id, name, email) VALUES (s.id, s.name, s.email)",
  "target_database": "sales_db",
  "target_table": "sales_db.customer_master"
}

5.3.1 成功(クエリ完了)

入力③ モックtests/mocks/athena_succeeded.json

{
  "result": "{\"QueryExecution\":{\"QueryExecutionId\":\"q-0123456789abcdef\",\"Status\":{\"State\":\"SUCCEEDED\"}}}"
}

コマンド

aws stepfunctions test-state --region ap-northeast-1 \
  --definition file://step_functions_definition.json \
  --state-name ExecuteAthenaUpsert \
  --input file://tests/inputs/valid.json \
  --variables file://tests/variables/execute_vars.json \
  --mock file://tests/mocks/athena_succeeded.json \
  --inspection-level DEBUG

出力(抜粋)

{
  "output": "{\"query_execution_id\":\"q-0123456789abcdef\",\"state\":\"SUCCEEDED\",\"target_table\":\"sales_db.customer_master\",\"executed_sql\":\"MERGE INTO sales_db.customer_master ...\"}",
  "inspectionData": {
    "afterArguments": "{\"QueryString\":\"MERGE INTO sales_db.customer_master ...\",\"QueryExecutionContext\":{\"Database\":\"sales_db\"},\"ResultConfiguration\":{\"OutputLocation\":\"s3://iceberg-demo/athena-results/\"}}",
    "result": "{\"QueryExecution\":{\"QueryExecutionId\":\"q-0123456789abcdef\",\"Status\":{\"State\":\"SUCCEEDED\"}}}",
    "variables": "{ ...merge_sql など... }"
  },
  "status": "SUCCEEDED"
}

output に実行結果(query_execution_id / state / executed_sql)が入ります。
モックを使うと本物のデータを触らずに成功時の出力を確認できます。

5.3.2 失敗(クエリエラー)

入力③ モックtests/mocks/athena_failed.json

{
  "errorOutput": {
    "error": "States.TaskFailed",
    "cause": "Athena query FAILED: SYNTAX_ERROR: line 1:8 Table 'sales_db.customer_master' does not exist"
  }
}

コマンド--mock だけ差し替え)

aws stepfunctions test-state --region ap-northeast-1 \
  --definition file://step_functions_definition.json \
  --state-name ExecuteAthenaUpsert \
  --input file://tests/inputs/valid.json \
  --variables file://tests/variables/execute_vars.json \
  --mock file://tests/mocks/athena_failed.json \
  --inspection-level DEBUG

出力(抜粋)

{
  "error": "States.TaskFailed",
  "cause": "Athena query FAILED: SYNTAX_ERROR: line 1:8 Table 'sales_db.customer_master' does not exist",
  "inspectionData": {
    "afterArguments": "{ ...解決済み引数... }",
    "variables": "{ ... }"
  },
  "status": "FAILED"
}

RetryErrorEquals に当たらないエラーなので status: FAILED

5.3.3 スロットリング(Retry にマッチ)

入力③ モックtests/mocks/athena_throttled.json

{
  "errorOutput": {
    "error": "Athena.TooManyRequestsException",
    "cause": "Rate exceeded"
  }
}

コマンド--state-configuration で試行回数を与える)

aws stepfunctions test-state --region ap-northeast-1 \
  --definition file://step_functions_definition.json \
  --state-name ExecuteAthenaUpsert \
  --input file://tests/inputs/valid.json \
  --variables file://tests/variables/execute_vars.json \
  --mock file://tests/mocks/athena_throttled.json \
  --state-configuration '{"retrierRetryCount": 1}' \
  --inspection-level DEBUG

出力(抜粋)

{
  "error": "Athena.TooManyRequestsException",
  "cause": "Rate exceeded",
  "inspectionData": {
    "errorDetails": { "retryIndex": 0, "retryBackoffIntervalSeconds": 20 }
  },
  "status": "RETRIABLE"
}

status: RETRIABLE となり、スロットリングが Retry にマッチして再試行対象になると確認できます(retryBackoffIntervalSeconds は次回までのバックオフ秒数)。

5.4 --inspection-level で出力がどう変わるか試す

同じ BuildMergeSql を同じ入力で INFODEBUG で叩き、出力の情報量だけを比べます。

入力tests/inputs/valid.json(5.1 と同じ)

5.4.1 INFO(既定)

コマンド

aws stepfunctions test-state --region ap-northeast-1 \
  --definition file://step_functions_definition.json --state-name BuildMergeSql \
  --input file://tests/inputs/valid.json --inspection-level INFO

出力output / nextState / status の 3 つだけ)

{
  "output": "{ ...入力がそのまま... }",
  "nextState": "ExecuteAthenaUpsert",
  "status": "SUCCEEDED"
}

5.4.2 DEBUG

コマンド

aws stepfunctions test-state --region ap-northeast-1 \
  --definition file://step_functions_definition.json --state-name BuildMergeSql \
  --input file://tests/inputs/valid.json --inspection-level DEBUG

出力(INFO に inspectionData が増える。Assign した merge_sql はここ)

{
  "output": "{ ...入力がそのまま... }",
  "inspectionData": {
    "input": "{ ...生の入力... }",
    "variables": "{\"target_database\":\"sales_db\",\"merge_sql\":\"MERGE INTO ...\",\"target_table\":\"sales_db.customer_master\"}"
  },
  "nextState": "ExecuteAthenaUpsert",
  "status": "SUCCEEDED"
}

レベルごとの返却内容:

レベル 返るもの 使いどころ
INFO(既定) output / nextState / status(失敗時は error / cause 分岐先や成否だけ見たいとき
DEBUG INFO + inspectionDatainput / afterArguments/ result/ variables 生成 SQL・引数の中身を確認したいとき
TRACE DEBUG 相当 + HTTP Task のときだけ HTTP リクエスト/レスポンス生データ HTTP Task のデバッグ
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?