[Part 3] S3 Tables × Iceberg Lakehouse ── Building Redshift Serverless × Iceberg × Glue Data Quality with Terraform

Posted at 2026-05-08

Introduction

This article is the third and final installment of the S3 Tables × Iceberg series.

1. Build S3 Tables + Apache Iceberg tables, query them from Athena, and run ETL with a Glue Job (two articles ago)
2. Build an automated ETL pipeline with EventBridge + Step Functions (previous article)
3. Lakehouse queries with Redshift Serverless and quality checks with Glue Data Quality (this article)

Part 1 built the container for the data, and Part 2 built the plumbing that moves data into it automatically.

For production use, however, two challenges remain.

Challenge 1: Query performance and analytical flexibility
Athena is convenient, but it struggles with complex analyses that join large datasets or lean heavily on window functions. If Redshift Serverless can query the same Iceberg tables directly, we get a fast analytics layer with no data copies.

Challenge 2: Guaranteeing data quality
The current pipeline simply loads whatever files arrive: malformed CSVs and rows with broken types flow straight into the Iceberg tables. We need a quality check before ingestion that blocks data that fails it.
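Conceptually, the quality gate we want sits between the landing bucket and the Iceberg table: evaluate each record, load what passes, quarantine what fails. A minimal, framework-free sketch of that routing logic (the rule set and row shape here are illustrative stand-ins, not the actual DQDL rules used later):

```python
# Conceptual quality gate: route rows to "load" or "quarantine".
# The checks below are illustrative, not real DQDL rules.

def check_row(row: dict) -> bool:
    """A row passes if required fields exist and types are sane."""
    try:
        return (
            row.get("order_id") not in (None, "")  # completeness check
            and float(row["amount"]) > 0           # value-range check
        )
    except (KeyError, TypeError, ValueError):
        return False                               # broken type -> fail

def quality_gate(rows):
    """Split rows into pass/fail sets and compute a pass rate."""
    passed = [r for r in rows if check_row(r)]
    quarantined = [r for r in rows if not check_row(r)]
    score = len(passed) / len(rows) if rows else 1.0
    return passed, quarantined, score

rows = [
    {"order_id": "A-1", "amount": "120.5"},
    {"order_id": "", "amount": "80"},      # missing id -> quarantine
    {"order_id": "A-3", "amount": "abc"},  # broken type -> quarantine
]
ok, bad, score = quality_gate(rows)  # only the first row passes
```

Glue Data Quality does this evaluation for us inside the Glue Job; the sketch only shows the shape of the decision.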

This article builds both pieces with Terraform and completes the lakehouse.

What this article covers

  1. Provision Redshift Serverless with Terraform
  2. Query the Iceberg tables in S3 Tables directly from Redshift (via an External Database)
  3. Write to the Iceberg tables from Redshift (INSERT / CTAS)
  4. Embed Glue Data Quality (DQDL) checks in the Glue Job
  5. Automatically block data that fails the quality check and notify via SNS
  6. Manage every resource as IaC with Terraform

A quick tour of the terminology

Before diving in, here is a summary of the terms that appear throughout the article.

Redshift terms

Amazon Redshift: AWS's data warehouse, a dedicated engine for running complex SQL analytics over large datasets at speed. Characterized by columnar storage and massively parallel processing (MPP).
Redshift Serverless: Redshift without server management; no node counts or cluster configuration to think about. Billed only for what you use, in RPUs.
RPU: Redshift Processing Unit, the unit of compute capacity. In Serverless you specify the base capacity in RPUs (minimum 4 RPU; 1 RPU = 16 GB of memory).
Namespace: A logical group that manages tables, schemas, users, encryption keys, and so on.
Workgroup: A logical group that manages RPU count, VPC, security groups, and so on. Paired 1:1 with a Namespace.
External Database: A reference to an external catalog; the mechanism that lets Redshift directly reference a Glue Data Catalog database that lives outside Redshift.
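As a preview of how an External Database is used: Redshift can map a Glue Data Catalog database into a local schema with CREATE EXTERNAL SCHEMA, after which the cataloged tables can be queried like ordinary ones. A generic sketch (the database name, schema name, and role ARN are placeholders, and the exact catalog wiring for S3 Tables may differ from this plain Glue Data Catalog form):

```sql
-- Map a Glue Data Catalog database into Redshift (names are placeholders)
CREATE EXTERNAL SCHEMA lake
FROM DATA CATALOG
DATABASE 'analytics'
IAM_ROLE 'arn:aws:iam::123456789012:role/iceberg-lab-redshift-role';

-- Then query the external table like a local one
SELECT count(*) FROM lake.orders;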

Data quality terms

AWS Glue Data Quality: A data quality checking service. You define rules in DQDL and it automatically evaluates whether the data meets them. Can run inside a Glue ETL job.
DQDL: Data Quality Definition Language, AWS's own language for defining data quality rules, such as IsComplete, IsUnique, and ColumnValues.
Quality gate: A mechanism that stops data that fails the quality check from flowing downstream.
Quality score: The rule pass rate, i.e. the percentage of defined rules that passed (e.g. 8 of 10 rules passed = 80%).
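To make DQDL concrete, a small ruleset using the rule types named above might look like this (the column names are illustrative, not the ones used later in the article):

```
Rules = [
    IsComplete "order_id",
    IsUnique "order_id",
    ColumnValues "amount" > 0
]
```

Each rule evaluates to pass or fail over the dataset, and the pass rate becomes the quality score.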

Prerequisites

Part 1 article: S3 Tables + Iceberg tables already built
Part 2 article: EventBridge + Step Functions pipeline already built
Region: ap-northeast-1 (Tokyo)
Terraform: >= 1.5.0
AWS Provider: >= 5.82.0

Architecture (final)

We embed a Glue Data Quality check into the Glue Job. Data that passes is INSERTed into the Iceberg table in S3 Tables; data that fails is moved to a quarantine folder in S3. We also run queries against the Iceberg tables from Redshift Serverless.

(Architecture diagram)

Terraform directory layout

This adds the resources for this article to the directory layout from the previous article.

s3-tables-iceberg-lab/
├── main.tf          ← module calls
├── providers.tf     ← provider configuration
├── variables.tf     ← variables
└── modules/         ← one directory per module
    ├── s3_tables/   ← S3 Tables resources
    │   ├── main.tf
    │   ├── variables.tf
    │   └── outputs.tf
    ├── s3_landing/  ← raw-data landing S3
    │   ├── main.tf
    │   ├── variables.tf
    │   └── outputs.tf
    ├── iam/         ← IAM permissions
    │   ├── main.tf
    │   ├── variables.tf
    │   └── outputs.tf
    ├── glue/        ← Glue resources
    │   ├── main.tf
    │   ├── variables.tf
    │   ├── outputs.tf
    │   └── scripts/
    ├── step_functions/ ← Step Functions resources
    │   ├── main.tf
    │   ├── variables.tf
    │   └── outputs.tf
    └── redshift/ ← Redshift resources
        ├── main.tf
        └── variables.tf

Part A: Redshift Serverless × Iceberg

Step 1: Provision Redshift Serverless with Terraform

modules/redshift/main.tf

Define the Redshift Serverless resources.

# Namespace (holds databases and users)
resource "aws_redshiftserverless_namespace" "main" {
  namespace_name      = "${var.project_name}-ns"
  db_name             = "dev"
  admin_username      = "admin"
  admin_user_password = var.redshift_admin_password

  # IAM role Redshift uses to access S3 / Glue / Lake Formation
  iam_roles = [var.redshift_iam_role_arn]

  lifecycle {
    ignore_changes = [admin_user_password] # ignore password drift after initial creation
  }

  tags = {
    Project = var.project_name
  }
}

# Workgroup (holds compute resources)
resource "aws_redshiftserverless_workgroup" "main" {
  namespace_name = aws_redshiftserverless_namespace.main.namespace_name
  workgroup_name = "${var.project_name}-wg"

  # Base capacity: 4 RPU (the minimum; enough for this lab)
  # Limits at this size: 32 TB max storage, 100 columns per table, 64 GB memory
  base_capacity = 4

  # Public access (lab only; set to false in production and connect from within the VPC)
  publicly_accessible = true

  tags = {
    Project = var.project_name
  }
}

# Usage limit (guards against runaway cost)
resource "aws_redshiftserverless_usage_limit" "daily" {
  resource_arn  = aws_redshiftserverless_workgroup.main.arn
  usage_type    = "serverless-compute"
  amount        = 8  # at most 8 RPU-hours per day
  period        = "daily"
  breach_action = "deactivate"  # stop queries when the cap is reached
}
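A quick sanity check on the usage limit: with a base capacity of 4 RPU, a daily cap of 8 RPU-hours allows at most two hours of active compute per day before breach_action fires. The arithmetic:

```python
# Daily compute budget implied by the usage limit above
base_capacity_rpu = 4    # base_capacity in the workgroup
daily_cap_rpu_hours = 8  # amount in the usage limit

# RPU-hours divided by the RPU rate gives active hours per day
max_active_hours = daily_cap_rpu_hours / base_capacity_rpu
print(max_active_hours)  # 2.0
```

Two active hours per day is plenty for a lab; raise the cap before pointing real workloads at the workgroup.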

modules/redshift/variables.tf

Define the variables used by the redshift module's main.tf.

variable "project_name" {
  description = "Project name"
  type        = string
  default     = "iceberg-lab"
}

variable "redshift_admin_password" {
  description = "Redshift Serverless admin password"
  type        = string
  sensitive   = true
}

variable "redshift_iam_role_arn" {
  description = "IAM role ARN for Redshift"
  type        = string
}

modules/iam/main.tf

Define the IAM permissions that each resource in this article needs.

# IAM role for Glue
resource "aws_iam_role" "glue" {
  name = "${var.project_name}-glue-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "glue.amazonaws.com" }
    }]
  })
}

# Attach the AWS managed policy for Glue
resource "aws_iam_role_policy_attachment" "glue_service" {
  role       = aws_iam_role.glue.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
}

# Inline policy for Glue
resource "aws_iam_role_policy" "glue_data_access" {
  name = "${var.project_name}-glue-data-access"
  role = aws_iam_role.glue.id

  # Permissions on the raw-data landing bucket, the S3 Tables bucket, Lake Formation, and Glue
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "S3LandingAccess"
        Effect = "Allow"
        Action = ["s3:GetObject", "s3:ListBucket"]
        Resource = [
          var.landing_bucket_arn,
          "${var.landing_bucket_arn}/*",
          var.scripts_bucket_arn,
          "${var.scripts_bucket_arn}/*",
        ]
      },
      {
        Sid      = "S3TablesAccess"
        Effect   = "Allow"
        Action   = ["s3tables:*"]
        Resource = [
          var.s3tables_bucket_arn,
          "${var.s3tables_bucket_arn}/*"
        ]
      },
      {
        Sid      = "GlueAndLakeFormation"
        Effect   = "Allow"
        Action   = ["glue:*", "lakeformation:GetDataAccess"]
        Resource = ["*"]
      }
    ]
  })
}

# Role for EventBridge
# Needed so EventBridge can start Step Functions
resource "aws_iam_role" "eventbridge" {
  name = "${var.project_name}-eventbridge-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "events.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "eventbridge_start_sfn" {
  name = "start-step-functions"
  role = aws_iam_role.eventbridge.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = "states:StartExecution"
        Resource = var.etl_pipeline_statemachine_arn
      }
    ]
  })
}

# Role for Step Functions
# Needed so Step Functions can start the Glue Job and publish to SNS
resource "aws_iam_role" "step_functions" {
  name = "${var.project_name}-sfn-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "states.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "sfn_glue_sns" {
  name = "glue-and-sns"
  role = aws_iam_role.step_functions.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # Start Glue Job runs and check their status
        Effect = "Allow"
        Action = [
          "glue:StartJobRun",
          "glue:GetJobRun",
          "glue:GetJobRuns",
          "glue:BatchStopJobRun"
        ]
        Resource = var.csv_to_iceberg_glue_job_arn
      },
      {
        # Publish to SNS
        Effect   = "Allow"
        Action   = "sns:Publish"
        Resource = var.pipeline_notification_sns_topic_arn
      }
    ]
  })
}



################################ Added in this article ################################



# Role for Redshift
resource "aws_iam_role" "redshift" {
  name = "${var.project_name}-redshift-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "redshift.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "redshift_lakehouse" {
  name = "lakehouse-access"
  role = aws_iam_role.redshift.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # Access to the Glue Data Catalog
        Effect = "Allow"
        Action = [
          "glue:GetDatabase",
          "glue:GetDatabases",
          "glue:GetTable",
          "glue:GetTables",
          "glue:GetPartitions",
          "glue:GetCatalog",
          "glue:GetCatalogs",
          "glue:CreateTable",
          "glue:UpdateTable"
        ]
        Resource = "*"
      },
      {
        # Access to S3 Tables (Iceberg tables)
        Effect = "Allow"
        Action = [
          "s3tables:*"
        ]
        Resource = "*"
      },
      {
        # Read and write S3 data
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:ListBucket",
          "s3:GetBucketLocation"
        ]
        Resource = "*"
      },
      {
        # Lake Formation permission
        Effect = "Allow"
        Action = [
          "lakeformation:GetDataAccess"
        ]
        Resource = "*"
      }
    ]
  })
}

modules/iam/outputs.tf

Define the outputs consumed by other modules.

output "glue_iamrole_arn" {
  value = aws_iam_role.glue.arn
}

output "eventbridge_iamrole_arn" {
  value = aws_iam_role.eventbridge.arn
}

output "step_functions_iam_role_arn" {
  value = aws_iam_role.step_functions.arn
}

################################ Added in this article ################################

output "redshift_iam_role_arn" {
  value = aws_iam_role.redshift.arn
}

main.tf

Define the root main.tf that calls the modules.

# S3 Tables
module "s3_tables" {
  source        = "./modules/s3_tables/"
  project_name  = var.project_name
}

# Raw-data landing S3
module "s3_landing" {
  source        = "./modules/s3_landing/"

  project_name                  = var.project_name
  etl_pipeline_statemachine_arn = module.step_functions.etl_pipeline_statemachine_arn
  eventbridge_iamrole_arn       = module.iam.eventbridge_iamrole_arn
}

# IAM
module "iam" {
  source          = "./modules/iam/"

  project_name                        = var.project_name
  landing_bucket_arn                  = module.s3_landing.landing_bucket_arn
  s3tables_bucket_arn                 = module.s3_tables.s3tables_bucket_arn
  scripts_bucket_arn                  = module.glue.scripts_bucket_arn
  etl_pipeline_statemachine_arn       = module.step_functions.etl_pipeline_statemachine_arn
  csv_to_iceberg_glue_job_arn         = module.glue.csv_to_iceberg_glue_job_arn
  pipeline_notification_sns_topic_arn = module.step_functions.pipeline_notification_sns_topic_arn
}

# Glue
module "glue" {
  source        = "./modules/glue/"

  project_name         = var.project_name
  s3tables_bucket_name = module.s3_tables.s3tables_bucket_name
  glue_iamrole_arn     = module.iam.glue_iamrole_arn
}

# Step Functions
module "step_functions" {
  source = "./modules/step_functions/"

  project_name                 = var.project_name
  step_functions_iam_role_arn  = module.iam.step_functions_iam_role_arn
  csv_to_iceberg_glue_job_name = module.glue.csv_to_iceberg_glue_job_name
  s3tables_bucket_name         = module.s3_tables.s3tables_bucket_name
  notification_email           = var.notification_email
}

################################ Added in this article ################################


# Redshift
module "redshift" {
  source = "./modules/redshift/"

  project_name                 = var.project_name
  redshift_admin_password      = var.redshift_admin_password
  redshift_iam_role_arn        = module.iam.redshift_iam_role_arn
}

Deployment

First, run terraform init to load the newly created module.

C:\s3-tables-iceberg-lab > terraform init
Initializing the backend...
Initializing modules...
- redshift in modules\redshift
Initializing provider plugins...
- Reusing previous version of hashicorp/aws from the dependency lock file
- Using previously-installed hashicorp/aws v6.33.0

Terraform has been successfully initialized!

You may now begin working with Terraform. Try running "terraform plan" to see
any changes that are required for your infrastructure. All Terraform commands
should now work.

If you ever set or change modules or backend configuration for Terraform,
rerun this command to reinitialize your working directory. If you forget, other
commands will detect it and remind you to do so if necessary.

Next, run terraform plan. Before running it, set an environment variable named TF_VAR_<variable name>. This supplies the sensitive password to the redshift_admin_password variable used in the root main.tf without writing it into code.

C:\s3-tables-iceberg-lab > set TF_VAR_redshift_admin_password=xxx

C:\s3-tables-iceberg-lab > terraform plan
module.s3_tables.data.aws_caller_identity.current: Reading...
module.s3_landing.data.aws_caller_identity.current: Reading...
module.glue.data.aws_caller_identity.current: Reading...
module.step_functions.data.aws_caller_identity.current: Reading...
module.iam.aws_iam_role.eventbridge: Refreshing state... [id=iceberg-lab-eventbridge-role]
module.iam.aws_iam_role.step_functions: Refreshing state... [id=iceberg-lab-sfn-role]
module.step_functions.aws_sns_topic.pipeline_notification: Refreshing state... [id=arn:aws:sns:ap-northeast-1:{AccoundId}:iceberg-lab-pipeline-notification]
module.iam.aws_iam_role.glue: Refreshing state... [id=iceberg-lab-glue-role]
module.s3_tables.aws_s3tables_table_bucket.main: Refreshing state... [name=iceberg-lab]
module.step_functions.data.aws_caller_identity.current: Read complete after 0s [id={AccoundId}]
module.s3_tables.data.aws_caller_identity.current: Read complete after 0s [id={AccoundId}]
module.glue.data.aws_caller_identity.current: Read complete after 0s [id={AccoundId}]
module.s3_landing.data.aws_caller_identity.current: Read complete after 0s [id={AccoundId}]
module.glue.aws_s3_bucket.glue_scripts: Refreshing state... [id=iceberg-lab-glue-scripts-{AccoundId}]
module.s3_landing.aws_s3_bucket.landing: Refreshing state... [id=iceberg-lab-landing-{AccoundId}]
module.step_functions.aws_sns_topic_subscription.email: Refreshing state... [id=arn:aws:sns:ap-northeast-1:{AccoundId}:iceberg-lab-pipeline-notification:e0e63b7a-82f3-44ea-8a59-ba38e5f2af88]
module.s3_tables.aws_s3tables_namespace.analytics: Refreshing state...
module.s3_tables.data.aws_iam_policy_document.table_bucket: Reading...
module.s3_tables.data.aws_iam_policy_document.table_bucket: Read complete after 0s [id=3931547792]
module.s3_tables.aws_s3tables_table_bucket_policy.main: Refreshing state...
module.s3_tables.aws_s3tables_table.orders: Refreshing state... [name=orders]
module.s3_tables.aws_s3tables_table.customers: Refreshing state... [name=customers]
module.glue.aws_s3_object.glue_script: Refreshing state... [id=iceberg-lab-glue-scripts-{AccoundId}/scripts/glue_csv_to_iceberg.py]
module.s3_landing.aws_s3_bucket_notification.landing_eventbridge: Refreshing state... [id=iceberg-lab-landing-{AccoundId}]
module.s3_landing.aws_cloudwatch_event_rule.s3_csv_arrived: Refreshing state... [id=iceberg-lab-csv-arrived]
module.s3_landing.aws_s3_bucket_versioning.landing: Refreshing state... [id=iceberg-lab-landing-{AccoundId}]
module.iam.aws_iam_role_policy_attachment.glue_service: Refreshing state... [id=iceberg-lab-glue-role/arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole]
module.iam.aws_iam_role_policy.glue_data_access: Refreshing state... [id=iceberg-lab-glue-role:iceberg-lab-glue-data-access]
module.glue.aws_glue_job.csv_to_iceberg: Refreshing state... [id=iceberg-lab-csv-to-iceberg]
module.iam.aws_iam_role_policy.sfn_glue_sns: Refreshing state... [id=iceberg-lab-sfn-role:glue-and-sns]
module.step_functions.aws_sfn_state_machine.etl_pipeline: Refreshing state... [id=arn:aws:states:ap-northeast-1:{AccoundId}:stateMachine:iceberg-lab-etl-pipeline]
module.iam.aws_iam_role_policy.eventbridge_start_sfn: Refreshing state... [id=iceberg-lab-eventbridge-role:start-step-functions]
module.s3_landing.aws_cloudwatch_event_target.step_functions: Refreshing state... [id=iceberg-lab-csv-arrived-terraform-20260225153517622700000001]

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the
following symbols:
  + create

Terraform will perform the following actions:

  # module.iam.aws_iam_role.redshift will be created
  + resource "aws_iam_role" "redshift" {
      + arn                   = (known after apply)
      + assume_role_policy    = jsonencode(
            {
              + Statement = [
                  + {
                      + Action    = "sts:AssumeRole"
                      + Effect    = "Allow"
                      + Principal = {
                          + Service = "redshift.amazonaws.com"
                        }
                    },
                ]
              + Version   = "2012-10-17"
            }
        )
      + create_date           = (known after apply)
      + force_detach_policies = false
      + id                    = (known after apply)
      + managed_policy_arns   = (known after apply)
      + max_session_duration  = 3600
      + name                  = "iceberg-lab-redshift-role"
      + name_prefix           = (known after apply)
      + path                  = "/"
      + tags_all              = (known after apply)
      + unique_id             = (known after apply)

      + inline_policy (known after apply)
    }

  # module.iam.aws_iam_role_policy.redshift_lakehouse will be created
  + resource "aws_iam_role_policy" "redshift_lakehouse" {
      + id          = (known after apply)
      + name        = "lakehouse-access"
      + name_prefix = (known after apply)
      + policy      = jsonencode(
            {
              + Statement = [
                  + {
                      + Action   = [
                          + "glue:GetDatabase",
                          + "glue:GetDatabases",
                          + "glue:GetTable",
                          + "glue:GetTables",
                          + "glue:GetPartitions",
                          + "glue:GetCatalog",
                          + "glue:GetCatalogs",
                          + "glue:CreateTable",
                          + "glue:UpdateTable",
                        ]
                      + Effect   = "Allow"
                      + Resource = "*"
                    },
                  + {
                      + Action   = [
                          + "s3tables:*",
                        ]
                      + Effect   = "Allow"
                      + Resource = "*"
                    },
                  + {
                      + Action   = [
                          + "s3:GetObject",
                          + "s3:PutObject",
                          + "s3:ListBucket",
                          + "s3:GetBucketLocation",
                        ]
                      + Effect   = "Allow"
                      + Resource = "*"
                    },
                  + {
                      + Action   = [
                          + "lakeformation:GetDataAccess",
                        ]
                      + Effect   = "Allow"
                      + Resource = "*"
                    },
                ]
              + Version   = "2012-10-17"
            }
        )
      + role        = (known after apply)
    }

  # module.redshift.aws_redshiftserverless_namespace.main will be created
  + resource "aws_redshiftserverless_namespace" "main" {
      + admin_password_secret_arn        = (known after apply)
      + admin_password_secret_kms_key_id = (known after apply)
      + admin_user_password              = (sensitive value)
      + admin_user_password_wo           = (write-only attribute)
      + admin_username                   = (sensitive value)
      + arn                              = (known after apply)
      + db_name                          = "dev"
      + iam_roles                        = (known after apply)
      + id                               = (known after apply)
      + kms_key_id                       = (known after apply)
      + namespace_id                     = (known after apply)
      + namespace_name                   = "iceberg-lab-ns"
      + region                           = "ap-northeast-1"
      + tags                             = {
          + "Project" = "iceberg-lab"
        }
      + tags_all                         = {
          + "Project" = "iceberg-lab"
        }
    }

  # module.redshift.aws_redshiftserverless_usage_limit.daily will be created
  + resource "aws_redshiftserverless_usage_limit" "daily" {
      + amount        = 8
      + arn           = (known after apply)
      + breach_action = "deactivate"
      + id            = (known after apply)
      + period        = "daily"
      + region        = "ap-northeast-1"
      + resource_arn  = (known after apply)
      + usage_type    = "serverless-compute"
    }

  # module.redshift.aws_redshiftserverless_workgroup.main will be created
  + resource "aws_redshiftserverless_workgroup" "main" {
      + arn                 = (known after apply)
      + base_capacity       = 4
      + endpoint            = (known after apply)
      + id                  = (known after apply)
      + namespace_name      = "iceberg-lab-ns"
      + port                = (known after apply)
      + publicly_accessible = true
      + region              = "ap-northeast-1"
      + security_group_ids  = (known after apply)
      + subnet_ids          = (known after apply)
      + tags                = {
          + "Project" = "iceberg-lab"
        }
      + tags_all            = {
          + "Project" = "iceberg-lab"
        }
      + track_name          = (known after apply)
      + workgroup_id        = (known after apply)
      + workgroup_name      = "iceberg-lab-wg"

      + config_parameter (known after apply)

      + price_performance_target (known after apply)
    }

Plan: 5 to add, 0 to change, 0 to destroy.

───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────

Note: You didn't use the -out option to save this plan, so Terraform can't guarantee to take exactly these actions if
you run "terraform apply" now.

Run terraform apply.

C:\s3-tables-iceberg-lab > terraform apply
module.s3_tables.data.aws_caller_identity.current: Reading...
module.s3_landing.data.aws_caller_identity.current: Reading...
module.step_functions.data.aws_caller_identity.current: Reading...
module.glue.data.aws_caller_identity.current: Reading...
module.iam.aws_iam_role.glue: Refreshing state... [id=iceberg-lab-glue-role]
module.iam.aws_iam_role.eventbridge: Refreshing state... [id=iceberg-lab-eventbridge-role]
module.iam.aws_iam_role.step_functions: Refreshing state... [id=iceberg-lab-sfn-role]
module.step_functions.aws_sns_topic.pipeline_notification: Refreshing state... [id=arn:aws:sns:ap-northeast-1:{AccoundId}:iceberg-lab-pipeline-notification]
module.s3_tables.aws_s3tables_table_bucket.main: Refreshing state... [name=iceberg-lab]
module.s3_tables.data.aws_caller_identity.current: Read complete after 0s [id={AccoundId}]
module.s3_landing.data.aws_caller_identity.current: Read complete after 0s [id={AccoundId}]
module.s3_landing.aws_s3_bucket.landing: Refreshing state... [id=iceberg-lab-landing-{AccoundId}]
module.glue.data.aws_caller_identity.current: Read complete after 0s [id={AccoundId}]
module.glue.aws_s3_bucket.glue_scripts: Refreshing state... [id=iceberg-lab-glue-scripts-{AccoundId}]
module.step_functions.data.aws_caller_identity.current: Read complete after 0s [id={AccoundId}]
module.step_functions.aws_sns_topic_subscription.email: Refreshing state... [id=arn:aws:sns:ap-northeast-1:{AccoundId}:iceberg-lab-pipeline-notification:e0e63b7a-82f3-44ea-8a59-ba38e5f2af88]
module.s3_tables.aws_s3tables_namespace.analytics: Refreshing state...
module.s3_tables.data.aws_iam_policy_document.table_bucket: Reading...
module.s3_tables.data.aws_iam_policy_document.table_bucket: Read complete after 0s [id=3931547792]
module.s3_tables.aws_s3tables_table_bucket_policy.main: Refreshing state...
module.s3_tables.aws_s3tables_table.orders: Refreshing state... [name=orders]
module.s3_tables.aws_s3tables_table.customers: Refreshing state... [name=customers]
module.s3_landing.aws_s3_bucket_notification.landing_eventbridge: Refreshing state... [id=iceberg-lab-landing-{AccoundId}]
module.s3_landing.aws_s3_bucket_versioning.landing: Refreshing state... [id=iceberg-lab-landing-{AccoundId}]
module.s3_landing.aws_cloudwatch_event_rule.s3_csv_arrived: Refreshing state... [id=iceberg-lab-csv-arrived]
module.glue.aws_s3_object.glue_script: Refreshing state... [id=iceberg-lab-glue-scripts-{AccoundId}/scripts/glue_csv_to_iceberg.py]
module.iam.aws_iam_role_policy_attachment.glue_service: Refreshing state... [id=iceberg-lab-glue-role/arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole]
module.iam.aws_iam_role_policy.glue_data_access: Refreshing state... [id=iceberg-lab-glue-role:iceberg-lab-glue-data-access]
module.glue.aws_glue_job.csv_to_iceberg: Refreshing state... [id=iceberg-lab-csv-to-iceberg]
module.iam.aws_iam_role_policy.sfn_glue_sns: Refreshing state... [id=iceberg-lab-sfn-role:glue-and-sns]
module.step_functions.aws_sfn_state_machine.etl_pipeline: Refreshing state... [id=arn:aws:states:ap-northeast-1:{AccoundId}:stateMachine:iceberg-lab-etl-pipeline]
module.iam.aws_iam_role_policy.eventbridge_start_sfn: Refreshing state... [id=iceberg-lab-eventbridge-role:start-step-functions]
module.s3_landing.aws_cloudwatch_event_target.step_functions: Refreshing state... [id=iceberg-lab-csv-arrived-terraform-20260225153517622700000001]

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the
following symbols:
  + create

Terraform will perform the following actions:

  # module.iam.aws_iam_role.redshift will be created
  + resource "aws_iam_role" "redshift" {
      + arn                   = (known after apply)
      + assume_role_policy    = jsonencode(
            {
              + Statement = [
                  + {
                      + Action    = "sts:AssumeRole"
                      + Effect    = "Allow"
                      + Principal = {
                          + Service = "redshift.amazonaws.com"
                        }
                    },
                ]
              + Version   = "2012-10-17"
            }
        )
      + create_date           = (known after apply)
      + force_detach_policies = false
      + id                    = (known after apply)
      + managed_policy_arns   = (known after apply)
      + max_session_duration  = 3600
      + name                  = "iceberg-lab-redshift-role"
      + name_prefix           = (known after apply)
      + path                  = "/"
      + tags_all              = (known after apply)
      + unique_id             = (known after apply)

      + inline_policy (known after apply)
    }

  # module.iam.aws_iam_role_policy.redshift_lakehouse will be created
  + resource "aws_iam_role_policy" "redshift_lakehouse" {
      + id          = (known after apply)
      + name        = "lakehouse-access"
      + name_prefix = (known after apply)
      + policy      = jsonencode(
            {
              + Statement = [
                  + {
                      + Action   = [
                          + "glue:GetDatabase",
                          + "glue:GetDatabases",
                          + "glue:GetTable",
                          + "glue:GetTables",
                          + "glue:GetPartitions",
                          + "glue:GetCatalog",
                          + "glue:GetCatalogs",
                          + "glue:CreateTable",
                          + "glue:UpdateTable",
                        ]
                      + Effect   = "Allow"
                      + Resource = "*"
                    },
                  + {
                      + Action   = [
                          + "s3tables:*",
                        ]
                      + Effect   = "Allow"
                      + Resource = "*"
                    },
                  + {
                      + Action   = [
                          + "s3:GetObject",
                          + "s3:PutObject",
                          + "s3:ListBucket",
                          + "s3:GetBucketLocation",
                        ]
                      + Effect   = "Allow"
                      + Resource = "*"
                    },
                  + {
                      + Action   = [
                          + "lakeformation:GetDataAccess",
                        ]
                      + Effect   = "Allow"
                      + Resource = "*"
                    },
                ]
              + Version   = "2012-10-17"
            }
        )
      + role        = (known after apply)
    }

  # module.redshift.aws_redshiftserverless_namespace.main will be created
  + resource "aws_redshiftserverless_namespace" "main" {
      + admin_password_secret_arn        = (known after apply)
      + admin_password_secret_kms_key_id = (known after apply)
      + admin_user_password              = (sensitive value)
      + admin_user_password_wo           = (write-only attribute)
      + admin_username                   = (sensitive value)
      + arn                              = (known after apply)
      + db_name                          = "dev"
      + iam_roles                        = (known after apply)
      + id                               = (known after apply)
      + kms_key_id                       = (known after apply)
      + namespace_id                     = (known after apply)
      + namespace_name                   = "iceberg-lab-ns"
      + region                           = "ap-northeast-1"
      + tags                             = {
          + "Project" = "iceberg-lab"
        }
      + tags_all                         = {
          + "Project" = "iceberg-lab"
        }
    }

  # module.redshift.aws_redshiftserverless_usage_limit.daily will be created
  + resource "aws_redshiftserverless_usage_limit" "daily" {
      + amount        = 8
      + arn           = (known after apply)
      + breach_action = "deactivate"
      + id            = (known after apply)
      + period        = "daily"
      + region        = "ap-northeast-1"
      + resource_arn  = (known after apply)
      + usage_type    = "serverless-compute"
    }

  # module.redshift.aws_redshiftserverless_workgroup.main will be created
  + resource "aws_redshiftserverless_workgroup" "main" {
      + arn                 = (known after apply)
      + base_capacity       = 4
      + endpoint            = (known after apply)
      + id                  = (known after apply)
      + namespace_name      = "iceberg-lab-ns"
      + port                = (known after apply)
      + publicly_accessible = true
      + region              = "ap-northeast-1"
      + security_group_ids  = (known after apply)
      + subnet_ids          = (known after apply)
      + tags                = {
          + "Project" = "iceberg-lab"
        }
      + tags_all            = {
          + "Project" = "iceberg-lab"
        }
      + track_name          = (known after apply)
      + workgroup_id        = (known after apply)
      + workgroup_name      = "iceberg-lab-wg"

      + config_parameter (known after apply)

      + price_performance_target (known after apply)
    }

Plan: 5 to add, 0 to change, 0 to destroy.

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: yes

module.iam.aws_iam_role.redshift: Creating...
module.iam.aws_iam_role.redshift: Creation complete after 2s [id=iceberg-lab-redshift-role]
module.iam.aws_iam_role_policy.redshift_lakehouse: Creating...
module.redshift.aws_redshiftserverless_namespace.main: Creating...
module.redshift.aws_redshiftserverless_namespace.main: Creation complete after 0s [id=iceberg-lab-ns]
module.redshift.aws_redshiftserverless_workgroup.main: Creating...
module.iam.aws_iam_role_policy.redshift_lakehouse: Creation complete after 0s [id=iceberg-lab-redshift-role:lakehouse-access]
module.redshift.aws_redshiftserverless_workgroup.main: Still creating... [00m10s elapsed]
module.redshift.aws_redshiftserverless_workgroup.main: Still creating... [00m20s elapsed]
module.redshift.aws_redshiftserverless_workgroup.main: Still creating... [00m30s elapsed]
module.redshift.aws_redshiftserverless_workgroup.main: Still creating... [00m40s elapsed]
module.redshift.aws_redshiftserverless_workgroup.main: Still creating... [00m50s elapsed]
module.redshift.aws_redshiftserverless_workgroup.main: Still creating... [01m00s elapsed]
module.redshift.aws_redshiftserverless_workgroup.main: Still creating... [01m10s elapsed]
module.redshift.aws_redshiftserverless_workgroup.main: Still creating... [01m20s elapsed]
module.redshift.aws_redshiftserverless_workgroup.main: Creation complete after 1m26s [id=iceberg-lab-wg]
module.redshift.aws_redshiftserverless_usage_limit.daily: Creating...
module.redshift.aws_redshiftserverless_usage_limit.daily: Creation complete after 0s [id=57431a2c-1c31-45e0-9a73-2ac3e771c0bc]

Apply complete! Resources: 5 added, 0 changed, 0 destroyed.

Resource creation is complete.

Note that environment variables set with SET, as above, last only for the current session. Registering them as system environment variables avoids having to repeat this setup in every new session.

Limitations of the 4 RPU configuration

Item Limit
Base memory 64 GB (4 RPU × 16 GB)
Maximum managed storage 32 TB
Maximum columns per table 100 columns
VACUUM BOOST (a VACUUM operation that reclaims unused space and sorts the table, accelerated with additional dedicated resources) Not supported (requires 8 RPU or more)

For testing purposes, 4 RPU is plenty. If production tables will exceed 100 columns, consider 8 RPU or more.
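As a sanity check on the sizing above and the daily usage limit created earlier (amount = 8, breach_action = "deactivate"), here is the arithmetic as a small sketch. The per-RPU-hour price is a placeholder assumption for illustration, not a quoted AWS rate:

```python
# Sketch of Redshift Serverless sizing arithmetic.
# ASSUMPTION: price_per_rpu_hour below is a placeholder, not an actual AWS rate.
GB_PER_RPU = 16  # 1 RPU = 16 GB of memory

def base_memory_gb(base_capacity_rpu: int) -> int:
    """Memory available to a workgroup at the given base capacity."""
    return base_capacity_rpu * GB_PER_RPU

def max_daily_cost(limit_rpu_hours: float, price_per_rpu_hour: float) -> float:
    """Upper bound on daily spend enforced by a 'daily' usage limit
    whose breach_action is 'deactivate'."""
    return limit_rpu_hours * price_per_rpu_hour

print(base_memory_gb(4))        # the 4 RPU workgroup in this article
print(max_daily_cost(8, 0.50))  # 8 RPU-hour daily cap, hypothetical price
```

The usage limit is what makes `publicly_accessible = true` tolerable in a lab setup: even if the workgroup is left running, compute is deactivated once the daily RPU-hour cap is hit.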


Step 2: Grant Redshift access with Lake Formation

Use Lake Formation to grant the access permissions on the catalog, database, and table that Redshift needs to query S3 Tables.

First, set permissions on the catalog.

Item Setting
Principals Redshift service role (allows Redshift to reach Lake Formation, making the catalog visible to Redshift)
LF-Tags or catalog resources Named Data Catalog resources
Catalogs s3tablescatalog/<table bucket name>
Permissions DESCRIBE

image.png
image.png

Next, set permissions on the database.

Item Setting
Principals The role used by the Redshift instance created in this article
LF-Tags or catalog resources Named Data Catalog resources
Catalogs s3tablescatalog/<table bucket name>
Databases analytics
Permissions DESCRIBE, CREATE_TABLE

image.png
image.png

Finally, set permissions on the table.

Item Setting
Principals The role used by the Redshift instance created in this article
LF-Tags or catalog resources Named Data Catalog resources
Catalogs s3tablescatalog/<table bucket name>
Databases analytics
Permissions SELECT, INSERT, DELETE, DESCRIBE, ALTER
Tables orders

image.png
image.png

Permissions have now been granted at all three levels.
image.png

image.png

Step 3: Query the Iceberg table from Redshift

3-1. Connect with Redshift Query Editor v2

From the Redshift console, select the workgroup and launch the query editor.
image.png
image.png

Selecting the workgroup in the left pane opens the connection settings dialog; choose federated user as the connection method and dev as the target database.
image.png

You can confirm that s3tablescatalog, registered in Lake Formation, is mounted as an external database, along with the table bucket registered in it and the databases and tables inside.
image.png

3-2. Querying the Iceberg table

Let's run a few queries.

○ Query the S3 Tables Iceberg table directly

SELECT
  order_id,
  customer_id,
  order_date,
  status
FROM
    "iceberg-lab@s3tablescatalog"."analytics"."orders"
ORDER BY order_date DESC
LIMIT 10;

-- Result
order_id customer_id order_date  status
1008     3           2024-10-01  completed
1007     2           2024-09-01  completed
1005     4           2024-07-15  completed
1004     1           2024-07-01  completed
1003     3           2024-06-10  completed
1002     2           2024-06-05  completed
1001     1           2024-06-01  refunded
1014     2003        2024-01-17  pending
1011     2003        2024-01-17  pending
1014     2003        2024-01-17  pending

○ Join a Redshift local table with the Iceberg table (master data in Redshift, transaction data in Iceberg)

Create a customers table in the Redshift local database dev and insert data.

CREATE TABLE customers (
  customer_id   BIGINT,
  customer_name VARCHAR(256),
  email         VARCHAR(256),
  city          VARCHAR(256),
  country       VARCHAR(256),
  segment       VARCHAR(256),
  created_at    TIMESTAMP
);

INSERT INTO customers VALUES (
  1,
  'Taro Yamada',
  'taro.yamada@example.com',
  'Tokyo',
  'Japan',
  'Consumer',
  '2024-01-15 09:00:00'
);

The table is visible in the query editor.
image.png

Join the Redshift local table customers with the Iceberg table orders and query the result.

SELECT
  o.order_id,
  c.customer_name,
  o.order_date,
  o.status
FROM
    "iceberg-lab@s3tablescatalog"."analytics"."orders" o
JOIN
  dev.public.customers c ON o.customer_id = c.customer_id;

-- Result
order_id  customer_name order_date  status
1001      Taro Yamada   2024-06-01  refunded
1004      Taro Yamada   2024-07-01  completed

Athena and Redshift can reference the same Iceberg table with no data copies at all. Use Athena for ad hoc analysis and Redshift for complex joins and large aggregations.

Step 4: Write to the Iceberg table from Redshift

Redshift also supports writing directly to Iceberg tables.

4-1. INSERT into the Iceberg table from Redshift

Write to the Iceberg table directly with Redshift SQL, then read the result back.

INSERT INTO "iceberg-lab@s3tablescatalog"."analytics"."orders"
(order_id, customer_id, order_date, total_amount, status)
VALUES
(9001, 1, DATE '2026-02-28', 50000, 'completed'),
(9002, 2, DATE '2026-02-28', 32000, 'processing');

We can confirm that the rows were inserted into the Iceberg table.

SELECT
  order_id,
  customer_id,
  order_date,
  status
FROM
    "iceberg-lab@s3tablescatalog"."analytics"."orders"
ORDER BY order_date DESC
LIMIT 10;

-- Result
order_id customer_id order_date  status
1008     3           2024-10-01  completed
1007     2           2024-09-01  completed
1005     4           2024-07-15  completed
1004     1           2024-07-01  completed
1003     3           2024-06-10  completed
1002     2           2024-06-05  completed
1001     1           2024-06-01  refunded
1014     2003        2024-01-17  pending
1011     2003        2024-01-17  pending
1014     2003        2024-01-17  pending
9001     1           2026-02-28  completed
9002     2           2026-02-28  processing

4-2. CTAS (save an aggregation result as a new Iceberg table)

Aggregate the Iceberg table into a daily summary and store it as a new Iceberg table.

CREATE TABLE "iceberg-lab@s3tablescatalog"."analytics"."daily_summary"
USING ICEBERG
AS
SELECT
  order_date,
  COUNT(*)                                              AS order_count,
  SUM(total_amount)                                     AS total_amount,
  CAST(AVG(total_amount) AS DECIMAL(12,2))              AS avg_amount,
  SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) AS completed_count
FROM
  "iceberg-lab@s3tablescatalog"."analytics"."orders"
GROUP BY order_date;

The table is visible in the query editor.
image.png

It can also be seen on the table bucket's page in the S3 console.
image.png

The new table contains the expected data.

SELECT * FROM "iceberg-lab@s3tablescatalog"."analytics"."daily_summary"
ORDER BY order_date;

-- Result
order_date order_count total_amount avg_amount completed_count
2024-01-15 3           294000.00    98000.00   3
2024-01-16 3           22500.00     7500.00    3
2024-01-17 3           34800.00     11600.00   0
2024-06-01 1           359.88       359.88     0
2024-06-05 1           599.97       599.97     1
2024-06-10 1           499.99       499.99     1
2024-07-01 1           719.76       719.76     1
2024-07-15 1           189.99       189.99     1
2024-09-01 1           849.96       849.96     1
2024-10-01 1           299.99       299.99     1

The only DML that Redshift supports when writing to Iceberg is CREATE TABLE (including CTAS) and INSERT; DELETE, UPDATE, MERGE, and ALTER TABLE are not supported. If you need row-level updates or deletes, keep using Athena / Glue.
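The CTAS aggregation in 4-2 (GROUP BY order_date with COUNT, SUM, AVG, and a conditional count) can be reproduced in miniature. The three amounts below are illustrative stand-ins chosen to match the 2024-01-17 row of the result (3 orders, total 34800, 0 completed); the actual per-order amounts are not shown in the article:

```python
# Miniature reproduction of the daily_summary CTAS aggregation.
# The amounts are illustrative stand-ins, not the article's actual order rows.
from collections import defaultdict

orders = [
    {"order_date": "2024-01-17", "total_amount": 11600, "status": "pending"},
    {"order_date": "2024-01-17", "total_amount": 11600, "status": "pending"},
    {"order_date": "2024-01-17", "total_amount": 11600, "status": "pending"},
]

summary = defaultdict(lambda: {"order_count": 0, "total_amount": 0, "completed_count": 0})
for o in orders:
    day = summary[o["order_date"]]
    day["order_count"] += 1                                # COUNT(*)
    day["total_amount"] += o["total_amount"]               # SUM(total_amount)
    day["completed_count"] += o["status"] == "completed"   # SUM(CASE WHEN ...)
for day in summary.values():
    day["avg_amount"] = round(day["total_amount"] / day["order_count"], 2)  # AVG

print(summary["2024-01-17"])
```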

Part B: A quality gate with Glue Data Quality

Step 5: Designing the Glue Data Quality rules

AWS Glue Data Quality also works with S3 Tables and Lake Formation-managed Iceberg tables. We embed a quality check in the ETL pipeline: if the incoming CSV data fails to satisfy every rule below, loading into the Iceberg table is blocked.

Quality rule design
Rules = [
    # 1. Row count: the file must not be empty
    RowCount > 0,

    # 2. order_id: every row must have a value (no NULLs)
    IsComplete "order_id",

    # 3. order_id: no duplicates
    IsUnique "order_id",

    # 4. total_amount: every row must have a value
    IsComplete "total_amount",

    # 5. total_amount: must be 0 or greater (negative amounts are invalid)
    ColumnValues "total_amount" >= 0,

    # 6. status: only allowed values
    ColumnValues "status" in ["completed", "pending", "processing", "cancelled"],

    # 7. order_date: every row must have a value
    IsComplete "order_date",

    # 8. customer_id: every row must have a value
    IsComplete "customer_id"
]

Rule design points

Rule type Purpose Impact on failure
RowCount Detect empty files Avoids wasted Glue Job runs
IsComplete NULL check Prevents surprises in downstream JOINs and filters
IsUnique Duplicate check Prevents double counting in aggregations
ColumnValues (range) Value-range check Keeps out anomalous values
ColumnValues (allow list) Categorical value check Keeps out undefined statuses
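The gate semantics these rules implement can be illustrated without Glue. The following is a minimal pure-Python re-implementation of the same eight checks plus the quality score, purely to make the pass/fail behavior concrete; it is not the actual DQDL engine:

```python
# Pure-Python illustration of the eight DQDL rules above.
# NOT the real Glue Data Quality engine -- just the same semantics.
ALLOWED_STATUS = {"completed", "pending", "processing", "cancelled"}

def evaluate(rows: list[dict]) -> dict[str, bool]:
    """Return rule name -> pass/fail for a batch of order rows."""
    def complete(col):  # IsComplete: no NULL (None) values
        return all(r.get(col) is not None for r in rows)
    ids = [r.get("order_id") for r in rows]
    return {
        "RowCount > 0": len(rows) > 0,
        'IsComplete "order_id"': complete("order_id"),
        'IsUnique "order_id"': len(ids) == len(set(ids)),
        'IsComplete "total_amount"': complete("total_amount"),
        'ColumnValues "total_amount" >= 0':
            all(r.get("total_amount") is not None and r["total_amount"] >= 0
                for r in rows),
        'ColumnValues "status" in [...]':
            all(r.get("status") in ALLOWED_STATUS for r in rows),
        'IsComplete "order_date"': complete("order_date"),
        'IsComplete "customer_id"': complete("customer_id"),
    }

def quality_score(outcomes: dict[str, bool]) -> float:
    """Percentage of rules passed, as the Glue Job computes it."""
    return 100.0 * sum(outcomes.values()) / len(outcomes)

good = [{"order_id": 1, "customer_id": 10, "order_date": "2024-06-01",
         "total_amount": 100, "status": "completed"}]
bad = good + [{"order_id": 1, "customer_id": 11, "order_date": "2024-06-02",
               "total_amount": -5, "status": "unknown"}]
print(quality_score(evaluate(good)))  # all rules pass -> gate opens
print(quality_score(evaluate(bad)))   # duplicates/negative/unknown -> gate blocks
```

The "bad" batch fails three rules at once (IsUnique, the amount range, and the status allow list), which is why the gate in Step 6 demands a 100% score rather than a threshold.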

Step 6: Embed the quality check in the Glue Job

Extend the Glue Job script from the second article (scripts/glue_csv_to_iceberg.py) to add the quality check.

scripts/glue_csv_to_iceberg.py (with the quality check added)

import sys
import json
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality  # AWS Glue Data Quality
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# ── Fetch job parameters ──
args = getResolvedOptions(sys.argv, [
    'JOB_NAME', 'SOURCE_PATH', 'TARGET_TABLE',
    'CATALOG_ID', 'WAREHOUSE_PATH', 'SNS_TOPIC_ARN'
])

# ── Configure the Spark session ──
spark = (SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.defaultCatalog", "s3tablescatalog")
    .config("spark.sql.catalog.s3tablescatalog",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.s3tablescatalog.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.s3tablescatalog.glue.id", args['CATALOG_ID'])
    .config("spark.sql.catalog.s3tablescatalog.warehouse", args['WAREHOUSE_PATH'])
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
    .getOrCreate()
)

glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

sns_client = boto3.client('sns')

# ── Step 1: Read the CSV ──
print(f"Reading CSV from: {args['SOURCE_PATH']}")
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(args['SOURCE_PATH'])

record_count = df.count()
print(f"Record count: {record_count}")

# Convert to a DynamicFrame (EvaluateDataQuality expects one)
dyf = DynamicFrame.fromDF(df, glueContext, "incoming_orders")

# ── Step 2: Define the data quality rules in DQDL ──
DQDL_RULESET = """
Rules = [
    RowCount > 0,
    IsComplete "order_id",
    IsUnique "order_id",
    IsComplete "total_amount",
    ColumnValues "total_amount" >= 0,
    ColumnValues "status" in ["completed", "pending", "processing", "cancelled"],
    IsComplete "order_date",
    IsComplete "customer_id"
]
"""

# ── Step 3: Run the quality check with EvaluateDataQuality ──
dq_results = EvaluateDataQuality().process_rows(
    frame=dyf,
    ruleset=DQDL_RULESET,
    publishing_options={
        "dataQualityEvaluationContext": "orders_quality_check",
        "enableDataQualityCloudWatchMetrics": True,   # send metrics to CloudWatch
        "enableDataQualityResultsPublishing": True,   # show results in the Glue console
    },
    additional_options={
        "performanceTuning.caching": "CACHE_NOTHING"
    }
)

# Extract the pass/fail outcome for each rule
rule_outcomes_dyf = SelectFromCollection.apply(
    dfc=dq_results,
    key="ruleOutcomes",
    transformation_ctx="ruleOutcomes"
)
rule_outcomes_df = rule_outcomes_dyf.toDF()

# ── Step 4: Compute the quality score and print a report ──
total_rules = rule_outcomes_df.count()
passed_rules = rule_outcomes_df.filter(
    rule_outcomes_df["Outcome"] == "Passed"
).count()
quality_score = (passed_rules / total_rules) * 100

print(f"\n{'='*50}")
print(f"DATA QUALITY REPORT")
print(f"{'='*50}")
print(f"Score: {quality_score:.1f}% ({passed_rules}/{total_rules} rules passed)")
rule_outcomes_df.select("Rule", "Outcome", "FailureReason").show(truncate=False)
print(f"{'='*50}\n")

# ── Step 5: Quality gate decision ──
if quality_score < 100:
    # Failed: move the data to the quarantine folder
    quarantine_path = args['SOURCE_PATH'].replace('/raw/', '/quarantine/')
    df.write.mode("overwrite").option("header", "true").csv(quarantine_path)
    print(f"Quarantined data to: {quarantine_path}")

    # Collect the failed rules and notify via SNS
    failed_rules = rule_outcomes_df.filter(
        rule_outcomes_df["Outcome"] == "Failed"
    ).select("Rule", "FailureReason").collect()

    failure_detail = "\n".join(
        f"  - {row['Rule']}: {row['FailureReason']}" for row in failed_rules
    )
    sns_client.publish(
        TopicArn=args['SNS_TOPIC_ARN'],
        Subject="Data Quality Check FAILED",
        Message=(
            f"Data Quality FAILED for {args['SOURCE_PATH']}\n"
            f"Score: {quality_score:.1f}%\n"
            f"Failed rules:\n{failure_detail}"
        )
    )

    # Fail the job → Step Functions transitions to the Catch branch
    raise Exception(f"Data Quality check failed: {quality_score:.1f}%")

# ── Step 6: Quality check passed → load into the Iceberg table ──
print("Data Quality PASSED. Writing to Iceberg table...")

df.createOrReplaceTempView("incoming_orders")

spark.sql(f"""
    INSERT INTO {args['TARGET_TABLE']}
    SELECT *
    FROM incoming_orders
""")

print(f"Successfully wrote {record_count} records to {args['TARGET_TABLE']}")

# Notify success
sns_client.publish(
    TopicArn=args['SNS_TOPIC_ARN'],
    Subject="Data Quality Check PASSED",
    Message=(
        f"File: {args['SOURCE_PATH']}\n"
        f"Records: {record_count}\n"
        f"Quality Score: {quality_score:.1f}%\n"
        f"All {total_rules} rules passed."
    )
)

job.commit()
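One detail in Step 5 of the script worth noting: the quarantine path is derived by a plain string replace of /raw/ with /quarantine/, so it silently becomes a no-op (and would overwrite the original file) if the landing key contains no /raw/ segment. A small standalone sketch of a defensive version of that derivation; the bucket and key names are illustrative, not the article's actual paths:

```python
# Standalone sketch of the quarantine-path derivation from Step 5.
# ASSUMPTION: bucket/key names here are illustrative examples only.
def quarantine_path(source_path: str) -> str:
    """Map a raw-landing S3 path to its quarantine counterpart.

    Falls back to prefixing the key with quarantine/ when the path
    contains no '/raw/' segment, so the replace can never silently
    become a no-op that overwrites the original object."""
    if "/raw/" in source_path:
        return source_path.replace("/raw/", "/quarantine/")
    bucket, _, key = source_path.removeprefix("s3://").partition("/")
    return f"s3://{bucket}/quarantine/{key}"

print(quarantine_path("s3://my-landing/raw/orders/2026-02-28.csv"))
```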

Step 7: Define the Glue Data Quality ruleset with Terraform

Registering the quality ruleset in the Glue Data Catalog lets you review the rules and run them manually from the console.

modules/glue/main.tf

# Look up the current account
data "aws_caller_identity" "current" {}

# Bucket for the Glue script
resource "aws_s3_bucket" "glue_scripts" {
  bucket = "${var.project_name}-glue-scripts-${data.aws_caller_identity.current.account_id}"
}

# Upload the Glue script to the S3 bucket
resource "aws_s3_object" "glue_script" {
  bucket = aws_s3_bucket.glue_scripts.id
  key    = "scripts/glue_csv_to_iceberg.py"
  source = "${path.module}/scripts/glue_csv_to_iceberg.py"
  etag   = filemd5("${path.module}/scripts/glue_csv_to_iceberg.py")
}

# Glue job
resource "aws_glue_job" "csv_to_iceberg" {
  name     = "${var.project_name}-csv-to-iceberg"
  role_arn = var.glue_iamrole_arn

  command {
    script_location = "s3://${aws_s3_bucket.glue_scripts.id}/scripts/glue_csv_to_iceberg.py"
    python_version  = "3"
  }

  glue_version      = "5.0"
  number_of_workers = 2
  worker_type       = "G.1X"
  timeout           = 60

  default_arguments = {
    "--job-language"                    = "python"
    "--enable-metrics"                  = "true"
    "--enable-continuous-cloudwatch-log" = "true"
    "--datalake-formats"                = "iceberg"
    "--CATALOG_ID"                      = "${data.aws_caller_identity.current.account_id}:s3tablescatalog/${var.s3tables_bucket_name}"
    "--WAREHOUSE_PATH"                  = "s3://${var.s3tables_bucket_name}/warehouse/"
  }
}

################################ Added in this article #####################################

locals {
  orders_ruleset = join(", ", [
    "Rules = [RowCount > 0",
    "IsComplete \"order_id\"",
    "IsUnique \"order_id\"",
    "IsComplete \"total_amount\"",
    "ColumnValues \"total_amount\" >= 0",
    "ColumnValues \"status\" in [\"completed\",\"pending\",\"processing\",\"cancelled\"]",
    "IsComplete \"order_date\"",
    "IsComplete \"customer_id\"]"
  ])
}

resource "aws_glue_data_quality_ruleset" "orders" {
  name        = "${var.project_name}-orders-quality-rules"
  description = "ordersテーブルのデータ品質ルール"

  # Table the rules target (Glue Data Catalog)
  target_table {
    database_name = "analytics"
    table_name    = "orders"
    # For an S3 Tables catalog, specify catalog_id
    catalog_id = "${data.aws_caller_identity.current.account_id}:s3tablescatalog/${var.s3tables_bucket_name}"
  }

  ruleset = local.orders_ruleset # use the local value defined above
}
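Because HCL's join() concatenates the fragments with ", ", the resulting ruleset is a single-line DQDL string. A quick Python reproduction of the same concatenation makes the generated value explicit:

```python
# Reproduces the HCL join(", ", [...]) from the locals block above,
# so the single-line DQDL string Terraform generates is explicit.
fragments = [
    'Rules = [RowCount > 0',
    'IsComplete "order_id"',
    'IsUnique "order_id"',
    'IsComplete "total_amount"',
    'ColumnValues "total_amount" >= 0',
    'ColumnValues "status" in ["completed","pending","processing","cancelled"]',
    'IsComplete "order_date"',
    'IsComplete "customer_id"]',
]
ruleset = ", ".join(fragments)
print(ruleset)
```

This is exactly the ruleset value that appears later in the terraform plan output.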

Step 8: Update the Glue Job configuration

modules/glue/main.tf

Pass the SNS topic ARN to the Glue Job.

resource "aws_glue_job" "csv_to_iceberg" {
  name     = "${var.project_name}-csv-to-iceberg"
  role_arn = var.glue_iamrole_arn

  command {
    script_location = "s3://${aws_s3_bucket.glue_scripts.id}/scripts/glue_csv_to_iceberg.py"
    python_version  = "3"
  }

  glue_version      = "5.0"
  number_of_workers = 2
  worker_type       = "G.1X"
  timeout           = 60

  default_arguments = {
    "--job-language"                     = "python"
    "--enable-metrics"                   = "true"
    "--enable-continuous-cloudwatch-log" = "true"
    "--datalake-formats"                 = "iceberg"
    "--CATALOG_ID"                       = "${data.aws_caller_identity.current.account_id}:s3tablescatalog/${var.s3tables_bucket_name}"
    "--WAREHOUSE_PATH"                   = "s3://${var.s3tables_bucket_name}/warehouse/"
    "--SNS_TOPIC_ARN"                    = var.pipeline_notification_sns_topic_arn
  }
}

modules/glue/variables.tf

Define the variables used in main.tf.

variable "project_name" {
  description = "Project name"
  type        = string
  default     = "iceberg-lab"
}

variable "glue_iamrole_arn" {
  description = "IAM role ARN for Glue"
  type        = string
}

variable "s3tables_bucket_name" {
  description = "S3 Tables bucket name"
  type        = string
}

################################ Added in this article #####################################

variable "pipeline_notification_sns_topic_arn" {
  description = "SNS topic ARN for ETL pipeline notifications"
  type        = string
}

modules/iam/main.tf

Add SNS publish permission and object-creation permission on the landing bucket to the Glue role.

# IAM policy for Glue
resource "aws_iam_role_policy" "glue_data_access" {
  name = "${var.project_name}-glue-data-access"
  role = aws_iam_role.glue.id

  # Permissions on the raw-data landing bucket, the S3 Tables bucket, Lake Formation, and Glue
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "S3LandingAccess"
        Effect = "Allow"
        Action = ["s3:GetObject", "s3:ListBucket", "s3:PutObject"] # changed: PutObject added
        Resource = [
          var.landing_bucket_arn,
          "${var.landing_bucket_arn}/*",
          var.scripts_bucket_arn,
          "${var.scripts_bucket_arn}/*",
        ]
      },
      {
        Sid      = "S3TablesAccess"
        Effect   = "Allow"
        Action   = ["s3tables:*"]
        Resource = [
          var.s3tables_bucket_arn,
          "${var.s3tables_bucket_arn}/*"
        ]
      },
      {
        Sid      = "GlueAndLakeFormation"
        Effect   = "Allow"
        Action   = ["glue:*", "lakeformation:GetDataAccess"]
        Resource = ["*"]
      },

################################ Added in this article #####################################
      {
        Sid      = "GlueSnsPublish"
        Effect   = "Allow"
        Action   = "sns:Publish"
        Resource = var.pipeline_notification_sns_topic_arn
      }
    ]
  })
}

main.tf

Update main.tf, which invokes the modules.

# Glue module
module "glue" {
  source        = "./modules/glue/"

  project_name                        = var.project_name
  s3tables_bucket_name                = module.s3_tables.s3tables_bucket_name
  glue_iamrole_arn                    = module.iam.glue_iamrole_arn
  # changed
  pipeline_notification_sns_topic_arn = module.step_functions.pipeline_notification_sns_topic_arn
}

Deploy

Run terraform plan.

C:\s3-tables-iceberg-lab > terraform plan
module.step_functions.data.aws_caller_identity.current: Reading...
module.glue.data.aws_caller_identity.current: Reading...
module.s3_tables.data.aws_caller_identity.current: Reading...
module.s3_landing.data.aws_caller_identity.current: Reading...
module.iam.aws_iam_role.redshift: Refreshing state... [id=iceberg-lab-redshift-role]
module.iam.aws_iam_role.step_functions: Refreshing state... [id=iceberg-lab-sfn-role]
module.iam.aws_iam_role.eventbridge: Refreshing state... [id=iceberg-lab-eventbridge-role]
module.step_functions.aws_sns_topic.pipeline_notification: Refreshing state... [id=arn:aws:sns:ap-northeast-1:{AccountId}:iceberg-lab-pipeline-notification]
module.iam.aws_iam_role.glue: Refreshing state... [id=iceberg-lab-glue-role]
module.s3_tables.aws_s3tables_table_bucket.main: Refreshing state... [name=iceberg-lab]
module.s3_landing.data.aws_caller_identity.current: Read complete after 0s [id={AccountId}]
module.s3_landing.aws_s3_bucket.landing: Refreshing state... [id=iceberg-lab-landing-{AccountId}]
module.glue.data.aws_caller_identity.current: Read complete after 0s [id={AccountId}]
module.glue.aws_s3_bucket.glue_scripts: Refreshing state... [id=iceberg-lab-glue-scripts-{AccountId}]
module.s3_tables.data.aws_caller_identity.current: Read complete after 0s [id={AccountId}]
module.step_functions.data.aws_caller_identity.current: Read complete after 0s [id={AccountId}]
module.step_functions.aws_sns_topic_subscription.email: Refreshing state... [id=arn:aws:sns:ap-northeast-1:{AccountId}:iceberg-lab-pipeline-notification:e0e63b7a-82f3-44ea-8a59-ba38e5f2af88]
module.s3_tables.aws_s3tables_namespace.analytics: Refreshing state...
module.s3_tables.data.aws_iam_policy_document.table_bucket: Reading...
module.s3_tables.data.aws_iam_policy_document.table_bucket: Read complete after 0s [id=3931547792]
module.s3_tables.aws_s3tables_table_bucket_policy.main: Refreshing state...
module.s3_tables.aws_s3tables_table.customers: Refreshing state... [name=customers]
module.s3_tables.aws_s3tables_table.orders: Refreshing state... [name=orders]
module.s3_landing.aws_s3_bucket_versioning.landing: Refreshing state... [id=iceberg-lab-landing-{AccountId}]
module.s3_landing.aws_cloudwatch_event_rule.s3_csv_arrived: Refreshing state... [id=iceberg-lab-csv-arrived]
module.s3_landing.aws_s3_bucket_notification.landing_eventbridge: Refreshing state... [id=iceberg-lab-landing-{AccountId}]
module.glue.aws_s3_object.glue_script: Refreshing state... [id=iceberg-lab-glue-scripts-{AccountId}/scripts/glue_csv_to_iceberg.py]
module.iam.aws_iam_role_policy.redshift_lakehouse: Refreshing state... [id=iceberg-lab-redshift-role:lakehouse-access]
module.redshift.aws_redshiftserverless_namespace.main: Refreshing state... [id=iceberg-lab-ns]
module.iam.aws_iam_role_policy_attachment.glue_service: Refreshing state... [id=iceberg-lab-glue-role/arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole]
module.iam.aws_iam_role_policy.glue_data_access: Refreshing state... [id=iceberg-lab-glue-role:iceberg-lab-glue-data-access]
module.glue.aws_glue_job.csv_to_iceberg: Refreshing state... [id=iceberg-lab-csv-to-iceberg]
module.redshift.aws_redshiftserverless_workgroup.main: Refreshing state... [id=iceberg-lab-wg]
module.iam.aws_iam_role_policy.sfn_glue_sns: Refreshing state... [id=iceberg-lab-sfn-role:glue-and-sns]
module.step_functions.aws_sfn_state_machine.etl_pipeline: Refreshing state... [id=arn:aws:states:ap-northeast-1:{AccountId}:stateMachine:iceberg-lab-etl-pipeline]
module.redshift.aws_redshiftserverless_usage_limit.daily: Refreshing state... [id=57431a2c-1c31-45e0-9a73-2ac3e771c0bc]
module.iam.aws_iam_role_policy.eventbridge_start_sfn: Refreshing state... [id=iceberg-lab-eventbridge-role:start-step-functions]
module.s3_landing.aws_cloudwatch_event_target.step_functions: Refreshing state... [id=iceberg-lab-csv-arrived-terraform-20260225153517622700000001]

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the
following symbols:
  + create
  ~ update in-place

Terraform will perform the following actions:

  # module.glue.aws_glue_data_quality_ruleset.orders will be created
  + resource "aws_glue_data_quality_ruleset" "orders" {
      + arn                   = (known after apply)
      + created_on            = (known after apply)
      + description           = "ordersテーブルのデータ品質ルール"
      + id                    = (known after apply)
      + last_modified_on      = (known after apply)
      + name                  = "iceberg-lab-orders-quality-rules"
      + recommendation_run_id = (known after apply)
      + region                = "ap-northeast-1"
      + ruleset               = "Rules = [RowCount > 0, IsComplete \"order_id\", IsUnique \"order_id\", IsComplete \"total_amount\", ColumnValues \"total_amount\" >= 0, ColumnValues \"status\" in [\"completed\",\"pending\",\"processing\",\"cancelled\"], IsComplete \"order_date\", IsComplete \"customer_id\"]"
      + tags_all              = (known after apply)

      + target_table {
          + catalog_id    = "{AccountId}:s3tablescatalog/iceberg-lab"
          + database_name = "analytics"
          + table_name    = "orders"
        }
    }

  # module.glue.aws_glue_job.csv_to_iceberg will be updated in-place
  ~ resource "aws_glue_job" "csv_to_iceberg" {
      ~ default_arguments         = {
          + "--SNS_TOPIC_ARN"                    = "arn:aws:sns:ap-northeast-1:{AccountId}:iceberg-lab-pipeline-notification"
            # (6 unchanged elements hidden)
        }
        id                        = "iceberg-lab-csv-to-iceberg"
        name                      = "iceberg-lab-csv-to-iceberg"
        # (17 unchanged attributes hidden)

        # (2 unchanged blocks hidden)
    }

  # module.glue.aws_s3_object.glue_script will be updated in-place
  ~ resource "aws_s3_object" "glue_script" {
      ~ etag                          = "df9c58f8917007d62a5b4d2fd36d4b23" -> "f5ad76623540211725c61d05595732db"
        id                            = "iceberg-lab-glue-scripts-{AccountId}/scripts/glue_csv_to_iceberg.py"
        tags                          = {}
      + version_id                    = (known after apply)
        # (25 unchanged attributes hidden)
    }

  # module.iam.aws_iam_role_policy.glue_data_access will be updated in-place
  ~ resource "aws_iam_role_policy" "glue_data_access" {
        id          = "iceberg-lab-glue-role:iceberg-lab-glue-data-access"
        name        = "iceberg-lab-glue-data-access"
      ~ policy      = jsonencode(
          ~ {
              ~ Statement = [
                    # (2 unchanged elements hidden)
                    {
                        Action   = [
                            "glue:*",
                            "lakeformation:GetDataAccess",
                        ]
                        Effect   = "Allow"
                        Resource = [
                            "*",
                        ]
                        Sid      = "GlueAndLakeFormation"
                    },
                  + {
                      + Action   = "sns:Publish"
                      + Effect   = "Allow"
                      + Resource = "arn:aws:sns:ap-northeast-1:{AccountId}:iceberg-lab-pipeline-notification"
                      + Sid      = "GlueSnsPublish"
                    },
                ]
                # (1 unchanged attribute hidden)
            }
        )
        # (2 unchanged attributes hidden)
    }

Plan: 1 to add, 3 to change, 0 to destroy.

───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────

Note: You didn't use the -out option to save this plan, so Terraform can't guarantee to take exactly these actions if
you run "terraform apply" now.

Run terraform apply.

C:\s3-tables-iceberg-lab > terraform apply
module.step_functions.data.aws_caller_identity.current: Reading...
module.glue.data.aws_caller_identity.current: Reading...
module.s3_landing.data.aws_caller_identity.current: Reading...
module.s3_tables.data.aws_caller_identity.current: Reading...
module.iam.aws_iam_role.redshift: Refreshing state... [id=iceberg-lab-redshift-role]
module.iam.aws_iam_role.step_functions: Refreshing state... [id=iceberg-lab-sfn-role]
module.iam.aws_iam_role.glue: Refreshing state... [id=iceberg-lab-glue-role]
module.step_functions.aws_sns_topic.pipeline_notification: Refreshing state... [id=arn:aws:sns:ap-northeast-1:{AccountId}:iceberg-lab-pipeline-notification]
module.iam.aws_iam_role.eventbridge: Refreshing state... [id=iceberg-lab-eventbridge-role]
module.s3_tables.aws_s3tables_table_bucket.main: Refreshing state... [name=iceberg-lab]
module.step_functions.data.aws_caller_identity.current: Read complete after 0s [id={AccountId}]
module.s3_tables.data.aws_caller_identity.current: Read complete after 0s [id={AccountId}]
module.glue.data.aws_caller_identity.current: Read complete after 0s [id={AccountId}]
module.glue.aws_s3_bucket.glue_scripts: Refreshing state... [id=iceberg-lab-glue-scripts-{AccountId}]
module.s3_landing.data.aws_caller_identity.current: Read complete after 0s [id={AccountId}]
module.s3_landing.aws_s3_bucket.landing: Refreshing state... [id=iceberg-lab-landing-{AccountId}]
module.step_functions.aws_sns_topic_subscription.email: Refreshing state... [id=arn:aws:sns:ap-northeast-1:{AccountId}:iceberg-lab-pipeline-notification:e0e63b7a-82f3-44ea-8a59-ba38e5f2af88]
module.s3_tables.aws_s3tables_namespace.analytics: Refreshing state...
module.s3_tables.data.aws_iam_policy_document.table_bucket: Reading...
module.s3_tables.data.aws_iam_policy_document.table_bucket: Read complete after 0s [id=3931547792]
module.s3_tables.aws_s3tables_table_bucket_policy.main: Refreshing state...
module.s3_tables.aws_s3tables_table.orders: Refreshing state... [name=orders]
module.s3_tables.aws_s3tables_table.customers: Refreshing state... [name=customers]
module.s3_landing.aws_s3_bucket_versioning.landing: Refreshing state... [id=iceberg-lab-landing-{AccountId}]
module.s3_landing.aws_s3_bucket_notification.landing_eventbridge: Refreshing state... [id=iceberg-lab-landing-{AccountId}]
module.s3_landing.aws_cloudwatch_event_rule.s3_csv_arrived: Refreshing state... [id=iceberg-lab-csv-arrived]
module.glue.aws_s3_object.glue_script: Refreshing state... [id=iceberg-lab-glue-scripts-{AccountId}/scripts/glue_csv_to_iceberg.py]
module.iam.aws_iam_role_policy_attachment.glue_service: Refreshing state... [id=iceberg-lab-glue-role/arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole]
module.iam.aws_iam_role_policy.glue_data_access: Refreshing state... [id=iceberg-lab-glue-role:iceberg-lab-glue-data-access]
module.glue.aws_glue_job.csv_to_iceberg: Refreshing state... [id=iceberg-lab-csv-to-iceberg]
module.iam.aws_iam_role_policy.redshift_lakehouse: Refreshing state... [id=iceberg-lab-redshift-role:lakehouse-access]
module.redshift.aws_redshiftserverless_namespace.main: Refreshing state... [id=iceberg-lab-ns]
module.redshift.aws_redshiftserverless_workgroup.main: Refreshing state... [id=iceberg-lab-wg]
module.iam.aws_iam_role_policy.sfn_glue_sns: Refreshing state... [id=iceberg-lab-sfn-role:glue-and-sns]
module.step_functions.aws_sfn_state_machine.etl_pipeline: Refreshing state... [id=arn:aws:states:ap-northeast-1:{AccountId}:stateMachine:iceberg-lab-etl-pipeline]
module.redshift.aws_redshiftserverless_usage_limit.daily: Refreshing state... [id=57431a2c-1c31-45e0-9a73-2ac3e771c0bc]
module.iam.aws_iam_role_policy.eventbridge_start_sfn: Refreshing state... [id=iceberg-lab-eventbridge-role:start-step-functions]
module.s3_landing.aws_cloudwatch_event_target.step_functions: Refreshing state... [id=iceberg-lab-csv-arrived-terraform-20260225153517622700000001]

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the
following symbols:
  + create
  ~ update in-place

Terraform will perform the following actions:

  # module.glue.aws_glue_data_quality_ruleset.orders will be created
  + resource "aws_glue_data_quality_ruleset" "orders" {
      + arn                   = (known after apply)
      + created_on            = (known after apply)
      + description           = "ordersテーブルのデータ品質ルール"
      + id                    = (known after apply)
      + last_modified_on      = (known after apply)
      + name                  = "iceberg-lab-orders-quality-rules"
      + recommendation_run_id = (known after apply)
      + region                = "ap-northeast-1"
      + ruleset               = "Rules = [RowCount > 0, IsComplete \"order_id\", IsUnique \"order_id\", IsComplete \"total_amount\", ColumnValues \"total_amount\" >= 0, ColumnValues \"status\" in [\"completed\",\"pending\",\"processing\",\"cancelled\"], IsComplete \"order_date\", IsComplete \"customer_id\"]"
      + tags_all              = (known after apply)

      + target_table {
          + catalog_id    = "{AccountId}:s3tablescatalog/iceberg-lab"
          + database_name = "analytics"
          + table_name    = "orders"
        }
    }

  # module.glue.aws_glue_job.csv_to_iceberg will be updated in-place
  ~ resource "aws_glue_job" "csv_to_iceberg" {
      ~ default_arguments         = {
          + "--SNS_TOPIC_ARN"                    = "arn:aws:sns:ap-northeast-1:{AccountId}:iceberg-lab-pipeline-notification"
            # (6 unchanged elements hidden)
        }
        id                        = "iceberg-lab-csv-to-iceberg"
        name                      = "iceberg-lab-csv-to-iceberg"
        # (17 unchanged attributes hidden)

        # (2 unchanged blocks hidden)
    }

  # module.glue.aws_s3_object.glue_script will be updated in-place
  ~ resource "aws_s3_object" "glue_script" {
      ~ etag                          = "df9c58f8917007d62a5b4d2fd36d4b23" -> "f5ad76623540211725c61d05595732db"
        id                            = "iceberg-lab-glue-scripts-{AccountId}/scripts/glue_csv_to_iceberg.py"
        tags                          = {}
      + version_id                    = (known after apply)
        # (25 unchanged attributes hidden)
    }

  # module.iam.aws_iam_role_policy.glue_data_access will be updated in-place
  ~ resource "aws_iam_role_policy" "glue_data_access" {
        id          = "iceberg-lab-glue-role:iceberg-lab-glue-data-access"
        name        = "iceberg-lab-glue-data-access"
      ~ policy      = jsonencode(
          ~ {
              ~ Statement = [
                    # (2 unchanged elements hidden)
                    {
                        Action   = [
                            "glue:*",
                            "lakeformation:GetDataAccess",
                        ]
                        Effect   = "Allow"
                        Resource = [
                            "*",
                        ]
                        Sid      = "GlueAndLakeFormation"
                    },
                  + {
                      + Action   = "sns:Publish"
                      + Effect   = "Allow"
                      + Resource = "arn:aws:sns:ap-northeast-1:{AccountId}:iceberg-lab-pipeline-notification"
                      + Sid      = "GlueSnsPublish"
                    },
                ]
                # (1 unchanged attribute hidden)
            }
        )
        # (2 unchanged attributes hidden)
    }

Plan: 1 to add, 3 to change, 0 to destroy.

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: yes

module.glue.aws_glue_data_quality_ruleset.orders: Creating...
module.iam.aws_iam_role_policy.glue_data_access: Modifying... [id=iceberg-lab-glue-role:iceberg-lab-glue-data-access]
module.glue.aws_s3_object.glue_script: Modifying... [id=iceberg-lab-glue-scripts-{AccountId}/scripts/glue_csv_to_iceberg.py]
module.glue.aws_glue_job.csv_to_iceberg: Modifying... [id=iceberg-lab-csv-to-iceberg]
module.glue.aws_s3_object.glue_script: Modifications complete after 1s [id=iceberg-lab-glue-scripts-{AccountId}/scripts/glue_csv_to_iceberg.py]
module.glue.aws_glue_job.csv_to_iceberg: Modifications complete after 1s [id=iceberg-lab-csv-to-iceberg]
module.glue.aws_glue_data_quality_ruleset.orders: Creation complete after 1s [id=iceberg-lab-orders-quality-rules]
module.iam.aws_iam_role_policy.glue_data_access: Modifications complete after 1s [id=iceberg-lab-glue-role:iceberg-lab-glue-data-access]

Apply complete! Resources: 1 added, 3 changed, 0 destroyed.

Resource creation is complete.
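For readability, here is the escaped `ruleset` string from the plan output above, laid out as DQDL (the content is identical to the plan; only the formatting differs):

```
Rules = [
    RowCount > 0,
    IsComplete "order_id",
    IsUnique "order_id",
    IsComplete "total_amount",
    ColumnValues "total_amount" >= 0,
    ColumnValues "status" in ["completed", "pending", "processing", "cancelled"],
    IsComplete "order_date",
    IsComplete "customer_id"
]
```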

Step 9: Verification

We now verify the behavior of the data quality checks configured with Glue Data Quality.

9-1. Testing with Valid Data

First, we verify the pipeline with valid data.

Data that passes the quality checks (good-orders.csv)

order_id,customer_id,order_date,product_name,quantity,unit_price,total_amount,status,region,created_at,discount_rate,note
2001,1,2024-06-01,Cloud Storage 1TB,12,29.99,359.88,completed,ap-northeast-1,2024-06-01 10:00:00.000000,0.00,
2002,2,2024-06-05,Compute Instance m5.xl,3,199.99,599.97,completed,ap-northeast-1,2024-06-05 11:30:00.000000,0.00,
2003,3,2024-06-10,Database RDS,1,499.99,499.99,completed,us-east-1,2024-06-10 09:15:00.000000,0.00,

Upload the CSV file to S3.

The pipeline is triggered and the ETL job completes successfully.

The quality check results and the ETL pipeline success notification are delivered by email via SNS.

You can also inspect the quality check results in detail from the Glue job output logs and the console.

Reading CSV from: s3://iceberg-lab-landing-{AccountId}/raw/orders/good-orders.csv
Record count: 3
/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:147: UserWarning: DataFrame constructor is internal. Do not directly use it.

==================================================
DATA QUALITY REPORT
==================================================
Score: 100.0% (8/8 rules passed)
+-------------------------------------------------------------------------+-------+-------------+
|Rule                                                                     |Outcome|FailureReason|
+-------------------------------------------------------------------------+-------+-------------+
|RowCount > 0                                                             |Passed |NULL         |
|IsComplete "order_id"                                                    |Passed |NULL         |
|IsUnique "order_id"                                                      |Passed |NULL         |
|IsComplete "total_amount"                                                |Passed |NULL         |
|ColumnValues "total_amount" >= 0                                         |Passed |NULL         |
|ColumnValues "status" in ["completed","pending","processing","cancelled"]|Passed |NULL         |
|IsComplete "order_date"                                                  |Passed |NULL         |
|IsComplete "customer_id"                                                 |Passed |NULL         |
+-------------------------------------------------------------------------+-------+-------------+

==================================================

Data Quality PASSED. Writing to Iceberg table...
Successfully wrote 3 records to analytics.orders
Running autoDebugger shutdown hook.
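The pass/fail gate behind this report can be illustrated with a small piece of plain Python (a hypothetical sketch of the gating logic only, not the actual Glue job script): the score is the fraction of rules that passed, and the job proceeds to the Iceberg write only at 100%.

```python
def quality_gate(outcomes):
    """outcomes: list of (rule_name, passed) pairs, as in the DQ report above.

    Returns (score_percent, write_allowed): the job proceeds to the
    Iceberg write only when every rule passed (score == 100%).
    """
    passed = sum(1 for _, ok in outcomes if ok)
    score = 100.0 * passed / len(outcomes)
    return score, passed == len(outcomes)

# The good-orders.csv run: all 8 rules passed.
good_run = [(rule, True) for rule in (
    'RowCount > 0',
    'IsComplete "order_id"', 'IsUnique "order_id"',
    'IsComplete "total_amount"', 'ColumnValues "total_amount" >= 0',
    'ColumnValues "status" in [...]',
    'IsComplete "order_date"', 'IsComplete "customer_id"')]
score, write_allowed = quality_gate(good_run)
print(f"Score: {score}% ({sum(ok for _, ok in good_run)}/{len(good_run)} rules passed)")
# -> Score: 100.0% (8/8 rules passed)
```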

Comparing the table before and after the pipeline run confirms that the data which passed the quality checks was inserted into the Iceberg table.

Table data before the run

select * from orders;

-- 実行結果
order_id  customer_id  order_date	product_name	        quantity  unit_price	total_amount  status	region	        created_at	                discount_rate	note
1001	  1	           2024-06-01	Cloud Storage 1TB	    12	      29.99	        359.88	      refunded	ap-northeast-1	2024-06-01 10:00:00.000000	                返金処理済
1002	  2	           2024-06-05	Compute Instance m5.xl	3	      199.99	    599.97	      completed	ap-northeast-1	2024-06-05 11:30:00.000000		
1003	  3	           2024-06-10	Database RDS	        1	      499.99	    499.99	      completed	us-east-1	    2024-06-10 09:15:00.000000

Data after the run

select * from orders;

-- 実行結果
order_id  customer_id  order_date	product_name	        quantity  unit_price   total_amount	status	    region	        created_at	                discount_rate  note
1001	  1	           2024-06-01	Cloud Storage 1TB	    12	      29.99	       359.88	    refunded	ap-northeast-1	2024-06-01 10:00:00.000000		           返金処理済
1002	  2	           2024-06-05	Compute Instance m5.xl	3	      199.99	   599.97	    completed	ap-northeast-1	2024-06-05 11:30:00.000000		
1003	  3	           2024-06-10	Database RDS	        1	      499.99	   499.99	    completed	us-east-1	    2024-06-10 09:15:00.000000		
2001	  1	           2024-06-01	Cloud Storage 1TB	    12	      29.99	       359.88	    completed	ap-northeast-1	2024-06-01 10:00:00.000000	0.00	
2002	  2	           2024-06-05	Compute Instance m5.xl	3	      199.99	   599.97	    completed	ap-northeast-1	2024-06-05 11:30:00.000000	0.00	
2003	  3	           2024-06-10	Database RDS	        1	      499.99	   499.99	    completed	us-east-1	    2024-06-10 09:15:00.000000	0.00

The CSV data was inserted as expected.

9-2. Testing with Invalid Data

Next, we verify the behavior with invalid data. If the quality score is below 100%, the job fails, and the file is moved to a quarantine folder in the S3 bucket without the ETL load being performed.
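The quarantine move itself is just an S3 key rewrite followed by a copy and delete. The key mapping can be sketched like this (the `raw/` and `quarantine/` prefixes are assumptions based on this lab's bucket layout):

```python
def quarantine_key(src_key: str, src_prefix: str = "raw/",
                   dst_prefix: str = "quarantine/") -> str:
    """Map a landing key to its quarantine location,
    e.g. raw/orders/bad-orders.csv -> quarantine/orders/bad-orders.csv."""
    if src_key.startswith(src_prefix):
        src_key = src_key[len(src_prefix):]
    return dst_prefix + src_key

print(quarantine_key("raw/orders/bad-orders.csv"))
# -> quarantine/orders/bad-orders.csv
```

In the actual job, this key would then be used with boto3's `copy_object` followed by `delete_object` on the landing bucket.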

Data that fails the quality checks (duplicate order_id, negative amount, invalid status) (bad-orders.csv)

order_id,customer_id,order_date,product_name,quantity,unit_price,total_amount,status,region,created_at,discount_rate,note
4001,1,2026-02-28,Cloud Storage 1TB,1,15000.00,15000.00,completed,ap-northeast-1,2026-02-28 10:00:00.000000,0.00,
4001,2,2026-02-28,Compute Instance,1,500.00,-500.00,unknown_status,ap-northeast-1,2026-02-28 11:00:00.000000,0.00,
4003,,2026-02-28,Database RDS,1,8000.00,8000.00,pending,us-east-1,2026-02-28 09:00:00.000000,0.00,

Upload the CSV file to S3.

The pipeline is triggered and the ETL job fails.

The quality check results and the ETL pipeline failure notification are delivered by email via SNS.

Inspecting the quality check results from the Glue job output logs and the console shows that only 50% of the rules passed.

Reading CSV from: s3://iceberg-lab-landing-{AccountId}/raw/orders/bad-orders.csv
Record count: 3
/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:147: UserWarning: DataFrame constructor is internal. Do not directly use it.

==================================================
DATA QUALITY REPORT
==================================================
Score: 50.0% (4/8 rules passed)
+-------------------------------------------------------------------------+-------+-------------------------------------------------------------------+
|Rule                                                                     |Outcome|FailureReason                                                      |
+-------------------------------------------------------------------------+-------+-------------------------------------------------------------------+
|RowCount > 0                                                             |Passed |NULL                                                               |
|IsComplete "order_id"                                                    |Passed |NULL                                                               |
|IsUnique "order_id"                                                      |Failed |Value: 0.3333333333333333 does not meet the constraint requirement!|
|IsComplete "total_amount"                                                |Passed |NULL                                                               |
|ColumnValues "total_amount" >= 0                                         |Failed |Value: -500.0 does not meet the constraint requirement!            |
|ColumnValues "status" in ["completed","pending","processing","cancelled"]|Failed |Value: 0.6666666666666666 does not meet the constraint requirement!|
|IsComplete "order_date"                                                  |Passed |NULL                                                               |
|IsComplete "customer_id"                                                 |Failed |Value: 0.6666666666666666 does not meet the constraint requirement!|
+-------------------------------------------------------------------------+-------+-------------------------------------------------------------------+

==================================================

Running autoDebugger shutdown hook.
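The fractional values in the FailureReason column are per-row pass ratios over the 3 input rows. As a sanity check, they can be reproduced from the bad-orders.csv rows with plain Python (illustrative only; field names taken from the CSV header):

```python
rows = [
    {"order_id": "4001", "customer_id": "1", "status": "completed"},
    {"order_id": "4001", "customer_id": "2", "status": "unknown_status"},
    {"order_id": "4003", "customer_id": "",  "status": "pending"},
]
n = len(rows)

# IsUnique "order_id": fraction of rows whose order_id occurs exactly once.
ids = [r["order_id"] for r in rows]
unique_ratio = sum(ids.count(i) == 1 for i in ids) / n          # 1/3

# ColumnValues "status" in [...]: fraction of rows with an allowed status.
allowed = {"completed", "pending", "processing", "cancelled"}
status_ratio = sum(r["status"] in allowed for r in rows) / n    # 2/3

# IsComplete "customer_id": fraction of rows with a non-empty customer_id.
complete_ratio = sum(bool(r["customer_id"]) for r in rows) / n  # 2/3

print(round(unique_ratio, 4), round(status_ratio, 4), round(complete_ratio, 4))
# -> 0.3333 0.6667 0.6667
```

These match the 0.3333333333333333 and 0.6666666666666666 values reported by Glue Data Quality above.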

The invalid data file has been placed in the S3 quarantine folder.

Comparing the table before and after the pipeline run confirms that the data which failed the quality checks was not inserted into the Iceberg table.

Table data before the run

select * from orders;

-- 実行結果
order_id  customer_id  order_date	product_name	        quantity  unit_price   total_amount	status	    region	        created_at	                discount_rate  note
1001	  1	           2024-06-01	Cloud Storage 1TB	    12	      29.99	       359.88	    refunded	ap-northeast-1	2024-06-01 10:00:00.000000		           返金処理済
1002	  2	           2024-06-05	Compute Instance m5.xl	3	      199.99	   599.97	    completed	ap-northeast-1	2024-06-05 11:30:00.000000		
1003	  3	           2024-06-10	Database RDS	        1	      499.99	   499.99	    completed	us-east-1	    2024-06-10 09:15:00.000000		
2001	  1	           2024-06-01	Cloud Storage 1TB	    12	      29.99	       359.88	    completed	ap-northeast-1	2024-06-01 10:00:00.000000	0.00	
2002	  2	           2024-06-05	Compute Instance m5.xl	3	      199.99	   599.97	    completed	ap-northeast-1	2024-06-05 11:30:00.000000	0.00	
2003	  3	           2024-06-10	Database RDS	        1	      499.99	   499.99	    completed	us-east-1	    2024-06-10 09:15:00.000000	0.00

Data after the run

select * from orders;

-- 実行結果
order_id  customer_id  order_date	product_name	        quantity  unit_price   total_amount	status	    region	        created_at	                discount_rate  note
1001	  1	           2024-06-01	Cloud Storage 1TB	    12	      29.99	       359.88	    refunded	ap-northeast-1	2024-06-01 10:00:00.000000		           返金処理済
1002	  2	           2024-06-05	Compute Instance m5.xl	3	      199.99	   599.97	    completed	ap-northeast-1	2024-06-05 11:30:00.000000		
1003	  3	           2024-06-10	Database RDS	        1	      499.99	   499.99	    completed	us-east-1	    2024-06-10 09:15:00.000000		
2001	  1	           2024-06-01	Cloud Storage 1TB	    12	      29.99	       359.88	    completed	ap-northeast-1	2024-06-01 10:00:00.000000	0.00	
2002	  2	           2024-06-05	Compute Instance m5.xl	3	      199.99	   599.97	    completed	ap-northeast-1	2024-06-05 11:30:00.000000	0.00	
2003	  3	           2024-06-10	Database RDS	        1	      499.99	   499.99	    completed	us-east-1	    2024-06-10 09:15:00.000000	0.00

We confirmed there is no difference: the rejected rows never reached the table.

Summary

In this article, we verified lakehouse queries with Redshift Serverless and data quality checks with Glue Data Quality. As summarized below, we extended the architecture built in Parts 1 and 2 into a lakehouse that is much closer to a real-world use case.

| Aspect | Part 1 | Part 2 | Part 3 (this article) |
|---|---|---|---|
| Data storage | S3 Tables + Iceberg | ← | ← |
| Query engine | Athena | ← | Redshift Serverless |
| ETL | Manual Glue Job | EventBridge → Step Functions | → Glue Data Quality checks |
| Notifications | – | SNS (success/failure) | SNS (with quality report) |
| IaC | Terraform | Terraform | Terraform |

I hope this article is useful to you. If you have any questions or feedback, please feel free to leave a comment!

Reference Links
