0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Step Functions + ECSで作る「部分失敗・リトライ付き」並列バッチ処理PoC

0
Last updated at Posted at 2026-05-08

はじめに

「並列でECSタスクを複数起動して、失敗したやつだけ再実行したい」という要件をAWS Step Functionsで実現するPoCを作った。

調べてみるとStep Functionsは公式ドキュメントが充実している反面、設計の判断ポイント(なぜそうするか・何を選ぶか)が散らばっていて、全体像をつかむのに時間がかかった。この記事ではPoCの構成と、実装中に判断が必要だった箇所を中心にまとめる。

実装コード:

Step Functionsとは

AWS Step Functionsは、複数の処理(Lambda / ECS / Batchなど)を「状態遷移」としてつなぐオーケストレーションサービス。
アプリ本体に分岐・再試行・並列制御を埋め込まず、ワークフローとして分離できるのが強み。

このPoCではStep Functionsを次の責務で使っている。

  • 実行入力の検証(batchId, text
  • 親タスク -> ワーカー群の順序制御
  • Distributed Mapによる並列実行
  • 部分失敗の集約と失敗分だけの再実行
  • 再試行上限(retryLimit)での打ち切り判定

対象読者

  • ECS・Lambda・S3・SQSなどの基本的なAWSサービスは使ったことがある
  • Step Functionsを使ったことがない、または触り始めたばかり
  • 「バッチ処理をどうオーケストレーションするか」に興味がある

この記事では扱わないこと

  • AWSの基本サービス(ECS・Lambda・S3・SQS)の説明
  • Terraformの書き方
  • Goの実装詳細

PoCで確認したかったこと

  • Step FunctionsからECS Run Taskで親タスクを起動し、出力をもとに複数のワーカータスクを並列実行できるか
  • ワーカーの部分失敗を許容しながら全件処理を走らせ、失敗分だけ再実行できるか
  • 業務的な失敗(アプリ側で制御したい失敗)と技術的な失敗(ECS起動失敗など)を分離できるか
  • SQS + Lambdaの非同期後続処理を組み合わせられるか

PoCで作ったもの

サービス構成

サービス 役割
Step Functions ワークフロー全体の制御
ECS Fargate(親タスク) ワーカー一覧JSONをS3に書き出す
ECS Fargate(ワーカータスク) 1件単位の処理 + SQS送信 + 結果JSONをS3に保存
S3 親タスクの出力(workers.json)置き場
Lambda(集約) ワーカー実行結果を集約し、失敗分を抽出
SQS ワーカータスクから処理Lambdaへの非同期受け渡し
Lambda(処理) SQSメッセージの消費・後続処理
Terraform インフラ全体の管理

全体シーケンス図

Step Functionsがワークフロー全体を制御し、ECS/Lambda/S3は「何をするか」だけを担う構造になっている。


設計ポイント

1. 業務失敗と技術失敗を分ける

このPoCで一番重要な設計判断がこれ。

技術失敗

ECS起動失敗・タイムアウト・コンテナクラッシュなど、インフラ側の問題。Step FunctionsのRetry / Catchでそのまま扱う。

業務失敗

アプリ側で「失敗した」と判定したケース(PoCではworkerIdごとにハードコードで失敗を注入している)。
現在の実装では業務失敗でもexit 0 で終了し、失敗内容はS3の結果JSONに保存する。

集約LambdaはresultS3Keyからこの結果JSONを読み、再試行判定に使う。

{
  "workerId": "w2",
  "status": "FAILED",
  "errorType": "RETRYABLE",
  "message": "forced retryable failure for worker w2 first attempt"
}

現在の実装での失敗分類

  • Task.RetryはECS起動失敗・タイムアウトなどの一時的な技術失敗を吸収
  • 業務失敗はworkerがS3へ保存したstatus/errorTypeを集約Lambdaが読んで判定
  • Retryしきれず失敗したworker(技術失敗)はCatchRETRYABLEに正規化して再実行候補に積む

2. Distributed Map + S3 ItemReader

ワーカー並列実行にはDistributed Mapを選んだ。

Inline Map Distributed Map
各アイテムの実行 親の実行内で処理 子のStep Functions実行として独立
1件失敗時の挙動 Map全体が止まる 他件は継続(ToleratedFailurePercentage次第)
向き不向き 小規模・短時間 大規模・長時間・部分失敗許容

「部分失敗を許容しながら全件処理を継続する」という要件上、Distributed Mapが必須だった。

ItemReaderでS3から直接読む

"ItemReader": {
  "Resource": "arn:aws:states:::s3:getObject",
  "ReaderConfig": { "InputType": "JSON" },
  "Parameters": {
    "Bucket": "__WORKERS_S3_BUCKET__",
    "Key.$": "$.workersS3Key"
  }
}

親タスクがS3に書いたworkers.jsonを、Step Functionsが直接読み込んでMapの入力に使う。Stateのデータ(256KB上限)を経由しないので、ワーカー数が増えても上限を気にしなくてよい。

リトライ時はItemReaderを使わずItemsPath: "$.workers"でState上の失敗リストを使う。Lambdaが抽出済みなのでS3を読み直す必要がない。

ToleratedFailurePercentage: 100

デフォルトでは1件でも失敗するとMapが中断する。100にすることで全件処理を完走させ、次の集約ステップに進める。部分失敗を「後で集約して再実行する」設計には必須の設定。


3. カスタムリトライループ

Step Functions標準のRetryはステート単位の再試行で、「失敗したワーカーだけを再実行する」という粒度では使えない。そのためワークフローレベルのループを自前で実装している。

RunInitialWorkerTasksFromS3 (Map)
  ↓
AggregateWorkerTaskAttemptResults (Lambda)
  failedWorkersを抽出
  ↓
HasRetryableFailedWorkerTasks (Choice)
  failedWorkersあり → CanRetryFailedWorkerTasksへ
  なし → Finish
  ↓
CanRetryFailedWorkerTasks (Choice)
  retryCount < retryLimit → PrepareNextRetryへ
  retryCount >= retryLimit → FinishWithFailedWorkers
  ↓
PrepareNextRetryWorkerTasks (Pass)
  retryCount++, workers = failedWorkers
  ↓
RunRetryWorkerTasks (Map)
  ↓
AggregateWorkerTaskAttemptResultsへ戻る(ループ)
  • results:今回の試行結果(毎回リセット)
  • allResults:全試行の累積(集約Lambdaが引き継いでいく)
  • resultS3Key:各worker試行の結果JSON保存先(results/{jobId}/<worker>-attempt-<n>.json

この2つを分けることで、集約Lambdaが「今回の失敗」だけを正確に判定できる。

ワークフローがFailで終わる条件

業務的な失敗(ワーカーが処理に失敗した)はSucceedで終わる。失敗の詳細はfailedWorkersFinalというデータとして返す。
Fail Stateになるのはワークフロー自体が動けない場合のみ:

State 条件
FailInvalidWorkflowInput batchId / textが入力にない
FailParentTaskExecution 親ECSタスクが技術的に失敗した

ASLの基礎

設計思想:宣言的なワークフロー

ASL(Amazon States Language)の本質は「何をするかだけ書いて、どう動かすかはStep Functionsに任せる」宣言的な記述スタイル。Reactとの対比が分かりやすい:

React Step Functions ASL
宣言するもの UIの状態とコンポーネントツリー ワークフローの状態と遷移
ランタイム React DOM Step Functionsエンジン
開発者が書かない DOM操作・差分更新 リトライ・並列制御・エラー捕捉

より正確にはステートマシンに近い。XStateやUMLステートチャートと同じ概念で、「状態と遷移の図」をそのままJSONで書く。

この設計の恩恵:

  • リトライRetryを書くだけで自動再実行
  • 並列実行Mapを書くだけでスレッド管理不要
  • 可視化 → ASLをそのままAWS ConsoleがGUIで描画

Stateの種類

Type 役割
Task 実際の処理(ECS・Lambda呼び出し)
Choice 条件分岐(if文)
Map 並列実行(各アイテムに同じ処理)
Pass データの変換・初期化(外部アクセスなし)
Succeed 正常終了
Fail 異常終了

$$$の違い

記法 指すもの
$ 今のStateに流れてきたデータ(入力JSON) $.batchId
$$ Step Functionsが管理するコンテキスト情報 $$.Execution.Id

$$.Execution.Idはユーザーが渡すものではなく、Step Functionsが実行ごとに自動で持つメタデータ。全レイヤーのログにこれを出力することで、CloudWatch Logsでの横断的なトレースが可能になる。

入出力データの5段変換

Stateを通るたびにデータは最大5段階で変換される:

入力JSON
  → [InputPath]   何を取り出すか
  → [Parameters]  どう整形してTaskに渡すか
  → Task実行
  → [ResultPath]  結果をどこに書くか
  → [OutputPath]  次のStateに何を渡すか
→ 次のStateの入力JSON

ResultPathを指定しないと元データが消える

入力: { batchId, text, retryCount, ... }
ECS実行結果: { TaskArn, Containers, ... }

ResultPathなし → 出力: { TaskArn, Containers, ... }  ← 元データが消える
"ResultPath": "$.parentTaskExecution" → 出力: { batchId, ..., parentTaskExecution: { TaskArn, ... } }

OutputPathでLambdaのラッパーを剥がす

Lambdaの戻り値はStep Functionsがラップした構造で返ってくる:

{
  "StatusCode": 200,
  "Payload": { "jobId": "...", "failedWorkers": [...] }
}

"OutputPath": "$.Payload"Payloadの中身だけ取り出すと、次のStateがそのまま$.failedWorkersを参照できる。


.sync vs .waitForTaskToken

ECSタスクの完了を検知する方法は2つある。

.sync

Step FunctionsがポーリングでECSタスクのステータスを監視。STOPPEDになったら完了とみなす。コンテナ側はStep Functionsの存在を知らない。

.waitForTaskToken

Step Functionsがタスクトークンを発行して一時停止。コンテナが明示的にSendTaskSuccess / SendTaskFailureを呼ぶまで待つ。

どちらを選ぶか

.sync .waitForTaskToken
コンテナの改修 不要 AWS SDK + SendTaskSuccessの実装が必要
出力データ exitCode程度 任意のJSON(256KBまで)
成功/失敗判定 exitCodeのみ コンテナ側で自由に制御
Heartbeat なし あり(長時間タスクの生存確認)
コンテナの独立性 高い(単体でも動く) Step Functionsに密結合

このPoCでは.syncを採用した。ワーカータスクをStep Functionsの外でも単体実行できるようにしておきたかった点と、出力受け渡しはS3経由で十分だったため、.waitForTaskTokenで任意JSONを返す構成にはしなかった。

.syncで値を後続Stateに渡す場合

.syncではECSの出力を直接Step Functionsに返せないため、S3やDynamoDBを経由する。このPoCでは以下をS3経由にしている。

  • 親タスク: workers.jsonを保存し、MapがItemReaderで読む
  • ワーカータスク: results/*.jsonを保存し、集約Lambdaが読む

Terraformでのデプロイ(__PLACEHOLDER__問題)

ASLのJSON内には$$.Execution.Idなど$が多用されている。Terraformのtemplatefile()${VAR}記法で変数を埋め込むが、ASL内の$と衝突する。

そのため、ARNなどの環境依存値を__PLACEHOLDER__記法で書いておき、replace()でデプロイ時に置換する構成にしている:

locals {
  definition = replace(
    replace(
      templatefile(var.workflow_template_file, {}),
      "__PARENT_ECS_CLUSTER_ARN__",
      var.parent_cluster_arn
    ),
    "__WORKER_ECS_CLUSTER_ARN__",
    var.worker_cluster_arn
  )
}

CDKを使えば変数を直接渡せるのでreplace()のネストは不要になる。今回はTerraformで全体を管理する方針のためこの構成を採用した。


まとめ

このPoCで判断が必要だった主なポイント:

  1. 失敗制御の分離 → 一時的な失敗はTask.Retryで吸収し、業務失敗はS3結果JSONを集約して再実行判定する
  2. Distributed Mapを選ぶ理由 → Inline Mapは1件失敗でMap全体が止まるため、部分失敗許容には使えない
  3. カスタムリトライループ → 「失敗したワーカーだけを再実行する」は標準Retryでは実現できない
  4. .syncを選ぶ理由 → コンテナの独立性を保ちたい場合・exitCodeで十分な場合はシンプルな.syncが適切

Step Functionsは「どう動かすか(スレッド管理・リトライ・分岐)」をエンジンに任せられるのが強みで、コンテナやLambdaは「何をするか」だけに集中できる。ただし、Stateを横断するデータの流れ(InputPath/ResultPath/OutputPath)の設計は最初に把握しておかないとデバッグが辛い。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?