AWS Kinesis というのは、ちょっとずつデータを集めるための仕組みらしいです。これで試しに S3 にちょっとずつデータを書き込んでみます。
コツ
- 複数のサービスを組み合わせるので権限の設定がややこしい
- バッファが貯まるまで S3 からデータを取り出せない
Kinesis Data Stream でデータを集める
データを集める stream を作るには create-stream コマンドを使います。パラメータは名前と shard 数です。
shared 数一つで 1MiB/秒 のデータ吸い取りと 2MiB/秒のデータ吐き出しが出来ます。(データストリームの作成および更新)
コマンド例:
aws kinesis create-stream --stream-name hoge --shard-count 1
それでは作ったストリームに put-record コマンドで書き込みます。--partition-key
を使ってどの shard に保存するか決めるとあるので、適当な文字列を指定します。
$ aws kinesis put-record --stream-name hoge --partition-key hoge-key --data HelloWorld
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49598759924773225241889582386124629810459480330394927106"
}
$ aws kinesis put-record --stream-name hoge --partition-key hoge-key --data ByeWorld
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49598759924773225241889582386125838736279126639248408578"
}
書き込んだデータを読み込みには get-shard-iterator でイテレータを取得します。TRIM_HORIZON といのは古い順に読むということらしい。
$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name hoge
{
"ShardIterator": "AAAAAAAAAAFECIoPc6qtHLXUSuEyUtRKh0o9iruhc/8JSu+6acPa9pZxTuPLm1Dcn0j7926YtoCBaE2kGXT8G6GRGPOQPADp5uHX9A9pPORDUe5xMbrw8XZmxyHp/GJqy7ihqykXkh+QJXkAgLqpfXM6BhoJ23aQCogbYs5sBCadzaDdt3onf8xWitFkRGX87PuqS5shMnUJZ1EF7n70/ydF41lan9L1"
}
イテレータを指定して get-records でデータを取得します。
$ aws kinesis get-records --shard-iterator AAAAAAAAAAFECIoPc6qtHLXUSuEyUtRKh0o9iruhc/8JSu+6acPa9pZxTuPLm1Dcn0j7926YtoCBaE2kGXT8G6GRGPOQPADp5uHX9A9pPORDUe5xMbrw8XZmxyHp/GJqy7ihqykXkh+QJXkAgLqpfXM6BhoJ23aQCogbYs5sBCadzaDdt3onf8xWitFkRGX87PuqS5shMnUJZ1EF7n70/ydF41lan9L1
{
"Records": [
{
"Data": "SGVsbG9Xb3JsZA==",
"PartitionKey": "hoge-key",
"ApproximateArrivalTimestamp": 1566379302.427,
"SequenceNumber": "49598759924773225241889582386124629810459480330394927106"
},
{
"Data": "QnllV29ybGQ=",
"PartitionKey": "hoge-key",
"ApproximateArrivalTimestamp": 1566379763.774,
"SequenceNumber": "49598759924773225241889582386125838736279126639248408578"
}
],
"NextShardIterator": "AAAAAAAAAAGlRLCOBUpJHWWqIYeF5K86L9KZ9js+Jo/l40VQdYlp3JGyHOKRZSXuAgbhgN5pEIEmHf/aN3a5dpH+LiI/NRb16Ta7znugIV15+ZO8eT8wlhiO9cICKFbqkqoq0E2DDK7vUf2FgNUP4XVkaV6WM69HvxWXDKbIfdady/ZMkNmCfqGEBMABw6dWgueFCpPX3OD6aYo/MLLwJs4alv09Hku3",
"MillisBehindLatest": 0
}
内容は Base64 されています。put-record
には生データを指定するのに、get-records
では Base64 された物が出てくるという仕様になっています。
$ echo SGVsbG9Xb3JsZA== | base64 -D -
HelloWorld
後始末
aws kinesis delete-stream --stream-name hoge
Kinesis Data Firehose で S3 にデータを集める
次に、Amazon Kinesis Data Firehose で S3 に書き込みます。Firehose というのは、Kinesis Data Stream で集めたデータを簡単に S3 などに書き込む仕組みです。複数のサービスを関連付けるのはややこしいので Terraform で作ります。Firehose 自体の設定は簡単ですが、必要な権限が沢山あります。
Firehose に付与する権限の設定は Amazon Kinesis Data Firehose によるアクセスの制御 に書いてありますが基本はこんな感じです。
- Firehose 用の IAM Role を作る。
- AssumeRole で自分のアカウントの firehose だけがこの IAM Role を使えるようにする。
- この IAM Role に、Stream からデータを読み込む権限を与える。
- この IAM Role に、S3 にデータを書き込む権限を与える。
Terraform ファイルの例:
// Kinesis Data Stream を作ります。
resource "aws_kinesis_stream" "stream" {
name = "hoge"
shard_count = 1
}
// S3 を作ります。
resource "aws_s3_bucket" "bucket" {
bucket = "kinesis-bucket-hoge"
}
// Firehose 用の IAM Role を作ります。自分のアカウントの Firehose だけがこの Role を使えるようにします。
data "aws_caller_identity" "current" {}
resource "aws_iam_role" "role" {
name = "firehose-role-hoge"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Condition": {
"StringEquals": {
"sts:ExternalId": "${data.aws_caller_identity.current.account_id}"
}
}
}
]
}
EOF
}
// Stream からデータを取り出して S3 に書き出すための権限を作ります。
resource "aws_iam_policy" "policy" {
name = "firehose-policy"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords"
],
"Resource": "${aws_kinesis_stream.stream.arn}"
},
{
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
],
"Resource": [
"${aws_s3_bucket.bucket.arn}",
"${aws_s3_bucket.bucket.arn}/*"
]
}
]
}
EOF
}
// 権限を Firehose 用の Role に追加します。
resource "aws_iam_role_policy_attachment" "attachment" {
role = "${aws_iam_role.role.name}"
policy_arn = "${aws_iam_policy.policy.arn}"
}
// Firehose を追加して、Kinesis Stream と S3 を繋げます。
resource "aws_kinesis_firehose_delivery_stream" "hose" {
name = "kinesis-firehose-hoge"
destination = "s3"
kinesis_source_configuration {
kinesis_stream_arn = "${aws_kinesis_stream.stream.arn}"
role_arn = "${aws_iam_role.role.arn}"
}
s3_configuration {
role_arn = "${aws_iam_role.role.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
// テスト用にバッファを 60 秒にします。これで put-record したら 60 秒後に S3 で確認出来ます。
buffer_interval = 60
}
}
テスト
terraform init
terraform apply
aws kinesis put-record --stream-name hoge --partition-key hoge-key --data HelloWorld
aws kinesis put-record --stream-name hoge --partition-key hoge-key --data ByeWorld
などなど
一分後 AWS Console で S3 に書き込まれたことを確認
あと始末
terraform destroy