📁 完全なコードはGitHubで公開:GitHub: pipeiac02
シリーズ一覧(全12回)
Phase 1: 基盤構築
Phase 2: ワークフロー
Phase 3: セキュリティ・運用
Phase 4: 開発効率化
1. はじめに
1-1. 今回のゴール
Step Functionsを作成して、Lambda → Crawler の順序制御を実装します。
| ゴール | 内容 |
|---|---|
| 設計 | ワークフローの流れを理解する |
| 実装 | TerraformでStep Functionsを作成する |
| 確認 | パイプライン全体が動くことを確認する |
1-2. 前回の振り返り
前回(第4回)では、Glue Crawlerを作成しました。
1-3. 自動実行
各コンポーネントを 自動で順番に実行 できるようにしていきます。
1-4. この記事で作るもの
2. 比喩で理解する
2-1. Step Functionsを「調理長」で考える
料理人やメニュー係がいても、誰かが指示を出さないとバラバラに動いてしまいます。
2-2. 比喩の図解(レストラン)
2-3. 調理長の仕事
| 手順 | 内容 |
|---|---|
| 1. 注文を受ける | 「パスタ1つ」 |
| 2. 料理人に指示 | 「パスタを作って」 |
| 3. 完了を確認 | 「できました」 |
| 4. メニュー係に指示 | 「メニュー表を更新して」 |
| 5. 完了を確認 | 「更新しました」 |
| 6. 報告 | 「全工程完了」 |
2-4. なぜ調理長が必要?
2-5. 技術の図解(AWS)
2-6. 対応関係
| レストラン | AWS | 役割 |
|---|---|---|
| 調理長 | Step Functions | 工程管理・順序制御 |
| 調理指示 | Lambda Task | Lambdaを実行 |
| メニュー更新指示 | Glue Task | Crawlerを実行 |
| 工程表 | ステートマシン定義 | 実行順序の定義 |
| 完了報告 | ステータス | 成功/失敗の確認 |
2-7. ステートマシンとは?
| 用語 | 料理で例えると |
|---|---|
| ステート | 1つの工程(調理、盛り付け) |
| トランジション | 次の工程への移行 |
| ステートマシン | 工程表全体 |
3. Step Functionsの役割
3-1. なぜStep Functionsが必要か
個別にサービスを呼び出すと、管理が大変になります。
3-2. Step Functionsのメリット
| メリット | 説明 | 料理で例えると |
|---|---|---|
| 順序制御 | タスクを順番に実行 | 工程表通りに進める |
| 可視化 | 実行状況をGUIで確認 | 調理長が全体を見渡せる |
| エラー処理 | 失敗時のリトライや分岐 | 失敗したら作り直し |
| ログ | 実行履歴を記録 | 作業日報 |
3-3. 今回のワークフロー
3-4. ステートの種類
| ステート | 用途 | 今回の使用 |
|---|---|---|
| Task | サービスを実行 | Lambda, Crawler |
| Pass | データを渡すだけ | - |
| Wait | 待機 | - |
| Choice | 条件分岐 | - |
| Parallel | 並列実行 | - |
| Succeed | 成功終了 | ✅ |
| Fail | 失敗終了 | - |
3-5. ASL(Amazon States Language)とは
Step Functionsの定義はJSONで記述します。
{
"StartAt": "ETL",
"States": {
"ETL": {
"Type": "Task",
"Next": "Crawl"
},
"Crawl": {
"Type": "Task",
"End": true
}
}
}
| 要素 | 意味 | 料理で例えると |
|---|---|---|
| StartAt | 最初のステート | 最初の工程 |
| States | ステートの一覧 | 全工程のリスト |
| Type | ステートの種類 | 工程の種類(調理、確認) |
| Next | 次のステート | 次の工程 |
| End | 終了フラグ | 最後の工程 |
4. 実装
4-1. ファイル構成
今回作成するファイル:
pipeiac02/
├── test-data/
│ └── ec-sales-05.json # ★テスト用サンプルデータ
└── tf/
├── stepfunctions.tf # ★今回作成
└── iam.tf # ★Step Functions用ロール追加
4-2. StepFunctions用IAM(iam.tf に追加)
Step FunctionsがLambdaとGlueを実行するための権限を設定します。
コードの骨格:
# IAMロール(信頼ポリシー)
resource "aws_iam_role" "sfn" {
name = "${var.project}-sfn-role"
assume_role_policy = jsonencode({...}) # states.amazonaws.com を許可
}
# IAMポリシー(権限ポリシー)
resource "aws_iam_policy" "sfn" {
name = "${var.project}-sfn-policy"
policy = jsonencode({
Statement = [
{ Action = ["lambda:InvokeFunction"], Resource = [...] }, # Lambda実行
{ Action = ["glue:StartCrawler", "glue:GetCrawler"], Resource = [...] }, # Crawler起動
{ Action = ["sns:Publish"], Resource = [...] } # SNS通知
]
})
}
# ポリシーをロールにアタッチ
resource "aws_iam_role_policy_attachment" "sfn" {...}
フルコード:
# iam.tf に追加
# ================================
# Step Functions用IAM
# ================================
# Step Functions用IAMロール
# Step FunctionsがLambda、Glue、SNSを操作するための権限を付与
resource "aws_iam_role" "sfn" {
name = "${var.project}-sfn-role"
# Step Functions用の信頼ポリシー
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
}]
})
tags = merge(local.common_tags, {
Name = "${var.project}-sfn-role"
})
}
# Step Functions用ポリシー(最小権限)
# Lambda実行、Glue Crawler起動、SNS通知のみ許可
resource "aws_iam_policy" "sfn" {
name = "${var.project}-sfn-policy"
description = "Step Functions用ポリシー"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "InvokeLambda"
Effect = "Allow"
Action = ["lambda:InvokeFunction"]
Resource = [aws_lambda_function.etl.arn]
},
{
Sid = "StartGlueCrawler"
Effect = "Allow"
Action = ["glue:StartCrawler", "glue:GetCrawler"]
Resource = ["arn:aws:glue:${var.aws_region}:*:crawler/${aws_glue_crawler.main.name}"]
},
{
Sid = "PublishSNS"
Effect = "Allow"
Action = ["sns:Publish"]
Resource = [aws_sns_topic.alert.arn]
}
]
})
tags = local.common_tags
}
# Step Functionsポリシーをアタッチ
resource "aws_iam_role_policy_attachment" "sfn" {
role = aws_iam_role.sfn.name
policy_arn = aws_iam_policy.sfn.arn
}
4-3. 最小権限の原則
IAMポリシーは 最小権限の原則 に従って設計します。
なぜ最小権限が重要か
| リスク | 説明 |
|---|---|
| 誤操作 | 権限が広いと、意図しないリソースを削除・変更する可能性 |
| セキュリティ侵害 | 攻撃者がアクセス権を取得した場合の被害範囲が拡大 |
| コンプライアンス | 監査要件を満たせない可能性 |
ベストプラクティス
| プラクティス | 説明 | 例 |
|---|---|---|
| リソースを限定 |
* を避けて具体的なARNを指定 |
特定のS3バケットだけ |
| アクションを限定 | 必要なアクションだけ許可 |
s3:GetObject だけ |
| 条件を追加 | 特定条件でのみ許可 | 特定のIPからのみ |
ワイルドカード(*)の危険性
# ❌ 危険:全リソースへのアクセス
Resource = "*"
# ✅ 安全:特定リソースのみ
Resource = "arn:aws:glue:ap-northeast-1:*:crawler/dp-crawler"
| パターン | リスク | 推奨度 |
|---|---|---|
"*" |
全リソースにアクセス可能 | ❌ 避ける |
arn:aws:glue:*:*:crawler/* |
全Crawler | ⚠️ 注意 |
arn:aws:glue:ap-northeast-1:*:crawler/dp-* |
dpで始まるCrawler | △ 許容 |
arn:aws:glue:...:crawler/${name} |
特定のCrawlerのみ | ✅ 推奨 |
4-4. 権限の設計
| 権限 | 対象 | リソース指定 |
|---|---|---|
| InvokeFunction | Lambda |
aws_lambda_function.etl.arn(特定のLambdaのみ) |
| StartCrawler | Glue |
crawler/${aws_glue_crawler.main.name}(特定のCrawlerのみ) |
| Publish | SNS |
aws_sns_topic.alert.arn(特定のトピックのみ) |
4-5. Step Functions(stepfunctions.tf)
コードの骨格:
resource "aws_sfn_state_machine" "pipeline" {
name = "${var.project}-pipeline"
role_arn = aws_iam_role.sfn.arn
definition = jsonencode({
StartAt = "ETL"
States = {
ETL = { Type = "Task", Resource = "...", Next = "StartCrawler" }
StartCrawler = { Type = "Task", Resource = "...", Next = "Success" }
Success = { Type = "Succeed" }
NotifyFailure = { Type = "Task", ... } # エラー時SNS通知
Fail = { Type = "Fail" }
}
})
}
フルコード:
# stepfunctions.tf
# ================================
# Step Functions
# ================================
# State Machine(パイプライン)
# Lambda ETL → Glue Crawler の順序でワークフローを実行
resource "aws_sfn_state_machine" "pipeline" {
name = "${var.project}-pipeline"
role_arn = aws_iam_role.sfn.arn
# ワークフロー定義(ASL: Amazon States Language)
definition = jsonencode({
Comment = "Data Pipeline Workflow"
StartAt = "ETL"
States = {
# ETL処理(Lambda実行)
ETL = {
Type = "Task"
Resource = aws_lambda_function.etl.arn
Next = "StartCrawler"
Catch = [{
ErrorEquals = ["States.ALL"]
Next = "NotifyFailure"
}]
}
# Glue Crawler起動(同期実行)
StartCrawler = {
Type = "Task"
Resource = "arn:aws:states:::glue:startCrawler.sync"
Parameters = {
Name = aws_glue_crawler.main.name
}
Next = "Success"
Catch = [{
ErrorEquals = ["States.ALL"]
Next = "NotifyFailure"
}]
}
# 成功終了
Success = {
Type = "Succeed"
}
# 失敗通知(SNS)
NotifyFailure = {
Type = "Task"
Resource = "arn:aws:states:::sns:publish"
Parameters = {
TopicArn = aws_sns_topic.alert.arn
"Message.$" = "States.Format('Pipeline failed: {}', $.Error)"
}
Next = "Fail"
}
# 失敗終了
Fail = {
Type = "Fail"
}
}
})
tags = merge(local.common_tags, {
Name = "${var.project}-pipeline"
})
}
4-6. コード解説
| 設定 | 意味 | 料理で例えると |
|---|---|---|
StartAt |
最初のステート | 最初の工程「ETL」 |
Resource |
実行するサービス | 指示を出す相手 |
Parameters |
渡すパラメータ | 注文内容 |
ResultPath |
結果の格納先 | 完了報告の記録場所 |
Next |
次のステート | 次の工程 |
End |
終了フラグ | 最後の工程 |
4-7. パラメータの流れ
4-8. $ と .$ の違い
| 記法 | 意味 | 例 |
|---|---|---|
$ |
入力データ全体 |
$ = 全データ |
$.key |
特定のキー |
$.input_key = input_key の値 |
"key.$" |
動的に値を設定 | 入力から値を取得して設定 |
// 入力
{ "input_key": "input/ec-sales.json" }
// Parameters での使用
{
"Payload": {
"input_key.$": "$.input_key" // → "input/ec-sales.json" が入る
}
}
5. 動作確認
5-1. 計画(プレビュー)
cd pipeiac02/tf
terraform plan
期待される出力:
Plan: 3 to add, 0 to change, 0 to destroy.
追加されるリソース:
- IAMロール(Step Functions用)
- IAMロールポリシー
- Step Functions ステートマシン
5-2. 適用(作成)
terraform apply
Enter a value: が出たら yes を入力。
期待される出力:
Apply complete! Resources: 3 added, 0 changed, 0 destroyed.
5-3. ステートマシンの確認
aws stepfunctions list-state-machines --query 'stateMachines[?contains(name, `dp`)].{Name:name,Arn:stateMachineArn}'
期待される出力:
[
{
"Name": "dp-pipeline",
"Arn": "arn:aws:states:ap-northeast-1:xxxx:stateMachine:dp-pipeline"
}
]
5-4. テストデータの準備
リポジトリに含まれている test-data/ec-sales-05.json を使用します。
# S3にアップロード
aws s3 cp test-data/ec-sales-05.json s3://dp-raw-$(terraform output -raw bucket_suffix)/input/ec-sales.json
5-5. ステートマシン実行
# ARNを取得
SFN_ARN=$(aws stepfunctions list-state-machines --query 'stateMachines[?contains(name, `dp-pipeline`)].stateMachineArn' --output text)
# 実行
aws stepfunctions start-execution \
--state-machine-arn $SFN_ARN \
--input '{"input_key": "input/ec-sales.json"}'
期待される出力:
{
"executionArn": "arn:aws:states:ap-northeast-1:xxxx:execution:dp-pipeline:xxxx",
"startDate": "2026-01-15T12:00:00.000000+09:00"
}
5-6. 実行状態の確認
# 実行ARNを取得
EXEC_ARN=$(aws stepfunctions list-executions --state-machine-arn $SFN_ARN --query 'executions[0].executionArn' --output text)
# 状態を確認
aws stepfunctions describe-execution --execution-arn $EXEC_ARN --query '{Status:status,StartDate:startDate,StopDate:stopDate}'
期待される出力:
{
"Status": "SUCCEEDED",
"StartDate": "2026-01-15T12:00:00.000000+09:00",
"StopDate": "2026-01-15T12:00:30.000000+09:00"
}
5-7. AWSコンソールで可視化を確認
- Step Functions → ステートマシン →
dp-pipeline - 実行履歴から実行をクリック
- グラフビューで各ステートの状態を確認
5-8. 出力ファイルの確認
aws s3 ls s3://dp-processed-$(terraform output -raw bucket_suffix)/processed/ --recursive
5-9. 確認チェックリスト
| 確認項目 | 期待値 |
|---|---|
| ステートマシン |
dp-pipeline が存在 |
| 実行ステータス | SUCCEEDED |
| ETLステート | 成功(Lambda実行) |
| Crawlerステート | 成功(Crawler起動) |
| 出力ファイル | Parquetが生成されている |
6. まとめ
6-1. この記事でやったこと
| 項目 | 内容 |
|---|---|
| 設計 | ワークフローの流れを理解 |
| 実装 | TerraformでStep Functionsを作成 |
| 確認 | パイプライン全体が動くことを確認 |
6-2. 比喩の振り返り
| レストラン | AWS | 今回やったこと |
|---|---|---|
| 冷蔵庫 | S3 Raw | ✅ 作成済み |
| 料理人 | Lambda | ✅ 作成済み |
| 盛り付け台 | S3 Processed | ✅ 作成済み |
| メニュー係 | Glue Crawler | ✅ 作成済み |
| メニュー表 | Glue Database | ✅ 作成済み |
| 調理長 | Step Functions | ✅ 今回作成 |
6-3. 作成したリソース
6-4. 現在のパイプライン
6-5. 次回予告
第6回: EventBridge では、自動トリガーを実装していきます。
- S3アップロードを検知
- Step Functionsを自動起動
- イベントルールの設計
レストランで言うと「注文検知システム」を作っていきます。