はじめに
Kinesis Data Streamsはストリームをマネージドにスケールしてくれて強力だが、流れ込んできたデータを処理した後に参照するということが苦手(というかKinesis Data Streams単体ではできない)という欠点がある。
参照できるようにするために、Kinesis Data FirehoseでS3に流し込んでバックアップを行い、さらにAthenaで参照できるような処理を自動構築してみよう。
本記事の前提知識としては以下だ。
- Kinesis Data Streamsのサービスをある程度理解している
- S3のサービスをある程度理解している
- Terraformをそこそこ理解している
最後に参照するAthenaについては、以下の記事で作成するデータベースと同じ方法でパーティショニングできるようにしているため、本記事ではAthenaの構築までは含めない。
Well Architectedフレームワークに従うのであれば、セキュリティを強化するためにKMS等によるマネージドサービスのデータを暗号化することが推奨されているので、今回はその部分も構築に組み込んでいく。
Kinesis Data Firehoseに必要なリソース
上記の「はじめに」に記載の要件で作成していくには、以下が必要になる。
CloudWatch Logsは絶対に必要なわけではないが、うまく動かなかったり不測のエラーが発生した際にワケが分からない状態にならないよう、設定しておくほうが無難である。
- S3バケット
- KMS
- CloudWatch Logs
前提となるリソースの準備(IAM以外)
IAMの前に、必要なリソースを一通り揃えよう。
S3バケット
S3バケットはAWSマネージドのKMSキーで暗号化する設定をaws_s3_bucket_server_side_encryption_configuration
で入れておく。
分かりにくいが、この書き方でマネージドなKMSキーが選択される。
あとは、aws_s3_bucket_acl
とaws_s3_bucket_public_access_block
で外部アクセスを禁止しておこう。
################################################################################
# S3 Firehose #
################################################################################
resource "aws_s3_bucket" "example" {
bucket = local.s3_firehose_bucket_name
force_destroy = true
}
resource "aws_s3_bucket_acl" "example" {
bucket = aws_s3_bucket.example.id
acl = "private"
}
resource "aws_s3_bucket_public_access_block" "example" {
bucket = aws_s3_bucket.example.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
resource "aws_s3_bucket_server_side_encryption_configuration" "example" {
bucket = aws_s3_bucket.example.bucket
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "AES256"
}
bucket_key_enabled = false
}
}
KMS
KMSはマネージドなキーを使う場合はデータソースで参照してあげればよい。
################################################################################
# KMS #
################################################################################
data "aws_kms_alias" "s3" {
name = "alias/aws/s3"
}
CloudWatch Logs
CloudWatch Logsも特段細かい設定をする必要はないが、Firehoseはサービス側でログストリームを作らないので、ロググループとログストリームの両方を作成する。
なお、マネコンから作成すると/aws/kinesisfirehose/
でロググループ名が始まるのと、DestinationDelivery
というストリーム名になるので、今回は命名規則を合わせた。
################################################################################
# CloudWatch Logs #
################################################################################
resource "aws_cloudwatch_log_group" "firehose" {
name = "/aws/kinesisfirehose/${local.firehose_deliverystream_name}"
}
resource "aws_cloudwatch_log_stream" "firehose" {
log_group_name = aws_cloudwatch_log_group.firehose.name
name = "DestinationDelivery"
}
前提となるリソースの準備(IAM)
さて、これらのリソースが作成出来たらIAMロールを作る。Assume Roleを設定するサービス名は"firehose.amazonaws.com
だ。
ログストリームへのアクセスと、S3バケットへのアクセス、S3バケットにアクセスするためのKMSのアクセス権限をポリシーでAllowしてあげよう。
################################################################################
# IAM #
################################################################################
resource "aws_iam_role" "firehose" {
name = local.iam_role_name_firehose
assume_role_policy = data.aws_iam_policy_document.firehose_assume.json
}
data "aws_iam_policy_document" "firehose_assume" {
statement {
effect = "Allow"
actions = [
"sts:AssumeRole",
]
principals {
type = "Service"
identifiers = [
"firehose.amazonaws.com",
]
}
}
}
resource "aws_iam_role_policy" "firehose_custom" {
name = local.iam_policy_name_firehose
role = aws_iam_role.firehose.id
policy = data.aws_iam_policy_document.firehose_custom.json
}
data "aws_iam_policy_document" "firehose_custom" {
statement {
effect = "Allow"
actions = [
"logs:PutLogEvents",
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject",
]
resources = [
"*",
]
}
statement {
effect = "Allow"
actions = [
"kms:Decrypt",
"kms:GenerateDataKey",
]
resources = [
data.aws_kms_alias.s3.target_key_arn,
]
condition {
test = "StringEquals"
variable = "kms:ViaService"
values = [
"s3.${data.aws_region.current.name}.amazonaws.com"
]
}
condition {
test = "StringLike"
variable = "kms:EncryptionContext:aws:s3:arn"
values = [
aws_s3_bucket.example.arn,
"${aws_s3_bucket.example.arn}/*",
]
}
}
}
Kinesis Data Firehose
さて、いよいよKinesis Data Firehoseの作成だ。
今回はS3バケットに書き出したいので、destination = "extended_s3"
で以下のように作成する。
buffer_size
とbuffer_interval
は要件に合わせて適切に設定しておこう。
server_side_encryption
では、key_type = "AWS_OWNED_CMK"
にすれば追加の料金はかからないが、セキュリティ要件に合わせてセルフマネージド型のKMSキーを利用しよう。
################################################################################
# Kinesis Data Firehose #
################################################################################
resource "aws_kinesis_firehose_delivery_stream" "example" {
name = local.firehose_deliverystream_name
destination = "extended_s3"
extended_s3_configuration {
role_arn = aws_iam_role.firehose.arn
bucket_arn = aws_s3_bucket.example.arn
buffer_size = 1
buffer_interval = 60
cloudwatch_logging_options {
enabled = true
log_group_name = aws_cloudwatch_log_group.firehose.name
log_stream_name = aws_cloudwatch_log_stream.firehose.name
}
}
server_side_encryption {
enabled = true
key_type = "AWS_OWNED_CMK"
}
}
Kinesis Data Firehoseにデータを流し込む
何をデータソースにしても良いが、お手軽なのはLambdaからPutRecordのAPIを呼び出すことだろう。
Javaの場合は公式の開発者ガイドに従い流し込めば良い。
Typescriptの場合は以下のような感じだ。
今回のポイントは、「JSONの末尾に改行を付与すること」だ。Athenaに食わせるためには、JSONのレコード終端に改行が必要になる(無いとレコードとして認識されない)。Firehose側で改行を入れることも可能だが、そのためにいちいちLambdaを呼ばなければいけないので、シンプルに呼び出し側で付与してしまった方が楽だろう。
import { FirehoseClient, PutRecordCommand } from '@aws-sdk/client-firehose';
const client = new FirehoseClient({ 'ap-northeast-1' });
const command = new PutRecordCommand({
DeliveryStreamName: process.env.FIREHOSE_DELIVERYSTREAM_NAME,
Record: { Data: Buffer.from(JSON.stringify({ name: 'test' }) + '\n') },
});
try {
await client.send(command);
} catch (error) {
throw new Error(error);
}
これで、S3にJSONのレコードがバックアップされるようになった!