0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ETL・パイプライン(+モニタリング)

Last updated at Posted at 2022-01-14

ETL・パイプライン(+モニタリング)

フロー:DataLake → Glue → Data Ware House
image.png

DataLake(S3)に蓄積した(CSV)ファイルを、Glueで加工・修正し、Data Ware House(Athena/Redshift)へ連携します。
※Athena/Redshiftと記載しているのは、初期はAthenaで構築し、将来的にRedshiftに切り替える想定のためです。Redshiftの利用コストが高いためです。
最後に、Athena/Redshiftで抽出・集計したテーブルデータをQuickSightで可視化します。
順に紹介していきます。

DataLake(S3)

S3バケットにkaggleのタイタニックのデータセット(訓練データ)をアップロードします。
image.png

terraform/modules/s3/s3.tf
# ----------------------------------
# S3 private bucket3 for data
# ----------------------------------
resource "aws_s3_bucket" "s3-private-bucket-data" {
  bucket = "${var.project}-${var.enviroment}-private-bucket-data "
  acl    = "private"

  # Manege version of S3 source
  versioning {
    enabled = false
  }
  # Encryption
  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm = "AES256"
      }
    }
  }
  # Delete rule
  lifecycle {
    prevent_destroy = false
  }
}
# Create directry
resource "aws_s3_bucket_object" "s3-private-bucket-data-object1" {
  key    = "athena/tmp/"
  bucket = aws_s3_bucket.s3-private-bucket3.id
  force_destroy = true
}
resource "aws_s3_bucket_object" "s3-private-bucket-data-object2" {
  key    = "athena/pure/"
  bucket = aws_s3_bucket.s3-private-bucket-data.id
  force_destroy = true
}
resource "aws_s3_bucket_object" "s3-private-bucket-data-object3" {
  key    = "athena/missing_proc/"
  bucket = aws_s3_bucket.s3-private-bucket-data.id
  force_destroy = true
}
resource "aws_s3_bucket_object" "s3-private-bucket-data-object4" {
  key    = "athena/cleansing_proc/"
  bucket = aws_s3_bucket.s3-private-bucket-data.id
  force_destroy = true
}
resource "aws_s3_bucket_object" "s3-private-bucket-data-object5" {
  key    = "athena_result/cleansing_proc/"
  bucket = aws_s3_bucket.s3-private-bucket-data.id
  force_destroy = true
}
# Access block
resource "aws_s3_bucket_public_access_block" "s3-private-bucket-data" {
  bucket                  = aws_s3_bucket.s3-private-bucket-data.id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket"で、S3バケットの設定を定義します。
resource "aws_s3_bucket_object"で、構築したS3バケット内のフォルダ名を定義します。
※自動でフォルダを作成してくれます。
resource "aws_s3_bucket_public_access_block"で、パブリックアクセスのブロック設定をします。
コンソール画面の下記に該当します。
image.png

Glue(Glue DataCatalog & Job)

Glueテーブルは、Glue内で保持するスキーマ、インデックスなどを指しています。
GlueのETLジョブの作成と監視に使用します。
image.png
ETLジョブの中身は、上記のスクリプトです。
今回はPythonで記載しています。
S3上にプッシュした「cleansing_proc.py」を(Terraformで)読み込ませています。
image.png

ETLジョブの実行状況も確認することができます。
image.png

ワークフローへは、トリガーを設定し決められた条件で、先ほどの各ETLジョブが実行するよう設定しています。
image.png

terraform/modules/gleu.tf
# ----------------------------------
# IAM role
# ----------------------------------
resource "aws_iam_role" "role_glue" {
  name               = "role_glue"
  assume_role_policy = <<-EOF
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "glue.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
  EOF
}
resource "aws_iam_instance_profile" "instance_profile_glue" {
  name = "instance_profile_glue"
  role = aws_iam_role.role_glue.name
}
# ----------------------------------
# IAM policy
# ----------------------------------
resource "aws_iam_policy_attachment" "glue_lambda_exe" {
  name       = "AWSLambda_FullAccess"
  policy_arn = "arn:aws:iam::aws:policy/AWSLambda_FullAccess"
  roles      = ["${aws_iam_role.role_glue.name}"]
}
resource "aws_iam_role_policy" "role_policy_glue" {
  name = "role_policy_glue"
  role = aws_iam_role.role_glue.id

  policy = <<-EOF
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:*",
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:ListAllMyBuckets",
                "s3:GetBucketAcl"

            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:CreateBucket"
            ],
            "Resource": [
                "arn:aws:s3:::aws-glue-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::aws-glue-*/*",
                "arn:aws:s3:::*/*aws-glue-*/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "logs:CreateExportTask",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "logs:CreateExportTask",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
                "arn:aws:logs:*:*:/aws-glue/*"
            ]
        }
    ]
}
  EOF
}

Glueを実行する際のロールの設定定義部分です。
(だいぶ大雑把な設定ですが)主にGlueから接続するS3バケットに対するロールと、実行ログをCloudWatchへ吐き出すためのロールを設定しています。

terraform/modules/gleu.tf
# ----------------------------------
# Data catalog create
# ----------------------------------
resource "aws_glue_catalog_database" "database_glue_test" {
  name = "database_glue_test"
}
# ----------------------------------
# Data catalog table for athena
# ----------------------------------
resource "aws_glue_catalog_table" "titanic_train" {
  database_name = aws_glue_catalog_database.glue_catalog_database_name.name
  name          = var.glue_catalog_table_name
  table_type    = "EXTERNAL_TABLE"
  storage_descriptor {
    location      = "s3://${var.glue_job_bucket}/athena/cleansing_proc/"
    input_format  = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
    output_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
    ser_de_info {
      serialization_library = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
      parameters = {
        "serialization.format" = "1"
      }
    }
    columns {
      name = "PassengerId"
      type = "string"
    }
    columns {
      name = "Survived"
      type = "string"
    }
    columns {
      name = "Pclass"
      type = "string"
    }
    columns {
      name = "Name"
      type = "string"
    }
    columns {
      name = "Sex"
      type = "string"
    }
    columns {
      name = "Age"
      type = "string"
    }
    columns {
      name = "SibSp"
      type = "string"
    }
    columns {
      name = "Parch"
      type = "string"
    }
    columns {
      name = "Ticket"
      type = "string"
    }
    columns {
      name = "Fare"
      type = "string"
    }
    columns {
      name = "Cabin"
      type = "string"
    }
    columns {
      name = "Embarked"
      type = "string"
    }
  }
  partition_keys {
    name = "orderdate"
    type = "string"
  }
}

Data catalogのデータベースとテーブルの定義部分です。
resource "aws_glue_catalog_database"が、データベースの定義部分です。
resource "aws_glue_catalog_table"で、テーブルの細かな設定をしています。

※参考(Terraform公式)

terraform/modules/gleu.tf
# ----------------------------------
# ETL job missing value process
# ----------------------------------
resource "aws_glue_job" "glue_job_test_cleansing" {
  name     = "glue_job_test_cleansing"
  role_arn = aws_iam_role.role_glue.arn
  command {
    script_location = "s3://${var.s3_bucket_2}/python_shell/cleansing_proc.py"
    python_version  = 3
  } 
}

ETLジョブの設定定義です。
ARNは、先ほど定義した「aws_iam_role.role_glue」を付与しています。
script_locationで、S3にプッシュした「cleansing_proc.py」を取り込む設定しています。
python_versionで、python3を指定しています。

※参考(Terraform公式)

terraform/modules/gleu.tf
# ----------------------------------
# ETL job workflow
# ----------------------------------
resource "aws_glue_workflow" "glue_job_test_workflow" {
  name = "glue_job_test_workflow"
}
resource "aws_glue_trigger" "trigger" {
  name     = "glue_job_test_workflow_start"
  schedule = "cron(0/60 * * * ? *)"
  type     = "SCHEDULED"
  workflow_name = aws_glue_workflow.glue_job_test_workflow.name
  actions {
    job_name = aws_glue_job.glue_job_test_missing.name
  }
  actions {
    job_name = aws_glue_job.glue_job_test_cleansing.name
  }
}

ワークフローと、ワークフロー内のトリガーの設定定義です。
トリガーは、スケジュール起動モードで、1時間(60分:cron(0/60 * * * ? *))ごとの起動設定にしています。

※参考(Terraform公式)

Data Ware House(Athena/Redshift)

Glueで加工・修正したtitanic_trainテーブルのデータを、Athenaで抽出します。
kaggleのタイタニックのデータセットが表示されていることがわかります。
image.png

terraform/modules/athena/athena.tf
# ----------------------------------
# Athena database
# ----------------------------------
resource "aws_athena_database" "athena_glue_test" {
  name   = var.athena_database_name
  bucket = var.log_bucket_name
}
# ----------------------------------
# Athena workgroup
# ----------------------------------
resource "aws_athena_workgroup" "glue_test_athena_workgroup" {
  name = "glue_test_athena_workgroup"
  configuration {
    enforce_workgroup_configuration    = true
    publish_cloudwatch_metrics_enabled = false
    result_configuration {
      output_location = "s3://terraform-development-private-bucket-data/athena_result/cleansing_proc/"
    }
  }
}
# ----------------------------------
# Athena query
# ----------------------------------
data "template_file" "create_table_sql" {
  template = file("./src/queries/create_table.sql")
  vars = {
    athena_database_name = aws_athena_database.athena_glue_test.name
    athena_table_name    = var.athena_table_name
    log_bucket_name      = var.athena_log_bucket_name
  }
}
resource "aws_athena_named_query" "create_table" {
  name        = "Create table"
  workgroup   = aws_athena_workgroup.glue_test_athena_workgroup.id
  database    = aws_athena_database.athena_glue_test.name
  query       = data.template_file.create_table_sql.rendered
}

resource "aws_athena_database"は、Athenaのデータベース定義です。
resource "aws_athena_workgroup"は、Athenaのワークグループの定義です。
クエリ実行時、事後の設定や、クエリ自体の設定をしています。
data "template_file"で、S3にプッシュしたsqlファイルを取り込む設定をしています。
resource "aws_athena_named_query"で、ワークグループ・データベース・クエリを結びつけ、保存する設定をしています。よく使うクエリは保存しておくとよいでしょう。

※参考(Terraform公式)

QuickSight(BI)

最後にAthenaのデータを読み込み、QuickSightでグラフ描画しています。
画面上でグラフを作成しています。
image.png
※Terraformでも設定等はできるのですが、今回は管理対象外とした為、詳細は割愛させていただきます。

●実行ファイルとフロー
上記のフローで(main.tfを起点として)terraform apllyコマンドを実行します。
image.png

terraform/main.tf
# ----------------------------------
# S3
# ----------------------------------
module "S3" {
  source     = "./modules/s3"
  project    = var.project
  enviroment = var.enviroment
  region     = var.region
}
# ----------------------------------
# Glue
# ----------------------------------
module "glue" {
  source                     = "./modules/glue"
  glue_catalog_database_name = var.glue_catalog_database_name
  glue_catalog_table_name    = var.glue_catalog_table_name
  athena_result_bucket_name  = var.athena_result_bucket_name
  connect_athena_password    = var.connect_athena_password
  connect_athena_username    = var.connect_athena_username
  glue_job_python_bucket     = var.glue_job_python_bucket
  s3_bucket_2                = var.s3_bucket_2
  s3_bucket2_path            = var.s3_bucket2_path
  glue_job_bucket            = var.glue_job_bucket
  python_dir_name            = "var.python_dir_name"
}
# ----------------------------------
# Athena
# ----------------------------------
module "athena" {
  source                    = "./modules/athena"
  athena_database_name      = var.athena_database_name
  athena_table_name         = var.athena_table_name
  log_bucket_name           = var.log_bucket_name
  athena_result_bucket_name = var.athena_result_bucket_name
  athena_log_bucket_name    = var.athena_log_bucket_name
}

「S3, Glue, Athena」の各モジュールを呼び出しています。
呼び出し時にそれぞれ引数を渡しています。
引数の実体(値)は、terraform.tfvars等から参照しています。

索引ページ

下記でTerraformに関するテーマをまとめて紹介しています。

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?