0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Lambda と ServerlessAdvent Calendar 2023

Day 3

Amazon DynamoDBとAmazon OpenSearch Serverlessのzero-ETL統合をTerraformで自動構築する

Last updated at Posted at 2023-12-03

はじめに

2023年のre:Inventで発表されたAmazon DynamoDBとAmazon OpenSearch Serviceのzero-ETL統合は、複雑なETLを作り込むことなくお手軽にAmazon OpenSearch ServiceにAmazon DynamoDBの情報を取り込むことができる優れモノだ。

が、お手軽と言いつつ、IAMのアクセス権の設定やAmazon OpenSearch ServiceのYAML定義にはハマりどころが多く、マニュアルも散らばっていてお手軽感が薄かったのでもっとお手軽にするためにIaCで冪等性を高めて動作するようにした。また、今回はAmazon OpenSearchはServerlessで動作をさせる。

基本は最初に貼ったAWS公式のブログの流れで作っていくことにする。

Amazon DynamoDBの準備

Amazon DynamoDBの準備は特に難しいところはない。
AWS公式のブログに記載の通り、DynamoDB StreamsとPoint-in-time-recoveryを有効化しておこう。
server_side_encryptionはお作法としてやっておく。

resource "aws_dynamodb_table" "example" {
  name         = local.dynamodb_table_name
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "id"

  attribute {
    name = "id"
    type = "S"
  }

  stream_enabled   = true
  stream_view_type = "NEW_AND_OLD_IMAGES"

  server_side_encryption {
    enabled = true
  }

  point_in_time_recovery {
    enabled = true
  }
}

S3バケットの準備

S3バケットは、OpenSearch Ingestionがパイプライン中で一旦データをexportするために使用する。
特に難しい設定はない。S3はデフォルトでオブジェクトの暗号化が有効になっているため、今回は細かく書かないが、必要に応じてKMSのカスタムキーを使うようにしておこう。

resource "aws_s3_bucket" "example" {
  bucket = local.s3_bucket_name
}

resource "aws_s3_bucket_ownership_controls" "example" {
  bucket = aws_s3_bucket.example.id

  rule {
    object_ownership = "BucketOwnerEnforced"
  }
}

resource "aws_s3_bucket_public_access_block" "example" {
  bucket = aws_s3_bucket.example.id

  block_public_acls       = false
  block_public_policy     = false
  ignore_public_acls      = false
  restrict_public_buckets = false
}

CloudWatch Logsの準備

CloudWatch LogsはOpenSearch Ingestionのログ出力に使用する。
上手く動作しなかったときにログが無いと詰むため、設定しておこう。

なお、zero-ETL統合に関連したドキュメントには全然登場しないが、AWS公式のデベロッパーガイドではちゃっかりと

名前は、/aws/vendedlogs/OpenSearchIngestion/pipeline-name/audit-logs のようなパスの形式にすることが推奨されます。この形式を使用すれば、/aws/vendedlogs/OpenSearchService/OpenSearchIngestion のような特定のパスを持つすべてのロググループにアクセス権限を付与する、CloudWatch アクセスポリシーの適用が容易になります。

重要
ロググループ名には、プレフィックス vendedlogs を含めます。さもないと作成に失敗します。

とか大事なことが書いてあったりする。今回は上記に従い、以下のように作成する。

resource "aws_cloudwatch_log_group" "example" {
  name = "/aws/vendedlogs/OpenSearchIngestion/${local.osis_pipeline_name}/audit-logs"
}

IAMの準備

IAMはAWS公式のデベロッパーガイドに従いosis-pipelines.amazonaws.comに対して権限を設定していく。後半、最小権限と言うには少し雑な内容になっているがご容赦いただきたい。

なお、local.osis_s3_export_prefixはこの後、OpenSearch Ingestionの中で同じ値を使う部分があるので変数化しておく方が都合が良い。

S3でKMSのカスタムキーを設定している場合は、KMSへのアクセス権も設定しておく必要があることに留意する。

resource "aws_iam_role" "osis" {
  name               = local.iam_osis_role_name
  assume_role_policy = data.aws_iam_policy_document.osis_assume.json
}

data "aws_iam_policy_document" "osis_assume" {
  statement {
    effect = "Allow"

    actions = [
      "sts:AssumeRole",
    ]

    principals {
      type = "Service"
      identifiers = [
        "osis-pipelines.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role_policy" "osis_custom" {
  name   = local.iam_osis_policy_name
  role   = aws_iam_role.osis.name
  policy = data.aws_iam_policy_document.osis_custom.json
}

data "aws_iam_policy_document" "osis_custom" {
  statement {
    sid = "allowRunExportJob"

    effect = "Allow"

    actions = [
      "dynamodb:DescribeTable",
      "dynamodb:DescribeContinuousBackups",
      "dynamodb:ExportTableToPointInTime",
    ]

    resources = [
      aws_dynamodb_table.example.arn,
    ]
  }

  statement {
    sid = "allowCheckExportjob"

    effect = "Allow"

    actions = [
      "dynamodb:DescribeExport",
    ]

    resources = [
      "${aws_dynamodb_table.example.arn}/export/*",
    ]
  }

  statement {
    sid = "allowReadFromStream"

    effect = "Allow"

    actions = [
      "dynamodb:DescribeStream",
      "dynamodb:GetRecords",
      "dynamodb:GetShardIterator",
    ]

    resources = [
      "${aws_dynamodb_table.example.arn}/stream/*",
    ]
  }

  statement {
    sid = "allowReadAndWriteToS3ForExport"

    effect = "Allow"

    actions = [
      "s3:GetObject",
      "s3:AbortMultipartUpload",
      "s3:PutObject",
      "s3:PutObjectAcl"
    ]

    resources = [
      "${aws_s3_bucket.example.arn}/${local.osis_s3_export_prefix}*",
    ]
  }

  statement {
    effect = "Allow"

    actions = [
      "aoss:APIAccessAll",
      "aoss:BatchGetCollection",
      "aoss:CreateSecurityPolicy",
      "aoss:GetSecurityPolicy",
      "aoss:UpdateSecurityPolicy",
      "es:DescribeDomain",
      "es:ESHttp*",
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents",
    ]

    resources = [
      "*",
    ]
  }
}

OpenSearch Serverlessを作成する

OpenSearch Serverlessはコレクションと、暗号化ポリシー、ネットワークポリシー、アクセスポリシーのリソースからなる。
今回は、簡易にするためにネットワークポリシーはパブリックアクセスを有効化するが、必要に応じてVPCエンドポイント経由にしよう。

また、今回は、DynamoDBの情報を別のキーで検索するのを試してみたいので、コレクションタイプをSEARCHにする。

aws_opensearchserverless_collectionの依存関係
aws_opensearchserverless_collectiondepends_onを設定するのは、セキュリティポリシーが設定されていないと作成がエラーになるにもかかわらず依存関係となる情報の設定がコレクション側にないためである。

Resourceの設定
RulesResourceは、この文字列にマッチしたコレクション名に適用されるというものである。
異なっていると正しく動作しないため、冪等性を高めるために変数で設定するようにしよう。今回は、変数prefix_shortに、コレクション名の前半を設定している(32文字を超えるとバリデーションエラーになるため、前半部分だけ設定して残りをワイルドカードにした)。

データアクセスポリシーのPrincipal
これは、OpenSearch Dashboardを参照するために自分のユーザをdata.aws_caller_identity.current.arnで設定しておく。また、osis-pipelines.amazonaws.comのサービスロールにもアクセス権が必要なので設定をする。
※この辺りがドキュメントからなかなか読み取れなくてパーミッションが通らず苦労した。

resource "aws_opensearchserverless_collection" "example" {
  depends_on = [aws_opensearchserverless_security_policy.example_encryption]

  name = local.oss_collection_name
  type = "SEARCH"
}

resource "aws_opensearchserverless_security_policy" "example_encryption" {
  name        = local.oss_sec_encryption_policy_name
  description = "KMS Encryption"

  type = "encryption"

  policy = jsonencode({
    Rules = [
      {
        ResourceType = "collection",
        Resource = [
          "collection/${var.prefix_short}*"
        ]
      }
    ],
    AWSOwnedKey = true
  })
}

resource "aws_opensearchserverless_security_policy" "example_network" {
  name        = local.oss_sec_network_policy_name
  description = "Public access"

  type = "network"

  policy = jsonencode([
    {
      Description = "Public access to collection and Dashboards endpoint for example collection",
      Rules = [
        {
          ResourceType = "collection",
          Resource = [
            "collection/${var.prefix_short}*"
          ]
        },
        {
          ResourceType = "dashboard",
          Resource = [
            "collection/${var.prefix_short}*"
          ]
        },
      ],
      AllowFromPublic = true
    }
  ])
}

resource "aws_opensearchserverless_access_policy" "example_access_policy" {
  name        = local.oss_sec_access_policy_name
  description = "read and write permissions"

  type = "data"

  policy = jsonencode([
    {
      Rules = [
        {
          ResourceType = "index",
          Resource = [
            "index/${var.prefix_short}*/*"
          ],
          Permission = [
            "aoss:*"
          ]
        },
        {
          ResourceType = "collection",
          Resource = [
            "collection/${var.prefix_short}*"
          ],
          Permission = [
            "aoss:*"
          ]
        }
      ],
      Principal = [
        data.aws_caller_identity.current.arn,
        aws_iam_role.osis.arn,
      ]
    }
  ])
}

OpenSearch Ingestionのパイプラインの設定

OpenSearch Ingestionのパイプラインには、TerraformのCloud Control APIのプロバイダを使用する。

まずは、プロバイダで以下の設定を行おう。

provider "awscc" {
  region = "ap-northeast-1"
}

パイプラインではYAML定義が必要だが、冪等性を高めるためにはいろいろリソースを参照しながら設定せねばならず、Terraformのtemplate_fileを使うにしても大変なので、yamlencodeを使用して設定を行う。

上述したとおり、log_publishing_optionsはしておかないとエラーになったとき詰むので設定をしておく。

このYAMLはマネージメントコンソールのブループリントを使うとお手軽に設定ができるかと思いきや、OpenSearch Serverlessの場合は、index_typeの制限があり、デフォルトのタイプだと正しく動作しない。これもAWS公式のデベロッパーガイド

hosts のオプションがサーバーレスコレクションのエンドポイントである場合は、serverless のオプションを true に設定する必要があります。さらに、YAML 定義ファイルに index_type のオプションが含まれている場合は、これを management_disabled に設定する必要があります。そうしないと検証に失敗します。

とだけ書かれていて非常につらい。しかもなぜか、management_disabledを設定してもエラーになるので、ここは未設定にすると動作した……。謎である……。

resource "awscc_osis_pipeline" "example" {
  pipeline_name = local.osis_pipeline_name
  min_units     = 1
  max_units     = 4

  pipeline_configuration_body = yamlencode({
    version = "2"
    dynamodb-pipeline = {
      source = {
        dynamodb = {
          acknowledgments = true
          tables = [
            {
              table_arn = aws_dynamodb_table.example.arn,
              stream = {
                start_position = "LATEST"
              }
              export = {
                s3_bucket = aws_s3_bucket.example.bucket,
                s3_region = data.aws_region.current.name
                s3_prefix = local.osis_s3_export_prefix
              }
            }
          ]
          aws = {
            sts_role_arn = aws_iam_role.osis.arn
            region       = data.aws_region.current.name
          }
        }
      }
      sink = [
        {
          opensearch = {
            hosts = [
              aws_opensearchserverless_collection.example.collection_endpoint
            ]
            index       = "index_${aws_dynamodb_table.example.name}"
            document_id = "$${getMetadata(\"primary_key\")}"
            action      = "$${getMetadata(\"opensearch_action\")}"
            aws = {
              sts_role_arn = aws_iam_role.osis.arn
              region       = data.aws_region.current.name
              serverless   = true
              serverless_options = {
                network_policy_name = local.oss_sec_network_policy_name
              }
            }
          }
        }
      ]
    }
  })

  log_publishing_options = {
    is_logging_enabled = true
    cloudwatch_log_destination = {
      log_group = aws_cloudwatch_log_group.example.name
    }
  }
}

なぜかこのIaCだと、vpc_endpointsvpc_optionsで毎回差分が出てしまい、ignore_changesも効かない。差分検知しない方法を募集中。

いざ、動かす!

まずはDynamoDBに以下のような感じでデータを仕込む。

image.png

さて、これでOpenSearch Dashboardに移動して、実際に取り込んだ情報がどうなっているかを確認しよう。

image.png

今回、ageをNumber型にしているので、betweenで範囲を絞って検索をしてみた。
しっかりとプライマリキー以外での検索ができている!
これでDynamoDBの全文検索を容易にできるようになった!

また、同様の方法でベクトル型のコレクションタイプにもエクスポートができるようなので、BedrockやOpenAIのRAGにDynamoDBの情報を渡せるようになるので、さらに利用の幅がはずだ!

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?