📁 完全なコードはGitHubで公開:GitHub: pipeiac02
シリーズ一覧(全12回)
Phase 1: 基盤構築
Phase 2: ワークフロー
Phase 3: セキュリティ・運用
Phase 4: 開発効率化
1. はじめに
1-1. 今回のゴール
EventBridgeを作成して、S3アップロードでパイプラインを自動起動します。
| ゴール | 内容 |
|---|---|
| 設計 | イベント駆動の仕組みを理解する |
| 実装 | TerraformでEventBridgeを作成する |
| 確認 | S3アップロードで自動起動することを確認する |
1-2. 前回の振り返り
前回(第5回)では、Step Functionsを作成しました。
1-3. 現在の課題
パイプラインは完成しましたが、まだ 手動で実行 しています。
これを S3アップロードで自動起動 したい!
1-4. この記事で作るもの
2. 比喩で理解する
2-1. EventBridgeを「ホールスタッフ」で考える
お客さんが注文したら、誰かが厨房に伝えないと料理は始まりません。
2-2. 比喩の図解(レストラン)
2-3. ホールスタッフの仕事
| 手順 | 内容 |
|---|---|
| 1. 注文を受ける | お客さんが「パスタ1つ」 |
| 2. 内容を確認 | パスタの注文だな |
| 3. 厨房に伝える | 調理長に伝票を渡す |
2-4. なぜホールスタッフが必要?
2-5. 技術の図解(AWS)
2-6. 対応関係
| レストラン | AWS | 役割 |
|---|---|---|
| お客さんの注文 | S3アップロード | イベント発生 |
| ホールスタッフ | EventBridge | イベント検知・転送 |
| 伝票 | イベントデータ | 注文内容(ファイル名など) |
| 調理長 | Step Functions | 処理の開始 |
2-7. イベント駆動とは?
| 方式 | 例え | 特徴 |
|---|---|---|
| ポーリング | 5分ごとに注文確認 | 遅延あり、無駄な確認 |
| イベント駆動 | 注文があれば即対応 | 即時、効率的 |
3. EventBridgeの役割
3-1. なぜEventBridgeが必要か
S3から直接Step Functionsを呼び出すこともできますが、EventBridgeを挟むメリットがあります。
3-2. EventBridgeのメリット
| メリット | 説明 | 料理で例えると |
|---|---|---|
| 一元管理 | ルールを1箇所で管理 | 注文管理システム |
| フィルタリング | 特定の条件だけ処理 | パスタの注文だけ厨房Aへ |
| 複数ターゲット | 1イベントで複数起動 | 注文で厨房と会計に同時通知 |
| 履歴 | イベント履歴を記録 | 注文履歴 |
3-3. イベントの流れ
3-4. イベントルールの設計
今回のルール:
| 条件 | 値 | 理由 |
|---|---|---|
| バケット | dp-raw-* |
Rawバケットのみ対象 |
| プレフィックス | input/ |
入力フォルダのみ対象 |
| サフィックス | .json |
JSONファイルのみ対象 |
3-5. S3イベント通知の有効化
EventBridgeでS3イベントを受け取るには、S3側で通知を有効化する必要があります。
4. 実装
4-1. ファイル構成
今回作成・修正するファイル:
pipeiac02/
├── test-data/
│ └── ec-sales-06.json # ★テスト用サンプルデータ
└── tf/
├── s3.tf # ★EventBridge通知を追加
├── eventbridge.tf # ★今回作成
└── iam.tf # ★EventBridge用ロール追加
4-2. EventBridge用IAM(iam.tf に追加)
EventBridgeがStep Functionsを起動するための権限を設定します。
# iam.tf に追加
# ================================
# EventBridge用IAM
# ================================
# EventBridge用IAMロール
# EventBridgeがStep Functionsを起動するための権限を付与
resource "aws_iam_role" "eventbridge" {
name = "${var.project}-eventbridge-role"
# EventBridge用の信頼ポリシー
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = { Service = "events.amazonaws.com" }
}]
})
tags = merge(local.common_tags, {
Name = "${var.project}-eventbridge-role"
})
}
# EventBridge用ポリシー(最小権限)
# 特定のStep Functions State Machineの実行開始のみ許可
resource "aws_iam_policy" "eventbridge" {
name = "${var.project}-eventbridge-policy"
description = "EventBridge用ポリシー"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "StartStepFunctions"
Effect = "Allow"
Action = ["states:StartExecution"]
Resource = [aws_sfn_state_machine.pipeline.arn]
}
]
})
tags = local.common_tags
}
# EventBridgeポリシーをアタッチ
resource "aws_iam_role_policy_attachment" "eventbridge" {
role = aws_iam_role.eventbridge.name
policy_arn = aws_iam_policy.eventbridge.arn
}
4-3. EventBridgeルール(eventbridge.tf)
# eventbridge.tf
# ================================
# EventBridge(イベント駆動)
# ================================
# S3バケットのEventBridge通知を有効化
# S3へのアップロードをEventBridgeに通知する設定
resource "aws_s3_bucket_notification" "raw" {
bucket = aws_s3_bucket.raw.id
eventbridge = true
}
# EventBridge Rule
# S3へのファイルアップロードを検知するルール
resource "aws_cloudwatch_event_rule" "s3_upload" {
name = "${var.project}-s3-upload"
description = "S3 Rawバケットへのファイルアップロードを検知"
# イベントパターン:input/配下へのオブジェクト作成を検知
event_pattern = jsonencode({
source = ["aws.s3"]
detail-type = ["Object Created"]
detail = {
bucket = { name = [aws_s3_bucket.raw.id] }
object = { key = [{ prefix = "input/" }] }
}
})
tags = merge(local.common_tags, {
Name = "${var.project}-s3-upload"
})
}
# EventBridge Target
# 検知したイベントでStep Functionsを起動
resource "aws_cloudwatch_event_target" "sfn" {
rule = aws_cloudwatch_event_rule.s3_upload.name
arn = aws_sfn_state_machine.pipeline.arn
role_arn = aws_iam_role.eventbridge.arn
# 入力変換:S3イベントからinput_keyを抽出してStep Functionsに渡す
input_transformer {
input_paths = {
s3_key = "$.detail.object.key"
}
input_template = "{\"input_key\": <s3_key>}"
}
}
4-4. コード解説
| 設定 | 意味 | 料理で例えると |
|---|---|---|
event_pattern |
検知するイベント条件 | 「パスタの注文だけ」 |
source |
イベント発生元 | 注文元(S3) |
detail-type |
イベント種類 | 新規注文(Object Created) |
detail.bucket |
対象バケット | 対象テーブル |
detail.object.key |
対象ファイルパス | input/ フォルダのみ |
4-5. input_transformer の解説
S3イベントから必要な情報を抽出してStep Functionsに渡します。
| 設定 | 意味 |
|---|---|
input_paths |
イベントから抽出するパス |
input_template |
出力のテンプレート |
5. 動作確認
5-1. 計画(プレビュー)
cd pipeiac02/tf
terraform plan
期待される出力:
Plan: 4 to add, 0 to change, 0 to destroy.
追加されるリソース:
- S3バケット通知設定
- IAMロール(EventBridge用)
- IAMロールポリシー
- EventBridgeルール
- EventBridgeターゲット
5-2. 適用(作成)
terraform apply
Enter a value: が出たら yes を入力。
期待される出力:
Apply complete! Resources: 4 added, 0 changed, 0 destroyed.
5-3. EventBridgeルールの確認
aws events list-rules --query 'Rules[?contains(Name, `dp`)].{Name:Name,State:State}'
期待される出力:
[
{
"Name": "dp-s3-trigger",
"State": "ENABLED"
}
]
5-4. 自動起動テスト
リポジトリに含まれている test-data/ec-sales-06.json を使用します。
テストデータをS3にアップロード:
# S3にアップロード(これでパイプラインが自動起動!)
aws s3 cp test-data/ec-sales-06.json s3://dp-raw-$(terraform output -raw bucket_suffix)/input/
5-5. 実行状態の確認
# 少し待ってから確認(5〜10秒)
SFN_ARN=$(aws stepfunctions list-state-machines --query 'stateMachines[?contains(name, `dp-pipeline`)].stateMachineArn' --output text)
aws stepfunctions list-executions --state-machine-arn $SFN_ARN --max-results 1 --query 'executions[0].{Status:status,StartDate:startDate}'
期待される出力:
{
"Status": "SUCCEEDED",
"StartDate": "2025-01-15T12:00:00.000000+09:00"
}
5-6. 完全自動化の確認
すべて 手動操作なし で完了!
5-7. 確認チェックリスト
| 確認項目 | 期待値 |
|---|---|
| EventBridgeルール |
dp-s3-trigger が存在 |
| ルール状態 | ENABLED |
| S3アップロード | 自動でパイプライン起動 |
| 実行ステータス | SUCCEEDED |
| 出力ファイル | Parquetが生成されている |
6. まとめ
6-1. この記事でやったこと
| 項目 | 内容 |
|---|---|
| 設計 | イベント駆動の仕組みを理解 |
| 実装 | TerraformでEventBridgeを作成 |
| 確認 | S3アップロードで自動起動を確認 |
6-2. 比喩の振り返り
| レストラン | AWS | 今回やったこと |
|---|---|---|
| 冷蔵庫 | S3 Raw | ✅ 作成済み |
| ホールスタッフ | EventBridge | ✅ 今回作成 |
| 調理長 | Step Functions | ✅ 作成済み |
| 料理人 | Lambda | ✅ 作成済み |
| 盛り付け台 | S3 Processed | ✅ 作成済み |
| メニュー係 | Glue Crawler | ✅ 作成済み |
| メニュー表 | Glue Database | ✅ 作成済み |
6-3. 完成したパイプライン
6-4. 基本パイプライン完成!
ここまでで、データパイプラインの基本機能が完成しました。
| 機能 | 状態 |
|---|---|
| データ取り込み | ✅ S3 Raw |
| 自動トリガー | ✅ EventBridge |
| ワークフロー管理 | ✅ Step Functions |
| データ変換 | ✅ Lambda ETL |
| メタデータ管理 | ✅ Glue Crawler |
| 分析 | ✅ Athena |
6-5. 次回予告
第7回: IAM最小権限 では、セキュリティを強化していきます。
- 現在の権限の見直し
- 最小権限の原則
- ポリシーの分離
レストランで言うと「スタッフ権限の整理」をしていきます。