はじめに
以前書いたFluentBit × Kinesis × S3 でログ保存基盤を作る ~概要編~の続きです。
今回はAWS側の実装である、Kinesis Firehose Delivery StreamからS3にデータを流す基盤を作ります。
リソース作成
今回はCloudFormationを使ってリソースを作成しました。
必要なリソース
- S3バケット
- Kinesis Firehose Delivery Stream用のIAMロール
- S3に書き込みをできる必要がある
- 必要なIAMポリシーは 公式ページ を参照
- Kinesis Firehose Delivery Stream
※ Kinesis Firehose Delivery Stream が長いので、以降はFirehoseと呼びます。
テンプレート
上記要件を満たすためのテンプレートが下記です。
バケット名はきっと被ってしまうので、適宜変えるようにするのが良いと思います。
(サンプル用として書いてるのでリソース名の適当さはお許しください...)
AWSTemplateFormatVersion: 2010-09-09
Resources:
LogBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: log-bucket-stream-sample0021
AccessControl: Private
LogToS3Stream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: log-to-s3-stream
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
BucketARN: !GetAtt LogBucket.Arn
RoleARN: !GetAtt FirehoseToS3IamRole.Arn
BufferingHints:
IntervalInSeconds: 60
SizeInMBs: 50
CompressionFormat: GZIP
FirehoseToS3IamRole:
Type: AWS::IAM::Role
Properties:
RoleName: firehose-to-s3-iam-role
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- firehose.amazonaws.com
Action:
- sts:AssumeRole
ManagedPolicyArns:
- !Ref FirehoseToS3DeliveryPolicy
FirehoseToS3DeliveryPolicy:
Type: AWS::IAM::ManagedPolicy
Properties:
PolicyDocument:
Version: 2012-10-17
Statement:
-
Sid: AllowFirehoseToS3Delivery
Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
- kinesis:DescribeStream
- kinesis:GetShardIterator
- kinesis:GetRecords
- kinesis:ListShards
- kms:Decrypt
- kms:GenerateDataKey
- logs:PutLogEvents
- lambda:InvokeFunction
- lambda:GetFunctionConfiguration
Resource: "*"
こちらをCloudFormationで流し込みます。
変数は特に埋め込んでいないので、設定は必要ありません。
リソース確認
S3
バケットが作成されています。
ここがFirehoseから送られてくるデータが書き込まれる場所になります。
Firehose
キャッチしたデータの書き込み先のS3もちゃんと指定されています。
ちなみにここのBuffer conditionsは、Firehoseがデータをキャッチしてから、S3に送るまでの間隔値などのバッファ設定です。
動かしてみる
データを送る
Firehoseのコンソール画面にはテストデータを送るようのボタンが設置されているので、これを使ってFirehoseにデータを送ってみます。
「Sending demo data」をクリックすると、データが送られ始めます。
1回押すと1件データが送られるのではなく、何件もデータが連続で送られるみたいです。
そのため、「Stop sending demo data」をクリックしないとデータの送信が止まらないのでご注意を!
ちなみに
{"ticker_symbol":"QXZ", "sector":"HEALTHCARE", "change":-0.05, "price":84.51}
が何度も送られる形です。JSON文字列ですね。
Firehoseのモニタリングを確認してみる
「Monitoring」をクリックすると、Firehoseが受信したデータ量、S3に送信したデータ量などのメトリクスを見ることができます。
しばらく待って画面の更新をしてみると、Firehoseがデータをストリームしていることが分かるメトリクスデータが現れます。
今回は60個データをFirehoseに流し、それをキャッチしたFirehoseが60個のデータをS3に書き込みました。
バケットを確認する
S3をみてみると、日時で区切られたディレクトリが自動生成され、その中にFirehoseがストリーミングで書き込んだデータのファイルが配置されています。
gzファイルをダウンロードし、lessコマンド等で確認。
すると、
Firehoseから送られてきたデータを確認することができました。
最後に
今回はFirehoseからS3にデータをストリームする基盤を構築しました。
次回はコンテナ側の実装となる、FluentBitからFirehoseにデータを送る部分についてまとめます。