はじめに
「並列でECSタスクを複数起動して、失敗したやつだけ再実行したい」という要件をAWS Step Functionsで実現するPoCを作った。
調べてみるとStep Functionsは公式ドキュメントが充実している反面、設計の判断ポイント(なぜそうするか・何を選ぶか)が散らばっていて、全体像をつかむのに時間がかかった。この記事ではPoCの構成と、実装中に判断が必要だった箇所を中心にまとめる。
実装コード:
Step Functionsとは
AWS Step Functionsは、複数の処理(Lambda / ECS / Batchなど)を「状態遷移」としてつなぐオーケストレーションサービス。
アプリ本体に分岐・再試行・並列制御を埋め込まず、ワークフローとして分離できるのが強み。
このPoCではStep Functionsを次の責務で使っている。
- 実行入力の検証(
batchId,text) - 親タスク -> ワーカー群の順序制御
- Distributed Mapによる並列実行
- 部分失敗の集約と失敗分だけの再実行
- 再試行上限(
retryLimit)での打ち切り判定
対象読者
- ECS・Lambda・S3・SQSなどの基本的なAWSサービスは使ったことがある
- Step Functionsを使ったことがない、または触り始めたばかり
- 「バッチ処理をどうオーケストレーションするか」に興味がある
この記事では扱わないこと
- AWSの基本サービス(ECS・Lambda・S3・SQS)の説明
- Terraformの書き方
- Goの実装詳細
PoCで確認したかったこと
- Step FunctionsからECS Run Taskで親タスクを起動し、出力をもとに複数のワーカータスクを並列実行できるか
- ワーカーの部分失敗を許容しながら全件処理を走らせ、失敗分だけ再実行できるか
- 業務的な失敗(アプリ側で制御したい失敗)と技術的な失敗(ECS起動失敗など)を分離できるか
- SQS + Lambdaの非同期後続処理を組み合わせられるか
PoCで作ったもの
サービス構成
| サービス | 役割 |
|---|---|
| Step Functions | ワークフロー全体の制御 |
| ECS Fargate(親タスク) | ワーカー一覧JSONをS3に書き出す |
| ECS Fargate(ワーカータスク) | 1件単位の処理 + SQS送信 + 結果JSONをS3に保存 |
| S3 | 親タスクの出力(workers.json)置き場 |
| Lambda(集約) | ワーカー実行結果を集約し、失敗分を抽出 |
| SQS | ワーカータスクから処理Lambdaへの非同期受け渡し |
| Lambda(処理) | SQSメッセージの消費・後続処理 |
| Terraform | インフラ全体の管理 |
全体シーケンス図
Step Functionsがワークフロー全体を制御し、ECS/Lambda/S3は「何をするか」だけを担う構造になっている。
設計ポイント
1. 業務失敗と技術失敗を分ける
このPoCで一番重要な設計判断がこれ。
技術失敗
ECS起動失敗・タイムアウト・コンテナクラッシュなど、インフラ側の問題。Step FunctionsのRetry / Catchでそのまま扱う。
業務失敗
アプリ側で「失敗した」と判定したケース(PoCではworkerIdごとにハードコードで失敗を注入している)。
現在の実装では業務失敗でもexit 0 で終了し、失敗内容はS3の結果JSONに保存する。
集約LambdaはresultS3Keyからこの結果JSONを読み、再試行判定に使う。
{
"workerId": "w2",
"status": "FAILED",
"errorType": "RETRYABLE",
"message": "forced retryable failure for worker w2 first attempt"
}
現在の実装での失敗分類
-
Task.RetryはECS起動失敗・タイムアウトなどの一時的な技術失敗を吸収 - 業務失敗はworkerがS3へ保存した
status/errorTypeを集約Lambdaが読んで判定 -
Retryしきれず失敗したworker(技術失敗)はCatchでRETRYABLEに正規化して再実行候補に積む
2. Distributed Map + S3 ItemReader
ワーカー並列実行にはDistributed Mapを選んだ。
| Inline Map | Distributed Map | |
|---|---|---|
| 各アイテムの実行 | 親の実行内で処理 | 子のStep Functions実行として独立 |
| 1件失敗時の挙動 | Map全体が止まる | 他件は継続(ToleratedFailurePercentage次第) |
| 向き不向き | 小規模・短時間 | 大規模・長時間・部分失敗許容 |
「部分失敗を許容しながら全件処理を継続する」という要件上、Distributed Mapが必須だった。
ItemReaderでS3から直接読む
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": { "InputType": "JSON" },
"Parameters": {
"Bucket": "__WORKERS_S3_BUCKET__",
"Key.$": "$.workersS3Key"
}
}
親タスクがS3に書いたworkers.jsonを、Step Functionsが直接読み込んでMapの入力に使う。Stateのデータ(256KB上限)を経由しないので、ワーカー数が増えても上限を気にしなくてよい。
リトライ時はItemReaderを使わずItemsPath: "$.workers"でState上の失敗リストを使う。Lambdaが抽出済みなのでS3を読み直す必要がない。
ToleratedFailurePercentage: 100
デフォルトでは1件でも失敗するとMapが中断する。100にすることで全件処理を完走させ、次の集約ステップに進める。部分失敗を「後で集約して再実行する」設計には必須の設定。
3. カスタムリトライループ
Step Functions標準のRetryはステート単位の再試行で、「失敗したワーカーだけを再実行する」という粒度では使えない。そのためワークフローレベルのループを自前で実装している。
RunInitialWorkerTasksFromS3 (Map)
↓
AggregateWorkerTaskAttemptResults (Lambda)
failedWorkersを抽出
↓
HasRetryableFailedWorkerTasks (Choice)
failedWorkersあり → CanRetryFailedWorkerTasksへ
なし → Finish
↓
CanRetryFailedWorkerTasks (Choice)
retryCount < retryLimit → PrepareNextRetryへ
retryCount >= retryLimit → FinishWithFailedWorkers
↓
PrepareNextRetryWorkerTasks (Pass)
retryCount++, workers = failedWorkers
↓
RunRetryWorkerTasks (Map)
↓
AggregateWorkerTaskAttemptResultsへ戻る(ループ)
-
results:今回の試行結果(毎回リセット) -
allResults:全試行の累積(集約Lambdaが引き継いでいく) -
resultS3Key:各worker試行の結果JSON保存先(results/{jobId}/<worker>-attempt-<n>.json)
この2つを分けることで、集約Lambdaが「今回の失敗」だけを正確に判定できる。
ワークフローがFailで終わる条件
業務的な失敗(ワーカーが処理に失敗した)はSucceedで終わる。失敗の詳細はfailedWorkersFinalというデータとして返す。
Fail Stateになるのはワークフロー自体が動けない場合のみ:
| State | 条件 |
|---|---|
FailInvalidWorkflowInput |
batchId / textが入力にない |
FailParentTaskExecution |
親ECSタスクが技術的に失敗した |
ASLの基礎
設計思想:宣言的なワークフロー
ASL(Amazon States Language)の本質は「何をするかだけ書いて、どう動かすかはStep Functionsに任せる」宣言的な記述スタイル。Reactとの対比が分かりやすい:
| React | Step Functions ASL | |
|---|---|---|
| 宣言するもの | UIの状態とコンポーネントツリー | ワークフローの状態と遷移 |
| ランタイム | React DOM | Step Functionsエンジン |
| 開発者が書かない | DOM操作・差分更新 | リトライ・並列制御・エラー捕捉 |
より正確にはステートマシンに近い。XStateやUMLステートチャートと同じ概念で、「状態と遷移の図」をそのままJSONで書く。
この設計の恩恵:
-
リトライ →
Retryを書くだけで自動再実行 -
並列実行 →
Mapを書くだけでスレッド管理不要 - 可視化 → ASLをそのままAWS ConsoleがGUIで描画
Stateの種類
| Type | 役割 |
|---|---|
Task |
実際の処理(ECS・Lambda呼び出し) |
Choice |
条件分岐(if文) |
Map |
並列実行(各アイテムに同じ処理) |
Pass |
データの変換・初期化(外部アクセスなし) |
Succeed |
正常終了 |
Fail |
異常終了 |
$と$$の違い
| 記法 | 指すもの | 例 |
|---|---|---|
$ |
今のStateに流れてきたデータ(入力JSON) | $.batchId |
$$ |
Step Functionsが管理するコンテキスト情報 | $$.Execution.Id |
$$.Execution.Idはユーザーが渡すものではなく、Step Functionsが実行ごとに自動で持つメタデータ。全レイヤーのログにこれを出力することで、CloudWatch Logsでの横断的なトレースが可能になる。
入出力データの5段変換
Stateを通るたびにデータは最大5段階で変換される:
入力JSON
→ [InputPath] 何を取り出すか
→ [Parameters] どう整形してTaskに渡すか
→ Task実行
→ [ResultPath] 結果をどこに書くか
→ [OutputPath] 次のStateに何を渡すか
→ 次のStateの入力JSON
ResultPathを指定しないと元データが消える
入力: { batchId, text, retryCount, ... }
ECS実行結果: { TaskArn, Containers, ... }
ResultPathなし → 出力: { TaskArn, Containers, ... } ← 元データが消える
"ResultPath": "$.parentTaskExecution" → 出力: { batchId, ..., parentTaskExecution: { TaskArn, ... } }
OutputPathでLambdaのラッパーを剥がす
Lambdaの戻り値はStep Functionsがラップした構造で返ってくる:
{
"StatusCode": 200,
"Payload": { "jobId": "...", "failedWorkers": [...] }
}
"OutputPath": "$.Payload"でPayloadの中身だけ取り出すと、次のStateがそのまま$.failedWorkersを参照できる。
.sync vs .waitForTaskToken
ECSタスクの完了を検知する方法は2つある。
.sync
Step FunctionsがポーリングでECSタスクのステータスを監視。STOPPEDになったら完了とみなす。コンテナ側はStep Functionsの存在を知らない。
.waitForTaskToken
Step Functionsがタスクトークンを発行して一時停止。コンテナが明示的にSendTaskSuccess / SendTaskFailureを呼ぶまで待つ。
どちらを選ぶか
.sync |
.waitForTaskToken |
|
|---|---|---|
| コンテナの改修 | 不要 | AWS SDK + SendTaskSuccessの実装が必要 |
| 出力データ | exitCode程度 | 任意のJSON(256KBまで) |
| 成功/失敗判定 | exitCodeのみ | コンテナ側で自由に制御 |
| Heartbeat | なし | あり(長時間タスクの生存確認) |
| コンテナの独立性 | 高い(単体でも動く) | Step Functionsに密結合 |
このPoCでは.syncを採用した。ワーカータスクをStep Functionsの外でも単体実行できるようにしておきたかった点と、出力受け渡しはS3経由で十分だったため、.waitForTaskTokenで任意JSONを返す構成にはしなかった。
.syncで値を後続Stateに渡す場合
.syncではECSの出力を直接Step Functionsに返せないため、S3やDynamoDBを経由する。このPoCでは以下をS3経由にしている。
- 親タスク:
workers.jsonを保存し、MapがItemReaderで読む - ワーカータスク:
results/*.jsonを保存し、集約Lambdaが読む
Terraformでのデプロイ(__PLACEHOLDER__問題)
ASLのJSON内には$$.Execution.Idなど$が多用されている。Terraformのtemplatefile()は${VAR}記法で変数を埋め込むが、ASL内の$と衝突する。
そのため、ARNなどの環境依存値を__PLACEHOLDER__記法で書いておき、replace()でデプロイ時に置換する構成にしている:
locals {
definition = replace(
replace(
templatefile(var.workflow_template_file, {}),
"__PARENT_ECS_CLUSTER_ARN__",
var.parent_cluster_arn
),
"__WORKER_ECS_CLUSTER_ARN__",
var.worker_cluster_arn
)
}
CDKを使えば変数を直接渡せるのでreplace()のネストは不要になる。今回はTerraformで全体を管理する方針のためこの構成を採用した。
まとめ
このPoCで判断が必要だった主なポイント:
-
失敗制御の分離 → 一時的な失敗は
Task.Retryで吸収し、業務失敗はS3結果JSONを集約して再実行判定する - Distributed Mapを選ぶ理由 → Inline Mapは1件失敗でMap全体が止まるため、部分失敗許容には使えない
- カスタムリトライループ → 「失敗したワーカーだけを再実行する」は標準Retryでは実現できない
-
.syncを選ぶ理由 → コンテナの独立性を保ちたい場合・exitCodeで十分な場合はシンプルな.syncが適切
Step Functionsは「どう動かすか(スレッド管理・リトライ・分岐)」をエンジンに任せられるのが強みで、コンテナやLambdaは「何をするか」だけに集中できる。ただし、Stateを横断するデータの流れ(InputPath/ResultPath/OutputPath)の設計は最初に把握しておかないとデバッグが辛い。