2
2

More than 1 year has passed since last update.

Amazon Kinesis Data FirehoseでS3に情報をバックアップする

Posted at

はじめに

Kinesis Data Streamsはストリームをマネージドにスケールしてくれて強力だが、流れ込んできたデータを処理した後に参照するということが苦手(というかKinesis Data Streams単体ではできない)という欠点がある。
参照できるようにするために、Kinesis Data FirehoseでS3に流し込んでバックアップを行い、さらにAthenaで参照できるような処理を自動構築してみよう。

本記事の前提知識としては以下だ。

  • Kinesis Data Streamsのサービスをある程度理解している
  • S3のサービスをある程度理解している
  • Terraformをそこそこ理解している

最後に参照するAthenaについては、以下の記事で作成するデータベースと同じ方法でパーティショニングできるようにしているため、本記事ではAthenaの構築までは含めない。

Well Architectedフレームワークに従うのであれば、セキュリティを強化するためにKMS等によるマネージドサービスのデータを暗号化することが推奨されているので、今回はその部分も構築に組み込んでいく。

Kinesis Data Firehoseに必要なリソース

上記の「はじめに」に記載の要件で作成していくには、以下が必要になる。
CloudWatch Logsは絶対に必要なわけではないが、うまく動かなかったり不測のエラーが発生した際にワケが分からない状態にならないよう、設定しておくほうが無難である。

  • S3バケット
  • KMS
  • CloudWatch Logs

前提となるリソースの準備(IAM以外)

IAMの前に、必要なリソースを一通り揃えよう。

S3バケット

S3バケットはAWSマネージドのKMSキーで暗号化する設定をaws_s3_bucket_server_side_encryption_configurationで入れておく。
分かりにくいが、この書き方でマネージドなKMSキーが選択される。
あとは、aws_s3_bucket_aclaws_s3_bucket_public_access_blockで外部アクセスを禁止しておこう。

################################################################################
# S3 Firehose                                                                  #
################################################################################
resource "aws_s3_bucket" "example" {
  bucket = local.s3_firehose_bucket_name

  force_destroy = true
}

resource "aws_s3_bucket_acl" "example" {
  bucket = aws_s3_bucket.example.id
  acl    = "private"
}

resource "aws_s3_bucket_public_access_block" "example" {
  bucket = aws_s3_bucket.example.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_server_side_encryption_configuration" "example" {
  bucket = aws_s3_bucket.example.bucket

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }

    bucket_key_enabled = false
  }
}

KMS

KMSはマネージドなキーを使う場合はデータソースで参照してあげればよい。

################################################################################
# KMS                                                                          #
################################################################################
data "aws_kms_alias" "s3" {
  name = "alias/aws/s3"
}

CloudWatch Logs

CloudWatch Logsも特段細かい設定をする必要はないが、Firehoseはサービス側でログストリームを作らないので、ロググループとログストリームの両方を作成する。
なお、マネコンから作成すると/aws/kinesisfirehose/でロググループ名が始まるのと、DestinationDeliveryというストリーム名になるので、今回は命名規則を合わせた。

################################################################################
# CloudWatch Logs                                                        #
################################################################################
resource "aws_cloudwatch_log_group" "firehose" {
  name              = "/aws/kinesisfirehose/${local.firehose_deliverystream_name}"
}

resource "aws_cloudwatch_log_stream" "firehose" {
  log_group_name = aws_cloudwatch_log_group.firehose.name
  name           = "DestinationDelivery"
}

前提となるリソースの準備(IAM)

さて、これらのリソースが作成出来たらIAMロールを作る。Assume Roleを設定するサービス名は"firehose.amazonaws.comだ。
ログストリームへのアクセスと、S3バケットへのアクセス、S3バケットにアクセスするためのKMSのアクセス権限をポリシーでAllowしてあげよう。

################################################################################
# IAM                                                                          #
################################################################################
resource "aws_iam_role" "firehose" {
  name               = local.iam_role_name_firehose
  assume_role_policy = data.aws_iam_policy_document.firehose_assume.json
}

data "aws_iam_policy_document" "firehose_assume" {
  statement {
    effect = "Allow"

    actions = [
      "sts:AssumeRole",
    ]

    principals {
      type = "Service"
      identifiers = [
        "firehose.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role_policy" "firehose_custom" {
  name   = local.iam_policy_name_firehose
  role   = aws_iam_role.firehose.id
  policy = data.aws_iam_policy_document.firehose_custom.json
}

data "aws_iam_policy_document" "firehose_custom" {
  statement {
    effect = "Allow"

    actions = [
      "logs:PutLogEvents",
      "s3:AbortMultipartUpload",
      "s3:GetBucketLocation",
      "s3:GetObject",
      "s3:ListBucket",
      "s3:ListBucketMultipartUploads",
      "s3:PutObject",
    ]

    resources = [
      "*",
    ]
  }
  statement {
    effect = "Allow"

    actions = [
      "kms:Decrypt",
      "kms:GenerateDataKey",
    ]

    resources = [
      data.aws_kms_alias.s3.target_key_arn,
    ]

    condition {
      test     = "StringEquals"
      variable = "kms:ViaService"
      values = [
        "s3.${data.aws_region.current.name}.amazonaws.com"
      ]
    }

    condition {
      test     = "StringLike"
      variable = "kms:EncryptionContext:aws:s3:arn"
      values = [
        aws_s3_bucket.example.arn,
        "${aws_s3_bucket.example.arn}/*",
      ]
    }
  }
}

Kinesis Data Firehose

さて、いよいよKinesis Data Firehoseの作成だ。
今回はS3バケットに書き出したいので、destination = "extended_s3"で以下のように作成する。
buffer_sizebuffer_intervalは要件に合わせて適切に設定しておこう。
server_side_encryptionでは、key_type = "AWS_OWNED_CMK"にすれば追加の料金はかからないが、セキュリティ要件に合わせてセルフマネージド型のKMSキーを利用しよう。

################################################################################
# Kinesis Data Firehose                                                        #
################################################################################
resource "aws_kinesis_firehose_delivery_stream" "example" {
  name        = local.firehose_deliverystream_name
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn        = aws_iam_role.firehose.arn
    bucket_arn      = aws_s3_bucket.example.arn
    buffer_size     = 1
    buffer_interval = 60

    cloudwatch_logging_options {
      enabled         = true
      log_group_name  = aws_cloudwatch_log_group.firehose.name
      log_stream_name = aws_cloudwatch_log_stream.firehose.name
    }
  }

  server_side_encryption {
    enabled  = true
    key_type = "AWS_OWNED_CMK"
  }
}

Kinesis Data Firehoseにデータを流し込む

何をデータソースにしても良いが、お手軽なのはLambdaからPutRecordのAPIを呼び出すことだろう。

Javaの場合は公式の開発者ガイドに従い流し込めば良い。

Typescriptの場合は以下のような感じだ。

今回のポイントは、「JSONの末尾に改行を付与すること」だ。Athenaに食わせるためには、JSONのレコード終端に改行が必要になる(無いとレコードとして認識されない)。Firehose側で改行を入れることも可能だが、そのためにいちいちLambdaを呼ばなければいけないので、シンプルに呼び出し側で付与してしまった方が楽だろう。

import { FirehoseClient, PutRecordCommand } from '@aws-sdk/client-firehose';

const client = new FirehoseClient({ 'ap-northeast-1' });

const command = new PutRecordCommand({
  DeliveryStreamName: process.env.FIREHOSE_DELIVERYSTREAM_NAME,
  Record: { Data: Buffer.from(JSON.stringify({ name: 'test' }) + '\n') },
});

try {
  await client.send(command);
} catch (error) {
  throw new Error(error);
}

これで、S3にJSONのレコードがバックアップされるようになった!

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2