0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Step Functions の Map と ItemReader を OSS 実装から学ぶ

0
Posted at

AWS Step Functions の MapItemReader を OSS 実装から学ぶ

1. はじめに

Floci という OSS の AWS エミュレーターで Step Functions の実装を追っていたとき、Map state と ItemReader の挙動が実 AWS と揃っていない箇所にぶつかりました。

以前、Step Functions で簡単な PoC を作ったことがあり、そのときに MapItemReader は一通り触っていました。そこで Floci でも同じような定義を動かしてみたところ、States.ItemReaderFailed の扱いと、定義時バリデーションと実行時エラーの境目が気になりました。そこから実 AWS の挙動を確認しながら調べることになりました。

調べてみると、Map state が何を反復するのか、ItemReader がどこまでを担うのか、失敗がどう分類されるのかがきれいにつながって見えてきます。後半では、その理解をもとに OSS 側をどう直したか、テストでどう固定したかまで触れます。

2. Map state とは何か

Step Functions の Map state は、配列やデータセットの各要素に対して同じ処理を順番に適用するための state です。

たとえば次のような入力があるとします。

{
  "workers": [
    { "workerId": "w1" },
    { "workerId": "w2" }
  ]
}

この workers を 1 件ずつ処理したいなら、Map state を使うと素直に書けます。

{
  "StartAt": "ProcessWorkers",
  "States": {
    "ProcessWorkers": {
      "Type": "Map",
      "ItemsPath": "$.workers",
      "ItemProcessor": {
        "StartAt": "PassItem",
        "States": {
          "PassItem": {
            "Type": "Pass",
            "End": true
          }
        }
      },
      "End": true
    }
  }
}

ここで見ておきたいのは、Map が反復対象と処理内容を分けて持っていることです。何を回すかは ItemsPathItems で決まり、各要素に何をするかは ItemProcessor に書きます。

Step Functions を触り始めた段階では、Map は配列を回すための state だと捉えておけば十分です。ただし、反復対象を state input の中ではなく外から持ってこられるようになると、この理解にもう一段階加わります。その入口になるのが ItemReader です。

3. ItemReader とは何か

ItemReader は、Map state が反復するデータセットを state input からではなく外部ソースから読むための仕組みです。

今回扱うのは、S3 に置いた JSON ファイルを読むケースです。定義は次のようになります。

{
  "StartAt": "ProcessWorkers",
  "States": {
    "ProcessWorkers": {
      "Type": "Map",
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": {
          "InputType": "JSON"
        },
        "Parameters": {
          "Bucket": "example-bucket",
          "Key": "workers.json"
        }
      },
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "STANDARD"
        },
        "StartAt": "PassItem",
        "States": {
          "PassItem": {
            "Type": "Pass",
            "End": true
          }
        }
      },
      "End": true
    }
  }
}

見た目どおり、Map が S3 上のデータを読んで反復対象を作る、という理解で問題ありません。Resource = arn:aws:states:::s3:getObject は S3 の GetObject を使う指定で、ReaderConfig.InputType = JSON は取得した本文を JSON として解釈する指定です。Parameters.BucketParameters.Key で、どのオブジェクトを読むかを決めます。

ItemsPath との違いははっきりしています。ItemsPath は、すでに state input に入っている配列を使います。ItemReader は、実行時に外部ソースへ取りに行きます。外へ読みに行く以上、配列参照では出てこない失敗も入ってきます。そこで States.ItemReaderFailed が効いてきます。

4. どんな時に失敗するのか

ItemReader は外部ソースを読むので、失敗の種類も増えます。

実 AWS で確認した代表的な例は、S3 bucket が存在しない場合、object key が存在しない場合、実行ロールに S3 権限が足りない場合、ReaderConfig.InputType = JSON なのに本文が不正な JSON だった場合です。

どれも Map の入力を読む途中で起きる失敗です。AWS では、こうした実行時の失敗は States.ItemReaderFailed に分類されました。

たとえば存在しない key を読んだときの DescribeExecution の結果は次のようになります。

{
  "status": "FAILED",
  "error": "States.ItemReaderFailed",
  "cause": "The specified S3 object does not exist. | Message from S3: The specified key does not exist. ..."
}

ここで見落とせないのは、失敗したことそのものより、どういう種類の失敗として返されるかです。

5. States.ItemReaderFailed が重要な理由

Step Functions の error name は表示用のラベルではありません。Catch でどの失敗を拾うかを決めるために、そのまま制御フローで使われます。

S3 からの入力取得に失敗したときだけ別の処理へ流したい、という設計は自然にありえます。

{
  "Type": "Map",
  "ItemReader": {
    "...": "..."
  },
  "Catch": [
    {
      "ErrorEquals": ["States.ItemReaderFailed"],
      "Next": "Fallback"
    }
  ],
  "ItemProcessor": {
    "...": "..."
  }
}

もしここが States.Runtime など別の分類になってしまうと、書いた Catch と実際の挙動がずれます。

互換実装でも事情は同じです。S3 を読みに行って失敗したのに雑に States.Runtime へ寄せてしまうと、failure にはなるものの、ワークフローの意味は AWS と食い違います。今回 States.ItemReaderFailed を追っていて、AWS 互換を名乗るなら成功パスだけでなく失敗分類まで揃えないといけないと強く感じました。

6. 定義エラーと実行時エラーは別物

もう一つ面白かったのは、設定値そのものが不正なケースでは States.ItemReaderFailed にならなかったことです。

実 AWS で確認したのは、未対応の ItemReader.Resource と、未対応の ReaderConfig.InputType です。

ItemReader.Resource に未対応の ARN を入れると、StartExecution まで進む前に CreateStateMachine が失敗します。

Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED: The field 'Resource' does not match any of the allowed values. Examples: [arn:<partition>:states:::s3:getObject, arn:<partition>:states:::s3:listObjectsV2] at /States/ProcessItems/ItemReader/Resource'

ReaderConfig.InputType も同じで、未対応の値を入れると InvalidDefinition になります。

Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED: The field 'InputType' should have one of these values: [MANIFEST, JSON, CSV, JSONL, PARQUET] at /States/ProcessItems/ItemReader/ReaderConfig/InputType'

つまり、定義自体が不正なのか、定義は正しいが実際に読みに行って失敗したのかで、失敗の場所も名前も変わります。前者は CreateStateMachine 時点の InvalidDefinition で、後者は実行時の States.ItemReaderFailed です。

この線引きは、Step Functions を使う側にも、互換実装を作る側にも効いてきます。どこで失敗するべきかが違えば、テストの置き方も実装の責務分割も変わるからです。

7. OSS 実装を読んで未実装を見つけた

ここまで AWS 側の挙動を整理してから Floci の実装を見ると、差分がかなり追いやすくなりました。

きっかけは、以前作った Step Functions の PoC を Floci で動かしてみたことです。MapItemReader を含む定義が期待どおりに動かず、実装を追っていくと、Map state の入力解決が ItemsItemsPath を前提に組まれていることが見えてきました。

当時の実装は、おおむね Map の反復対象を ItemsItemsPath からしか解決しておらず、ItemReader で S3 を読む経路もありませんでした。実行時の失敗分類も、AWS の実挙動と比べると一部ずれていました。

最初は単純な bug に見えましたが、実 AWS を確認しながら切り分けると、S3-backed ItemReader の未対応機能と、既存実装のエラー分類差分が混ざっていました。この整理ができたことで、どこを feature として足し、どこを AWS 互換の bug fix として直すべきかが見えました。

8. 実 AWS で確認しながらどう直したか

実装ではまず、Map の入力解決を一か所に寄せるため resolveMapItems という helper を切り出しました。これで ItemsItemsPathItemReader の分岐を executeMapState(...) に直接書き散らさずに済みます。

その上で、ItemReader.Resource = arn:aws:states:::s3:getObject かつ ReaderConfig.InputType = JSON の場合に、S3 オブジェクトの本文を JSON として読み込み、Map の入力に使う処理を追加しました。

失敗分類も、実 AWS で確認した結果に合わせて調整しました。S3 の読み取りに失敗したときは States.ItemReaderFailed を返し、未対応の ResourceInputType は実行時ではなく定義時バリデーションで reject します。

このとき意識したのは、とりあえず動かすことより責務の置き場所をはっきりさせることでした。ItemReader のデータ取得は resolveItemReaderItems(...)Map の入力選択は resolveMapItems(...)、定義時バリデーションは StepFunctionsService の validator に寄せておくと、正常系も異常系も追いやすくなります。

9. テストでどう挙動を固定したか

今回の修正では、成功パスよりむしろ失敗系のテストを厚めに見ました。

S3 JSON を ItemReader で読めること、Map.Item の context が iteration ごとに使えること、不正 JSON が States.ItemReaderFailed になること、bucket 不存在と key 不存在が States.ItemReaderFailed になること、未対応の ItemReader.ResourceReaderConfig.InputTypeCreateStateMachine で reject されることを主に確認しています。

このあたりをテストで固定しておくと、後から実装を触ったときに、AWS とのずれが成功系ではなく失敗分類の側で出ていても気づきやすくなります。

AWS 互換実装では、正常に動くかどうかだけでは足りません。どう壊れるかまで揃っているかで、使う側の体験が変わります。今回の題材は、その差がかなり見えやすい例でした。

10. まとめ

今回は Step Functions の Map state と ItemReader を題材に、S3-backed dataset の読み込みとエラーハンドリングを整理しました。

Map state は配列やデータセットを 1 件ずつ処理するための state で、ItemReader を使うと S3 など外部ソースを反復対象にできます。S3 の読み取り失敗は States.ItemReaderFailed に分類され、設定値自体が不正な場合は InvalidDefinition になります。

今回の調査で、States.ItemReaderFailed が単なる名前ではなく、どこで何が失敗したのかをワークフローに伝えるための分類だとかなり腹落ちしました。Step Functions を使うだけでも役に立つ知識ですが、OSS 実装を直す文脈で見ると、なぜその error name で返す必要があるのかがいっそうはっきり見えてきます。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?