0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【AWS×Terraform ⑤】データパイプライン構築シリーズ第5回|Step Functions

Last updated at Posted at 2026-01-23

📁 完全なコードはGitHubで公開GitHub: pipeiac02

シリーズ一覧(全12回)

Phase 1: 基盤構築

Phase 2: ワークフロー

Phase 3: セキュリティ・運用

Phase 4: 開発効率化


1. はじめに

1-1. 今回のゴール

Step Functionsを作成して、Lambda → Crawler の順序制御を実装します。

ゴール 内容
設計 ワークフローの流れを理解する
実装 TerraformでStep Functionsを作成する
確認 パイプライン全体が動くことを確認する

1-2. 前回の振り返り

前回(第4回)では、Glue Crawlerを作成しました。

1-3. 自動実行

各コンポーネントを 自動で順番に実行 できるようにしていきます。

1-4. この記事で作るもの


2. 比喩で理解する

2-1. Step Functionsを「調理長」で考える

料理人やメニュー係がいても、誰かが指示を出さないとバラバラに動いてしまいます。

2-2. 比喩の図解(レストラン)

2-3. 調理長の仕事

手順 内容
1. 注文を受ける 「パスタ1つ」
2. 料理人に指示 「パスタを作って」
3. 完了を確認 「できました」
4. メニュー係に指示 「メニュー表を更新して」
5. 完了を確認 「更新しました」
6. 報告 「全工程完了」

2-4. なぜ調理長が必要?

2-5. 技術の図解(AWS)

2-6. 対応関係

レストラン AWS 役割
調理長 Step Functions 工程管理・順序制御
調理指示 Lambda Task Lambdaを実行
メニュー更新指示 Glue Task Crawlerを実行
工程表 ステートマシン定義 実行順序の定義
完了報告 ステータス 成功/失敗の確認

2-7. ステートマシンとは?

用語 料理で例えると
ステート 1つの工程(調理、盛り付け)
トランジション 次の工程への移行
ステートマシン 工程表全体

3. Step Functionsの役割

3-1. なぜStep Functionsが必要か

個別にサービスを呼び出すと、管理が大変になります。

3-2. Step Functionsのメリット

メリット 説明 料理で例えると
順序制御 タスクを順番に実行 工程表通りに進める
可視化 実行状況をGUIで確認 調理長が全体を見渡せる
エラー処理 失敗時のリトライや分岐 失敗したら作り直し
ログ 実行履歴を記録 作業日報

3-3. 今回のワークフロー

3-4. ステートの種類

ステート 用途 今回の使用
Task サービスを実行 Lambda, Crawler
Pass データを渡すだけ -
Wait 待機 -
Choice 条件分岐 -
Parallel 並列実行 -
Succeed 成功終了
Fail 失敗終了 -

3-5. ASL(Amazon States Language)とは

Step Functionsの定義はJSONで記述します。

{
  "StartAt": "ETL",
  "States": {
    "ETL": {
      "Type": "Task",
      "Next": "Crawl"
    },
    "Crawl": {
      "Type": "Task",
      "End": true
    }
  }
}
要素 意味 料理で例えると
StartAt 最初のステート 最初の工程
States ステートの一覧 全工程のリスト
Type ステートの種類 工程の種類(調理、確認)
Next 次のステート 次の工程
End 終了フラグ 最後の工程

4. 実装

4-1. ファイル構成

今回作成するファイル:

pipeiac02/
├── test-data/
│   └── ec-sales-05.json  # ★テスト用サンプルデータ
└── tf/
    ├── stepfunctions.tf  # ★今回作成
    └── iam.tf            # ★Step Functions用ロール追加

4-2. StepFunctions用IAM(iam.tf に追加)

Step FunctionsがLambdaとGlueを実行するための権限を設定します。

コードの骨格:

# IAMロール(信頼ポリシー)
resource "aws_iam_role" "sfn" {
  name               = "${var.project}-sfn-role"
  assume_role_policy = jsonencode({...})  # states.amazonaws.com を許可
}

# IAMポリシー(権限ポリシー)
resource "aws_iam_policy" "sfn" {
  name   = "${var.project}-sfn-policy"
  policy = jsonencode({
    Statement = [
      { Action = ["lambda:InvokeFunction"], Resource = [...] },  # Lambda実行
      { Action = ["glue:StartCrawler", "glue:GetCrawler"], Resource = [...] },  # Crawler起動
      { Action = ["sns:Publish"], Resource = [...] }  # SNS通知
    ]
  })
}

# ポリシーをロールにアタッチ
resource "aws_iam_role_policy_attachment" "sfn" {...}

フルコード:

# iam.tf に追加

# ================================
# Step Functions用IAM
# ================================

# Step Functions用IAMロール
# Step FunctionsがLambda、Glue、SNSを操作するための権限を付与
resource "aws_iam_role" "sfn" {
  name = "${var.project}-sfn-role"

  # Step Functions用の信頼ポリシー
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = { Service = "states.amazonaws.com" }
    }]
  })

  tags = merge(local.common_tags, {
    Name = "${var.project}-sfn-role"
  })
}

# Step Functions用ポリシー(最小権限)
# Lambda実行、Glue Crawler起動、SNS通知のみ許可
resource "aws_iam_policy" "sfn" {
  name        = "${var.project}-sfn-policy"
  description = "Step Functions用ポリシー"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid      = "InvokeLambda"
        Effect   = "Allow"
        Action   = ["lambda:InvokeFunction"]
        Resource = [aws_lambda_function.etl.arn]
      },
      {
        Sid      = "StartGlueCrawler"
        Effect   = "Allow"
        Action   = ["glue:StartCrawler", "glue:GetCrawler"]
        Resource = ["arn:aws:glue:${var.aws_region}:*:crawler/${aws_glue_crawler.main.name}"]
      },
      {
        Sid      = "PublishSNS"
        Effect   = "Allow"
        Action   = ["sns:Publish"]
        Resource = [aws_sns_topic.alert.arn]
      }
    ]
  })

  tags = local.common_tags
}

# Step Functionsポリシーをアタッチ
resource "aws_iam_role_policy_attachment" "sfn" {
  role       = aws_iam_role.sfn.name
  policy_arn = aws_iam_policy.sfn.arn
}

4-3. 最小権限の原則

IAMポリシーは 最小権限の原則 に従って設計します。

なぜ最小権限が重要か

リスク 説明
誤操作 権限が広いと、意図しないリソースを削除・変更する可能性
セキュリティ侵害 攻撃者がアクセス権を取得した場合の被害範囲が拡大
コンプライアンス 監査要件を満たせない可能性

ベストプラクティス

プラクティス 説明
リソースを限定 * を避けて具体的なARNを指定 特定のS3バケットだけ
アクションを限定 必要なアクションだけ許可 s3:GetObject だけ
条件を追加 特定条件でのみ許可 特定のIPからのみ

ワイルドカード(*)の危険性

# ❌ 危険:全リソースへのアクセス
Resource = "*"

# ✅ 安全:特定リソースのみ
Resource = "arn:aws:glue:ap-northeast-1:*:crawler/dp-crawler"
パターン リスク 推奨度
"*" 全リソースにアクセス可能 ❌ 避ける
arn:aws:glue:*:*:crawler/* 全Crawler ⚠️ 注意
arn:aws:glue:ap-northeast-1:*:crawler/dp-* dpで始まるCrawler △ 許容
arn:aws:glue:...:crawler/${name} 特定のCrawlerのみ ✅ 推奨

4-4. 権限の設計

権限 対象 リソース指定
InvokeFunction Lambda aws_lambda_function.etl.arn(特定のLambdaのみ)
StartCrawler Glue crawler/${aws_glue_crawler.main.name}(特定のCrawlerのみ)
Publish SNS aws_sns_topic.alert.arn(特定のトピックのみ)

4-5. Step Functions(stepfunctions.tf)

コードの骨格:

resource "aws_sfn_state_machine" "pipeline" {
  name     = "${var.project}-pipeline"
  role_arn = aws_iam_role.sfn.arn

  definition = jsonencode({
    StartAt = "ETL"
    States = {
      ETL          = { Type = "Task", Resource = "...", Next = "StartCrawler" }
      StartCrawler = { Type = "Task", Resource = "...", Next = "Success" }
      Success      = { Type = "Succeed" }
      NotifyFailure = { Type = "Task", ... }  # エラー時SNS通知
      Fail         = { Type = "Fail" }
    }
  })
}

フルコード:

# stepfunctions.tf

# ================================
# Step Functions
# ================================

# State Machine(パイプライン)
# Lambda ETL → Glue Crawler の順序でワークフローを実行
resource "aws_sfn_state_machine" "pipeline" {
  name     = "${var.project}-pipeline"
  role_arn = aws_iam_role.sfn.arn

  # ワークフロー定義(ASL: Amazon States Language)
  definition = jsonencode({
    Comment = "Data Pipeline Workflow"
    StartAt = "ETL"
    States = {
      # ETL処理(Lambda実行)
      ETL = {
        Type     = "Task"
        Resource = aws_lambda_function.etl.arn
        Next     = "StartCrawler"
        Catch = [{
          ErrorEquals = ["States.ALL"]
          Next        = "NotifyFailure"
        }]
      }
      # Glue Crawler起動(同期実行)
      StartCrawler = {
        Type     = "Task"
        Resource = "arn:aws:states:::glue:startCrawler.sync"
        Parameters = {
          Name = aws_glue_crawler.main.name
        }
        Next = "Success"
        Catch = [{
          ErrorEquals = ["States.ALL"]
          Next        = "NotifyFailure"
        }]
      }
      # 成功終了
      Success = {
        Type = "Succeed"
      }
      # 失敗通知(SNS)
      NotifyFailure = {
        Type     = "Task"
        Resource = "arn:aws:states:::sns:publish"
        Parameters = {
          TopicArn    = aws_sns_topic.alert.arn
          "Message.$" = "States.Format('Pipeline failed: {}', $.Error)"
        }
        Next = "Fail"
      }
      # 失敗終了
      Fail = {
        Type = "Fail"
      }
    }
  })

  tags = merge(local.common_tags, {
    Name = "${var.project}-pipeline"
  })
}

4-6. コード解説

設定 意味 料理で例えると
StartAt 最初のステート 最初の工程「ETL」
Resource 実行するサービス 指示を出す相手
Parameters 渡すパラメータ 注文内容
ResultPath 結果の格納先 完了報告の記録場所
Next 次のステート 次の工程
End 終了フラグ 最後の工程

4-7. パラメータの流れ

4-8. $.$ の違い

記法 意味
$ 入力データ全体 $ = 全データ
$.key 特定のキー $.input_key = input_key の値
"key.$" 動的に値を設定 入力から値を取得して設定
// 入力
{ "input_key": "input/ec-sales.json" }

// Parameters での使用
{
  "Payload": {
    "input_key.$": "$.input_key"  //  "input/ec-sales.json" が入る
  }
}

5. 動作確認

5-1. 計画(プレビュー)

cd pipeiac02/tf
terraform plan

期待される出力:

Plan: 3 to add, 0 to change, 0 to destroy.

追加されるリソース:

  • IAMロール(Step Functions用)
  • IAMロールポリシー
  • Step Functions ステートマシン

5-2. 適用(作成)

terraform apply

Enter a value: が出たら yes を入力。

期待される出力:

Apply complete! Resources: 3 added, 0 changed, 0 destroyed.

5-3. ステートマシンの確認

aws stepfunctions list-state-machines --query 'stateMachines[?contains(name, `dp`)].{Name:name,Arn:stateMachineArn}'

期待される出力:

[
    {
        "Name": "dp-pipeline",
        "Arn": "arn:aws:states:ap-northeast-1:xxxx:stateMachine:dp-pipeline"
    }
]

5-4. テストデータの準備

リポジトリに含まれている test-data/ec-sales-05.json を使用します。

# S3にアップロード
aws s3 cp test-data/ec-sales-05.json s3://dp-raw-$(terraform output -raw bucket_suffix)/input/ec-sales.json

5-5. ステートマシン実行

# ARNを取得
SFN_ARN=$(aws stepfunctions list-state-machines --query 'stateMachines[?contains(name, `dp-pipeline`)].stateMachineArn' --output text)

# 実行
aws stepfunctions start-execution \
  --state-machine-arn $SFN_ARN \
  --input '{"input_key": "input/ec-sales.json"}'

期待される出力:

{
    "executionArn": "arn:aws:states:ap-northeast-1:xxxx:execution:dp-pipeline:xxxx",
    "startDate": "2026-01-15T12:00:00.000000+09:00"
}

5-6. 実行状態の確認

# 実行ARNを取得
EXEC_ARN=$(aws stepfunctions list-executions --state-machine-arn $SFN_ARN --query 'executions[0].executionArn' --output text)

# 状態を確認
aws stepfunctions describe-execution --execution-arn $EXEC_ARN --query '{Status:status,StartDate:startDate,StopDate:stopDate}'

期待される出力:

{
    "Status": "SUCCEEDED",
    "StartDate": "2026-01-15T12:00:00.000000+09:00",
    "StopDate": "2026-01-15T12:00:30.000000+09:00"
}

5-7. AWSコンソールで可視化を確認

  1. Step Functions → ステートマシン → dp-pipeline
  2. 実行履歴から実行をクリック
  3. グラフビューで各ステートの状態を確認

5-8. 出力ファイルの確認

aws s3 ls s3://dp-processed-$(terraform output -raw bucket_suffix)/processed/ --recursive

5-9. 確認チェックリスト

確認項目 期待値
ステートマシン dp-pipeline が存在
実行ステータス SUCCEEDED
ETLステート 成功(Lambda実行)
Crawlerステート 成功(Crawler起動)
出力ファイル Parquetが生成されている

6. まとめ

6-1. この記事でやったこと

項目 内容
設計 ワークフローの流れを理解
実装 TerraformでStep Functionsを作成
確認 パイプライン全体が動くことを確認

6-2. 比喩の振り返り

レストラン AWS 今回やったこと
冷蔵庫 S3 Raw ✅ 作成済み
料理人 Lambda ✅ 作成済み
盛り付け台 S3 Processed ✅ 作成済み
メニュー係 Glue Crawler ✅ 作成済み
メニュー表 Glue Database ✅ 作成済み
調理長 Step Functions ✅ 今回作成

6-3. 作成したリソース

6-4. 現在のパイプライン

6-5. 次回予告

第6回: EventBridge では、自動トリガーを実装していきます。

  • S3アップロードを検知
  • Step Functionsを自動起動
  • イベントルールの設計

レストランで言うと「注文検知システム」を作っていきます。


0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?