4
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AWS Kinesis を使って S3 にちょっとずつ書き込む。

Posted at

AWS Kinesis というのは、ちょっとずつデータを集めるための仕組みらしいです。これで試しに S3 にちょっとずつデータを書き込んでみます。

コツ

  • 複数のサービスを組み合わせるので権限の設定がややこしい
  • バッファが貯まるまで S3 からデータを取り出せない

Kinesis Data Stream でデータを集める

データを集める stream を作るには create-stream コマンドを使います。パラメータは名前と shard 数です。

shared 数一つで 1MiB/秒 のデータ吸い取りと 2MiB/秒のデータ吐き出しが出来ます。(データストリームの作成および更新)

コマンド例:

aws kinesis create-stream --stream-name hoge --shard-count 1

それでは作ったストリームに put-record コマンドで書き込みます。--partition-key を使ってどの shard に保存するか決めるとあるので、適当な文字列を指定します。

$ aws kinesis put-record --stream-name hoge --partition-key hoge-key --data HelloWorld
{
    "ShardId": "shardId-000000000000", 
    "SequenceNumber": "49598759924773225241889582386124629810459480330394927106"
}
$ aws kinesis put-record --stream-name hoge --partition-key hoge-key --data ByeWorld
{
    "ShardId": "shardId-000000000000", 
    "SequenceNumber": "49598759924773225241889582386125838736279126639248408578"
}

書き込んだデータを読み込みには get-shard-iterator でイテレータを取得します。TRIM_HORIZON といのは古い順に読むということらしい。

$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name hoge
{
    "ShardIterator": "AAAAAAAAAAFECIoPc6qtHLXUSuEyUtRKh0o9iruhc/8JSu+6acPa9pZxTuPLm1Dcn0j7926YtoCBaE2kGXT8G6GRGPOQPADp5uHX9A9pPORDUe5xMbrw8XZmxyHp/GJqy7ihqykXkh+QJXkAgLqpfXM6BhoJ23aQCogbYs5sBCadzaDdt3onf8xWitFkRGX87PuqS5shMnUJZ1EF7n70/ydF41lan9L1"
}

イテレータを指定して get-records でデータを取得します。

$ aws kinesis get-records --shard-iterator AAAAAAAAAAFECIoPc6qtHLXUSuEyUtRKh0o9iruhc/8JSu+6acPa9pZxTuPLm1Dcn0j7926YtoCBaE2kGXT8G6GRGPOQPADp5uHX9A9pPORDUe5xMbrw8XZmxyHp/GJqy7ihqykXkh+QJXkAgLqpfXM6BhoJ23aQCogbYs5sBCadzaDdt3onf8xWitFkRGX87PuqS5shMnUJZ1EF7n70/ydF41lan9L1
{
    "Records": [
        {
            "Data": "SGVsbG9Xb3JsZA==", 
            "PartitionKey": "hoge-key", 
            "ApproximateArrivalTimestamp": 1566379302.427, 
            "SequenceNumber": "49598759924773225241889582386124629810459480330394927106"
        }, 
        {
            "Data": "QnllV29ybGQ=", 
            "PartitionKey": "hoge-key", 
            "ApproximateArrivalTimestamp": 1566379763.774, 
            "SequenceNumber": "49598759924773225241889582386125838736279126639248408578"
        }
    ], 
    "NextShardIterator": "AAAAAAAAAAGlRLCOBUpJHWWqIYeF5K86L9KZ9js+Jo/l40VQdYlp3JGyHOKRZSXuAgbhgN5pEIEmHf/aN3a5dpH+LiI/NRb16Ta7znugIV15+ZO8eT8wlhiO9cICKFbqkqoq0E2DDK7vUf2FgNUP4XVkaV6WM69HvxWXDKbIfdady/ZMkNmCfqGEBMABw6dWgueFCpPX3OD6aYo/MLLwJs4alv09Hku3", 
    "MillisBehindLatest": 0
}

内容は Base64 されています。put-record には生データを指定するのに、get-records では Base64 された物が出てくるという仕様になっています。

$ echo SGVsbG9Xb3JsZA== | base64 -D -
HelloWorld

後始末

aws kinesis delete-stream --stream-name hoge

Kinesis Data Firehose で S3 にデータを集める

次に、Amazon Kinesis Data Firehose で S3 に書き込みます。Firehose というのは、Kinesis Data Stream で集めたデータを簡単に S3 などに書き込む仕組みです。複数のサービスを関連付けるのはややこしいので Terraform で作ります。Firehose 自体の設定は簡単ですが、必要な権限が沢山あります。

Firehose に付与する権限の設定は Amazon Kinesis Data Firehose によるアクセスの制御 に書いてありますが基本はこんな感じです。

  • Firehose 用の IAM Role を作る。
  • AssumeRole で自分のアカウントの firehose だけがこの IAM Role を使えるようにする。
  • この IAM Role に、Stream からデータを読み込む権限を与える。
  • この IAM Role に、S3 にデータを書き込む権限を与える。

Terraform ファイルの例:

// Kinesis Data Stream を作ります。
resource "aws_kinesis_stream" "stream" {
  name        = "hoge"
  shard_count = 1
}

// S3 を作ります。
resource "aws_s3_bucket" "bucket" {
  bucket = "kinesis-bucket-hoge"
}

// Firehose 用の IAM Role を作ります。自分のアカウントの Firehose だけがこの Role を使えるようにします。
data "aws_caller_identity" "current" {}

resource "aws_iam_role" "role" {
  name = "firehose-role-hoge"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
        "Effect": "Allow",
        "Action": "sts:AssumeRole",
        "Principal": {
            "Service": "firehose.amazonaws.com"
        },
        "Condition": {
            "StringEquals": {
                "sts:ExternalId": "${data.aws_caller_identity.current.account_id}"
            }
        }
    }
  ]
}
EOF
}

// Stream からデータを取り出して S3 に書き出すための権限を作ります。

resource "aws_iam_policy" "policy" {
  name = "firehose-policy"

  policy = <<EOF
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": "${aws_kinesis_stream.stream.arn}"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],
            "Resource": [
                "${aws_s3_bucket.bucket.arn}",
                "${aws_s3_bucket.bucket.arn}/*"
            ]
        }
    ]
}
EOF
}

// 権限を Firehose 用の Role に追加します。
resource "aws_iam_role_policy_attachment" "attachment" {
  role       = "${aws_iam_role.role.name}"
  policy_arn = "${aws_iam_policy.policy.arn}"
}

// Firehose を追加して、Kinesis Stream と S3 を繋げます。

resource "aws_kinesis_firehose_delivery_stream" "hose" {
  name        = "kinesis-firehose-hoge"
  destination = "s3"

  kinesis_source_configuration {
    kinesis_stream_arn = "${aws_kinesis_stream.stream.arn}"
    role_arn           = "${aws_iam_role.role.arn}"
  }

  s3_configuration {
    role_arn        = "${aws_iam_role.role.arn}"
    bucket_arn      = "${aws_s3_bucket.bucket.arn}"
    // テスト用にバッファを 60 秒にします。これで put-record したら 60 秒後に S3 で確認出来ます。
    buffer_interval = 60
  }
}

テスト

terraform init
terraform apply
aws kinesis put-record --stream-name hoge --partition-key hoge-key --data HelloWorld
aws kinesis put-record --stream-name hoge --partition-key hoge-key --data ByeWorld
などなど

一分後 AWS Console で S3 に書き込まれたことを確認

あと始末

terraform destroy

参考

4
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?