はじめに
2023年のre:Inventで発表されたAmazon DynamoDBとAmazon OpenSearch Serviceのzero-ETL統合は、複雑なETLを作り込むことなくお手軽にAmazon OpenSearch ServiceにAmazon DynamoDBの情報を取り込むことができる優れモノだ。
が、お手軽と言いつつ、IAMのアクセス権の設定やAmazon OpenSearch ServiceのYAML定義にはハマりどころが多く、マニュアルも散らばっていてお手軽感が薄かったのでもっとお手軽にするためにIaCで冪等性を高めて動作するようにした。また、今回はAmazon OpenSearchはServerlessで動作をさせる。
基本は最初に貼ったAWS公式のブログの流れで作っていくことにする。
Amazon DynamoDBの準備
Amazon DynamoDBの準備は特に難しいところはない。
AWS公式のブログに記載の通り、DynamoDB StreamsとPoint-in-time-recoveryを有効化しておこう。
server_side_encryption
はお作法としてやっておく。
resource "aws_dynamodb_table" "example" {
name = local.dynamodb_table_name
billing_mode = "PAY_PER_REQUEST"
hash_key = "id"
attribute {
name = "id"
type = "S"
}
stream_enabled = true
stream_view_type = "NEW_AND_OLD_IMAGES"
server_side_encryption {
enabled = true
}
point_in_time_recovery {
enabled = true
}
}
S3バケットの準備
S3バケットは、OpenSearch Ingestionがパイプライン中で一旦データをexportするために使用する。
特に難しい設定はない。S3はデフォルトでオブジェクトの暗号化が有効になっているため、今回は細かく書かないが、必要に応じてKMSのカスタムキーを使うようにしておこう。
resource "aws_s3_bucket" "example" {
bucket = local.s3_bucket_name
}
resource "aws_s3_bucket_ownership_controls" "example" {
bucket = aws_s3_bucket.example.id
rule {
object_ownership = "BucketOwnerEnforced"
}
}
resource "aws_s3_bucket_public_access_block" "example" {
bucket = aws_s3_bucket.example.id
block_public_acls = false
block_public_policy = false
ignore_public_acls = false
restrict_public_buckets = false
}
CloudWatch Logsの準備
CloudWatch LogsはOpenSearch Ingestionのログ出力に使用する。
上手く動作しなかったときにログが無いと詰むため、設定しておこう。
なお、zero-ETL統合に関連したドキュメントには全然登場しないが、AWS公式のデベロッパーガイドではちゃっかりと
名前は、/aws/vendedlogs/OpenSearchIngestion/pipeline-name/audit-logs のようなパスの形式にすることが推奨されます。この形式を使用すれば、/aws/vendedlogs/OpenSearchService/OpenSearchIngestion のような特定のパスを持つすべてのロググループにアクセス権限を付与する、CloudWatch アクセスポリシーの適用が容易になります。
重要
ロググループ名には、プレフィックス vendedlogs を含めます。さもないと作成に失敗します。
とか大事なことが書いてあったりする。今回は上記に従い、以下のように作成する。
resource "aws_cloudwatch_log_group" "example" {
name = "/aws/vendedlogs/OpenSearchIngestion/${local.osis_pipeline_name}/audit-logs"
}
IAMの準備
IAMはAWS公式のデベロッパーガイドに従いosis-pipelines.amazonaws.com
に対して権限を設定していく。後半、最小権限と言うには少し雑な内容になっているがご容赦いただきたい。
なお、local.osis_s3_export_prefix
はこの後、OpenSearch Ingestionの中で同じ値を使う部分があるので変数化しておく方が都合が良い。
S3でKMSのカスタムキーを設定している場合は、KMSへのアクセス権も設定しておく必要があることに留意する。
resource "aws_iam_role" "osis" {
name = local.iam_osis_role_name
assume_role_policy = data.aws_iam_policy_document.osis_assume.json
}
data "aws_iam_policy_document" "osis_assume" {
statement {
effect = "Allow"
actions = [
"sts:AssumeRole",
]
principals {
type = "Service"
identifiers = [
"osis-pipelines.amazonaws.com",
]
}
}
}
resource "aws_iam_role_policy" "osis_custom" {
name = local.iam_osis_policy_name
role = aws_iam_role.osis.name
policy = data.aws_iam_policy_document.osis_custom.json
}
data "aws_iam_policy_document" "osis_custom" {
statement {
sid = "allowRunExportJob"
effect = "Allow"
actions = [
"dynamodb:DescribeTable",
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime",
]
resources = [
aws_dynamodb_table.example.arn,
]
}
statement {
sid = "allowCheckExportjob"
effect = "Allow"
actions = [
"dynamodb:DescribeExport",
]
resources = [
"${aws_dynamodb_table.example.arn}/export/*",
]
}
statement {
sid = "allowReadFromStream"
effect = "Allow"
actions = [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
]
resources = [
"${aws_dynamodb_table.example.arn}/stream/*",
]
}
statement {
sid = "allowReadAndWriteToS3ForExport"
effect = "Allow"
actions = [
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl"
]
resources = [
"${aws_s3_bucket.example.arn}/${local.osis_s3_export_prefix}*",
]
}
statement {
effect = "Allow"
actions = [
"aoss:APIAccessAll",
"aoss:BatchGetCollection",
"aoss:CreateSecurityPolicy",
"aoss:GetSecurityPolicy",
"aoss:UpdateSecurityPolicy",
"es:DescribeDomain",
"es:ESHttp*",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
]
resources = [
"*",
]
}
}
OpenSearch Serverlessを作成する
OpenSearch Serverlessはコレクションと、暗号化ポリシー、ネットワークポリシー、アクセスポリシーのリソースからなる。
今回は、簡易にするためにネットワークポリシーはパブリックアクセスを有効化するが、必要に応じてVPCエンドポイント経由にしよう。
また、今回は、DynamoDBの情報を別のキーで検索するのを試してみたいので、コレクションタイプをSEARCH
にする。
aws_opensearchserverless_collectionの依存関係
aws_opensearchserverless_collection
にdepends_on
を設定するのは、セキュリティポリシーが設定されていないと作成がエラーになるにもかかわらず依存関係となる情報の設定がコレクション側にないためである。
Resourceの設定
Rules
のResource
は、この文字列にマッチしたコレクション名に適用されるというものである。
異なっていると正しく動作しないため、冪等性を高めるために変数で設定するようにしよう。今回は、変数prefix_short
に、コレクション名の前半を設定している(32文字を超えるとバリデーションエラーになるため、前半部分だけ設定して残りをワイルドカードにした)。
データアクセスポリシーのPrincipal
これは、OpenSearch Dashboardを参照するために自分のユーザをdata.aws_caller_identity.current.arn
で設定しておく。また、osis-pipelines.amazonaws.com
のサービスロールにもアクセス権が必要なので設定をする。
※この辺りがドキュメントからなかなか読み取れなくてパーミッションが通らず苦労した。
resource "aws_opensearchserverless_collection" "example" {
depends_on = [aws_opensearchserverless_security_policy.example_encryption]
name = local.oss_collection_name
type = "SEARCH"
}
resource "aws_opensearchserverless_security_policy" "example_encryption" {
name = local.oss_sec_encryption_policy_name
description = "KMS Encryption"
type = "encryption"
policy = jsonencode({
Rules = [
{
ResourceType = "collection",
Resource = [
"collection/${var.prefix_short}*"
]
}
],
AWSOwnedKey = true
})
}
resource "aws_opensearchserverless_security_policy" "example_network" {
name = local.oss_sec_network_policy_name
description = "Public access"
type = "network"
policy = jsonencode([
{
Description = "Public access to collection and Dashboards endpoint for example collection",
Rules = [
{
ResourceType = "collection",
Resource = [
"collection/${var.prefix_short}*"
]
},
{
ResourceType = "dashboard",
Resource = [
"collection/${var.prefix_short}*"
]
},
],
AllowFromPublic = true
}
])
}
resource "aws_opensearchserverless_access_policy" "example_access_policy" {
name = local.oss_sec_access_policy_name
description = "read and write permissions"
type = "data"
policy = jsonencode([
{
Rules = [
{
ResourceType = "index",
Resource = [
"index/${var.prefix_short}*/*"
],
Permission = [
"aoss:*"
]
},
{
ResourceType = "collection",
Resource = [
"collection/${var.prefix_short}*"
],
Permission = [
"aoss:*"
]
}
],
Principal = [
data.aws_caller_identity.current.arn,
aws_iam_role.osis.arn,
]
}
])
}
OpenSearch Ingestionのパイプラインの設定
OpenSearch Ingestionのパイプラインには、TerraformのCloud Control APIのプロバイダを使用する。
まずは、プロバイダで以下の設定を行おう。
provider "awscc" {
region = "ap-northeast-1"
}
パイプラインではYAML定義が必要だが、冪等性を高めるためにはいろいろリソースを参照しながら設定せねばならず、Terraformのtemplate_file
を使うにしても大変なので、yamlencode
を使用して設定を行う。
上述したとおり、log_publishing_options
はしておかないとエラーになったとき詰むので設定をしておく。
このYAMLはマネージメントコンソールのブループリントを使うとお手軽に設定ができるかと思いきや、OpenSearch Serverlessの場合は、index_typeの制限があり、デフォルトのタイプだと正しく動作しない。これもAWS公式のデベロッパーガイドに
hosts
のオプションがサーバーレスコレクションのエンドポイントである場合は、serverless
のオプションをtrue
に設定する必要があります。さらに、YAML 定義ファイルにindex_type
のオプションが含まれている場合は、これをmanagement_disabled
に設定する必要があります。そうしないと検証に失敗します。
とだけ書かれていて非常につらい。しかもなぜか、management_disabled
を設定してもエラーになるので、ここは未設定にすると動作した……。謎である……。
resource "awscc_osis_pipeline" "example" {
pipeline_name = local.osis_pipeline_name
min_units = 1
max_units = 4
pipeline_configuration_body = yamlencode({
version = "2"
dynamodb-pipeline = {
source = {
dynamodb = {
acknowledgments = true
tables = [
{
table_arn = aws_dynamodb_table.example.arn,
stream = {
start_position = "LATEST"
}
export = {
s3_bucket = aws_s3_bucket.example.bucket,
s3_region = data.aws_region.current.name
s3_prefix = local.osis_s3_export_prefix
}
}
]
aws = {
sts_role_arn = aws_iam_role.osis.arn
region = data.aws_region.current.name
}
}
}
sink = [
{
opensearch = {
hosts = [
aws_opensearchserverless_collection.example.collection_endpoint
]
index = "index_${aws_dynamodb_table.example.name}"
document_id = "$${getMetadata(\"primary_key\")}"
action = "$${getMetadata(\"opensearch_action\")}"
aws = {
sts_role_arn = aws_iam_role.osis.arn
region = data.aws_region.current.name
serverless = true
serverless_options = {
network_policy_name = local.oss_sec_network_policy_name
}
}
}
}
]
}
})
log_publishing_options = {
is_logging_enabled = true
cloudwatch_log_destination = {
log_group = aws_cloudwatch_log_group.example.name
}
}
}
なぜかこのIaCだと、vpc_endpoints
でvpc_options
で毎回差分が出てしまい、ignore_changes
も効かない。差分検知しない方法を募集中。
いざ、動かす!
まずはDynamoDBに以下のような感じでデータを仕込む。
さて、これでOpenSearch Dashboardに移動して、実際に取り込んだ情報がどうなっているかを確認しよう。
今回、ageをNumber型にしているので、betweenで範囲を絞って検索をしてみた。
しっかりとプライマリキー以外での検索ができている!
これでDynamoDBの全文検索を容易にできるようになった!
また、同様の方法でベクトル型のコレクションタイプにもエクスポートができるようなので、BedrockやOpenAIのRAGにDynamoDBの情報を渡せるようになるので、さらに利用の幅がはずだ!