この記事は、「Fargate バッチを読むための Step Functions 入門」 の補講です。
Step Functions は何を解決するサービスか
一言でいうと、Step Functions は、処理の順序・再試行・失敗時の遷移といった制御フローを、アプリケーションコードの外側で管理するサービスです。
たとえば、バッチ処理を 1 本のスクリプトにまとめると、次のような関心がすべて入り込みます。
- 入力の受け取り
- 実処理の起動
- 成功 / 失敗の判定
- 一時的な失敗時の再試行
- タイムアウト
- 最終的なエラー通知
これが小規模なうちは問題になりにくいものの、処理が重くなったり、外部 API 呼び出しやジョブ起動を含むようになると、制御フローと本来の業務ロジックが混ざり始めます。
Step Functions は、この「制御フロー」の部分を状態機械として外に出します。
アプリケーションコードの責務:
- 何を計算するか
- どの API を呼ぶか
- どのデータを保存するか
Step Functions の責務:
- どの順で処理するか
- 失敗したら何回やり直すか
- どこで打ち切るか
- どこで成功とみなすか
これは、HTTP のリクエストハンドラに retry 制御やプロセス管理を全部埋め込まないのと似ています。業務ロジックと制御ロジックを分けることで、読みやすさと運用性が上がります。
では、Python などのコードはどこに書くのか
ここで疑問になりやすいのが、実際のコードをどこに書くのかという点です。結論からいうと、Step Functions に書くのは業務ロジックそのものではなく、どの順でどの処理を呼ぶかというフロー定義です。実際の計算や API 呼び出しや保存処理は、Lambda や Fargate コンテナの中の Python や Node.js などで実装します。
つまり役割分担はこうです。
- Python / Node.js / Go などのアプリコード:
実際の仕事をする - Step Functions:
その仕事をどういう順で進めるかを決める
最小の例で確認します。たとえば「対象 URL をクロールして、要約 API を呼び、結果を保存する」処理があるとします。
アプリケーションコードの例
まず、実際の処理を担う Python は次のようなイメージです。
import json
import os
import sys
from typing import Any
def crawl_page(url: str) -> str:
# 実際には HTTP 取得やヘッドレスブラウザ処理を書く
return f"content from {url}"
def summarize(text: str) -> dict[str, Any]:
# 実際には外部 API 呼び出しを書く
return {"summary": text[:20], "score": 0.87}
def save_result(target_id: str, result: dict[str, Any]) -> None:
# 実際には S3 / DB / 外部 API 保存などを書く
print("saving:", json.dumps({"target_id": target_id, "result": result}))
def main() -> int:
target_id = os.environ["TARGET_ID"]
target_url = os.environ["TARGET_URL"]
page = crawl_page(target_url)
summary = summarize(page)
save_result(target_id, summary)
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc:
print(f"batch failed: {exc}", file=sys.stderr)
raise SystemExit(1)
このコードが担当しているのは、まさに先ほどの 3 点です。
- 何を計算するか
- どの API を呼ぶか
- どのデータを保存するか
逆に、このコードの中では次のことはあまり考えたくありません。
- 3 回まで再試行する
- 30 分超えたら打ち切る
- 失敗したら通知フローへ進める
- 成功したら後続ジョブを呼ぶ
それは Step Functions 側へ寄せます。
Step Functions の定義例
同じ処理を Step Functions 側から見ると、責務はこうなります。
- 対象 URL を受け取る
- Fargate タスクを起動する
- 失敗したら 2 回までやり直す
- それでも失敗したら失敗で終わる
- 成功したらワークフロー全体を成功にする
最小の ASL はこう書けます。
{
"StartAt": "RunWorker",
"States": {
"RunWorker": {
"Type": "Task",
"Resource": "arn:aws:states:::ecs:runTask.sync",
"Parameters": {
"Cluster": "arn:aws:ecs:ap-northeast-1:123456789012:cluster/main",
"TaskDefinition": "example-batch:12",
"LaunchType": "FARGATE",
"Overrides": {
"ContainerOverrides": [
{
"Name": "worker",
"Environment": [
{
"Name": "TARGET_ID",
"Value.$": "$.targetId"
},
{
"Name": "TARGET_URL",
"Value.$": "$.targetUrl"
}
]
}
]
}
},
"Retry": [
{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 10,
"MaxAttempts": 2,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "BatchFailed"
}
],
"End": true
},
"BatchFailed": {
"Type": "Fail",
"Error": "BatchError",
"Cause": "batch task failed"
}
}
}
この定義に書かれているのは、業務ロジックそのものではありません。書かれているのは制御です。
-
StartAt: どこから始めるか -
Resource: 何を起動するか -
Environment: 実行入力をどう渡すか -
Retry: 何回やり直すか -
Catch: 失敗したらどこへ進むか -
End/Fail: どこで成功・失敗を確定するか
Terraform で書くとどう見えるか
本編のように Terraform で管理するなら、見た目はこうなります。
resource "aws_sfn_state_machine" "batch" {
name = "example-batch"
role_arn = aws_iam_role.sfn_execution.arn
definition = jsonencode({
StartAt = "RunWorker"
States = {
RunWorker = {
Type = "Task"
Resource = "arn:aws:states:::ecs:runTask.sync"
Parameters = {
Cluster = aws_ecs_cluster.main.arn
TaskDefinition = aws_ecs_task_definition.batch.arn
LaunchType = "FARGATE"
}
Retry = [{
ErrorEquals = ["States.TaskFailed"]
IntervalSeconds = 10
MaxAttempts = 2
BackoffRate = 2
}]
Catch = [{
ErrorEquals = ["States.ALL"]
Next = "BatchFailed"
}]
End = true
}
BatchFailed = {
Type = "Fail"
Error = "BatchError"
Cause = "batch task failed"
}
}
})
}
この Terraform の中にも Python の処理本体は出てきません。出てくるのは、
- どのタスク定義を使うか
- どう再試行するか
- 失敗時にどう終わるか
です。
境界を 1 枚の図で見る
この図では、Step Functions とバッチ本体の責務を分けて見ます。
- Step Functions は「仕事をする人」ではない
- Python バッチは「順番を管理する人」ではない
それぞれの責務を分けることで、「オンデマンドバッチを必要なときだけ起動し、完了まで待つ」構成が読みやすくなります。
例外として、Step Functions だけで完結することはあるか
あります。ただし限定的です。
たとえば:
- DynamoDB に 1 レコード書く
- SNS に通知する
- 軽い入力整形をする
程度なら、Lambda やコンテナを書かず、Step Functions のサービス統合や組み込み関数だけで完結することがあります。
ただし、本編のような:
- ヘッドレスブラウザを動かす
- 外部 API を呼ぶ
- 重い処理を何分も走らせる
といったバッチでは、実処理はほぼ確実に Fargate や Lambda のコード側に置きます。
状態機械とは何か
Step Functions は、処理フローを「状態機械」として定義します。状態機械という言い方は少し固いですが、ここでは「処理の手順書」くらいに考えて差し支えありません。
最低限の要素はこれだけです。
-
StartAt: 最初にどのステップから始めるか -
States: 各ステップの定義 -
NextまたはEnd: 次にどこへ進むか、あるいはここで終わるか
たとえば「ジョブを起動して、失敗したら失敗で終わる」だけなら、概念的にはこうです。
{
"StartAt": "RunJob",
"States": {
"RunJob": {
"Type": "Task",
"Resource": "何かを実行する",
"End": true
}
}
}
コードにすると JSON や YAML や Terraform の jsonencode で少し長く見えますが、考え方はシンプルです。
大事なのは、Step Functions ではこのフロー自体が AWS 上の実行単位になることです。実行ごとに入力があり、途中経過があり、成功か失敗かが記録されます。
State の基本: Task / Choice / Wait / Succeed / Fail
本編を読むうえで、まず押さえておきたい State は次の 5 つです。
Task
何かを実行する状態です。Lambda を呼ぶ、ECS タスクを起動する、別の AWS API を叩く、といった「本体」はほぼここに入ります。
バッチ記事で主役になるのはこれです。
Choice
条件分岐です。入力や前段の結果を見て、どの State に進むかを決めます。
Wait
一定時間待つ State です。リトライの間隔を空けたいときや、外部システムの都合で少し待ちたいときに使います。
Succeed
成功で終わる State です。
Fail
失敗で終わる State です。原因のラベルを持たせて終了できます。
本編では、ECS タスクの失敗を Catch で拾って Fail State に送っています。これが「最終的に失敗した実行」として管理画面やログに残るので、運用時に追跡しやすくなります。
実行の流れ: 入力、出力、状態遷移
Step Functions は、各実行に対して JSON 入力を受け取ります。これが次の State に流れていきます。
たとえば入力がこうだとします。
{
"targetId": "job-123",
"urls": [
"https://example.com/a",
"https://example.com/b"
]
}
この JSON を、Task State の中で使います。本編では ECS タスク起動時の ContainerOverrides.Environment に差し込んでいました。
ここで大事なのは、Step Functions は単に「順番に関数を呼ぶもの」ではなく、入力を持った状態遷移の仕組みだということです。
アプリケーションコードの感覚だと、次に近いです。
- HTTP リクエストボディを受け取る
- 途中で少し形を変える
- 下流の関数やジョブへ渡す
- 失敗時の情報も状態として持つ
この「実行ごとに入力があり、状態が残る」性質のおかげで、後からどの入力で失敗したかを追いやすくなります。
エラー処理の基本: Retry と Catch
本編を理解するうえで特に重要なのが Retry と Catch です。
Retry
一時的な失敗をやり直す仕組みです。
たとえば以下は:
- 失敗したら
- 10 秒待って
- 最大 2 回まで再試行し
- 間隔を倍にする
という意味です。
"Retry": [{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 10,
"MaxAttempts": 2,
"BackoffRate": 2
}]
これはアプリケーションコードの for ループや sleep を状態機械側へ出したものです。
Catch
最終的に失敗したとき、どこへ遷移するかを定義します。
"Catch": [{
"ErrorEquals": ["States.ALL"],
"Next": "BatchFailed"
}]
ここで重要なのは、Retry と Catch は競合ではなく役割分担だということです。
-
Retry: まだ回復可能かもしれないのでやり直す -
Catch: もうこの実行は失敗として扱う
この考え方が入ると、本編の「起動・完了待ち・リトライ・失敗確定が 1 つの定義にまとまる」が理解しやすくなります。
Standard と Express の違い
Step Functions には大きく 2 種類あります。
- Standard
- Express
この記事では詳細比較はしませんが、本編を読むために最低限覚えるならこうです。
Standard
- 長めの処理を扱いやすい
- 実行履歴がしっかり残る
- 外部ジョブの完了待ちのような用途に向く
Express
- 短時間・高頻度向き
- 大量イベントをさばきやすい
- 長時間ジョブの完了待ち前提には向きにくい
本編が Standard 前提なのは、Fargate のバッチタスクを起動して、その完了まで待つからです。ここは「ジョブキューを投げてすぐ返す」世界とは違います。
他サービス連携の考え方: Lambda だけではなく ECS も呼べる
Step Functions というと Lambda を思い浮かべることが多いですが、実際にはそれだけではありません。AWS のいろいろなサービスと統合できます。
代表例:
- Lambda を呼ぶ
- DynamoDB を更新する
- SNS / SQS を使う
- ECS タスクを起動する
- Glue ジョブを起動する
ここでの見方は、「Step Functions 自体は処理本体ではない」ということです。処理本体は別のサービスにあり、Step Functions はそのオーケストレーションを担当します。
本編の主役はまさにこれで、処理本体は Fargate 上のコンテナにあり、Step Functions はその起動と監視を担当しています。
.sync とは何か
ここからが本題です。.sync は、この構成を理解するうえで中心になる概念です。
ECS タスクを Step Functions から起動する場合、大きく 2 つの考え方があります。
- 起動だけして次へ進む
- 起動して、終わるまで待つ
この違いがリソース ARN に現れます。
arn:aws:states:::ecs:runTaskarn:aws:states:::ecs:runTask.sync
.sync なし
.sync がない場合、Step Functions は ECS タスクの起動 API 呼び出しが通った時点で「この State は成功」とみなします。
つまり、次のような振る舞いになります。
- タスクを起動する
- Step Functions はすぐ次へ進む
- その後タスクが失敗しても、その失敗はこの State の失敗としては扱われない
これは「非同期に投げっぱなし」の感覚に近いです。
.sync あり
.sync が付くと、Step Functions は起動後に待機します。ECS タスクが終わるまで実行状態を保持し、終了コードなどを見て成功か失敗かを判断します。
つまり:
- タスクを起動する
- Step Functions が完了まで待つ
- タスクが成功終了なら State 成功
- タスクが非ゼロ終了なら State 失敗
となります。
なぜ .sync がそんなに重要なのか
これは、「ジョブをキューに投げた」だけなのか、「ジョブの完了結果まで受け取った」のかの違いです。
オンデマンドバッチでほしいのは後者です。なぜなら運用上知りたいのは、
- 起動できたか
- 最後まで完了したか
- 失敗したならどこで落ちたか
だからです。
もし .sync がなければ、
- 起動だけ成功
- 本体はあとで失敗
- しかし状態機械は成功扱い
という、運用上望ましくない状態が起きます。
どういうときに .sync が向いているか
- 完了を見届けたいバッチ
- 失敗を状態機械全体の失敗として扱いたい処理
- 次の処理を、前の処理の本当の成功後にだけ進めたいケース
どういうときに .sync なしでもよいか
- 起動できれば十分
- 結果は別の仕組みで回収する
- 完了待ちを別のイベント連携で扱う
本編では後者を扱うため、.sync が中心になります。
バッチ処理で Step Functions を使う利点
ここも本編に直結するポイントです。
最初に疑問になりやすいのは、「バッチなら単にスクリプトを実行すればよいのではないか」という点です。小規模であればその理解で問題ありません。Step Functions が有効になるのは、処理の重さや運用責任が増えたときです。
1. 制御フローをコードから切り離せる
バッチ本体のコンテナは「仕事をする」ことに集中できます。
- スクレイピングする
- API を呼ぶ
- 解析する
- 保存する
一方で、Step Functions は:
- 起動する
- 待つ
- リトライする
- 失敗で終わる
を担当します。
この分離によって、アプリケーションコードの見通しは大きく改善します。
2. 実行履歴が追いやすい
「昨日 14:03 の実行は何の入力で失敗したか」が追いやすくなります。
単なるスクリプト実行だと、ログやジョブ管理を自前で組み合わせないとこの視点が持ちにくいです。Step Functions だと、実行単位で開始、成功、失敗が見えるので、運用時に追跡しやすい構造になります。
3. リトライを宣言的に書ける
外部 API がたまに落ちる、ネットワークが一瞬不安定、起動直後だけ失敗しやすい、といった「一時的失敗」は現実によくあります。
それをアプリコードに毎回書き込むより、ワークフロー側に寄せるほうが筋がよいことが多いです。
4. 常駐サービスにしなくてよい
本編の大きな主張はここです。
月に数回しか動かない重い処理のために、desired_count = 1 の ECS サービスを常駐させるのは、待機コストを払い続けることになります。
Step Functions + Fargate にすると:
- 必要なときだけ起動
- 終わったら停止
- 完了までは状態機械が持つ
ので、オンデマンドバッチとの相性がよくなります。
5. タイムアウトや失敗確定の境界が明確になる
「どこまで失敗を許容するか」「何回までやり直すか」「最終的に誰が失敗と判断するか」が、定義として見える形になります。
これはコードレビューや障害調査でも効きます。
6. 後から分岐や前処理を足しやすい
最初は 1 Task State だけでも、後から:
- 予算チェック
- 入力検証
- 通知
- 成果物の後処理
を State として追加できます。
本編で「状態機械だけでは足りない」としているのは、最初の 1 ステップは簡単でも、周辺の運用責務が増えていくからです。
Fargate と組み合わせるときに見るポイント
ここまでの Step Functions 側の話だけでも、.sync、Retry、Catch、入出力の流れは追えるようになります。一方で、Step Functions が Fargate を起動する構成になると、次の 3 点も実行成否に直結します。
- どのタスク定義を起動するか
- どのネットワークで動かすか
- どの権限で起動し、アプリがどの権限で動くか
本編では TaskDefinition、Subnets、SecurityGroups、AssignPublicIp、execution role、task role が出てきます。これらは Step Functions の外側に見えますが、実際にはワークフローが正しく完了するかどうかを大きく左右します。
この部分は Fargate 側の前提知識に寄るので、詳細は Fargate の入門記事で補います。ここでは、「Step Functions の .sync が分かっても、それだけでは Fargate バッチ全体は読めない」という橋渡しだけ押さえておけば十分です。
小さな例: 「対象 ID を受け取ってバッチを 1 回走らせる」
ここまでの関係を簡略化すると、以下のようになります。
この関係が見えていると、本編の Step Functions + Fargate のコードも読みやすくなります。
まとめ
Step Functions は、アプリケーション処理そのものを置く場所ではなく、処理フローを管理する場所です。
オンデマンドバッチとの相性がよい理由は、次の 3 つに集約できます。
- 起動、完了待ち、再試行、失敗確定を 1 か所で管理できる
- 実行履歴が残るので運用しやすい
-
.syncを使うと Fargate タスクの実際の完了まで責任を持てる
本編では、この基礎の上にさらに
- ECS 起動権限をどう絞るか
-
iam:PassRoleをどう限定するか - execution role と task role をどう分けるか
まで踏み込みます。この入門記事によって「Step Functions は何をしていて、なぜ .sync が重要なのか」が把握できれば、その先の専門的な内容も追いやすくなります。