はじめに
新規プロジェクトでのログ集計をKinesis Data Firehoseで設計してみたものの
まだ使ったことがなかったので使ってみた話
Kinesis Data Firehoseとは
そもそもKinesisとはという話なんですが。
Kinesisでできることは、ストリーミングデータをリアルタイムで収集、処理、分析することです。
その中でも大きく四つあり
- Kinesis Data Streams : ストリームデータを受けるサービス
- Kinesis Data Firehose : ストリームデータをS3やRedshiftなどの分析ツールに送るサービス
- Kinesis Data Analytics: ストリームデータを分析してくれるサービス
- Kinesis Video Streams: 動画を簡単かつ安全にストリーミングするサービス
Kinesis Data StreamsとFirehoseの違い
Streamsは、雑に言えばストリームデータを受けるだけのサービス。
受けた後はLambdaか何かのイベント発火してストリームデータを受け取ってもらうようにするだけです。
Firehoseは、データを受けた後、S3やRedsiftまで流してくれます。
Streamsは受けるだけなので、例えばStreams -> Firehose -> S3もできる
また、ユースケースにもよりますが、Firehoseは1分間後にS3に吐き出しなど、"ある程度データが溜まったら"という閾値を儲けることができるのに対して、Streamsは1秒以下でデータロードができます。
Firehoseはエンドポイント(S3のパス)とデータ(ダンプしたjsonとか)を指定するだけで流してくれるので設定はGUI上で収まるくらい非常に簡単です。
つべこべ言わずに作ってみる
1. コンソール画面でKinesisと検索
2. ダッシュボードで「配信ストリームの作成」を押す
3. Delivery stream nameを任意の名前にして、Choose a soueceのタイプを選ぶ
Streams -> Firehoseの流れではないので、"Direct Put or other sources"にチェック
pythonからboto3とかでFirehoseにデータを投入する場合はother sourcesになります。
4. Transform source records with AWS Lambda, Convert record formatの設定
Transform source records with AWS Lambda: ストリームデータを保存する前に、Lambdaで編集することができます。
例えば、helloという文字列をkinesis firehoseで送ったとしてlambdaで受け取って"world"と追記して一レコードとして保存することが可能です。
今回はそのまま保存したいのでDisableにチェック
Convert record formatは、Enableにすると出力形式を決めることができて、二種類あります
- Apache Parquet
- Apache ORC
それぞれの違いは↓の画像を参考
参考URL: https://speakerdeck.com/chie8842/karamunahuomatutofalsekihon-2
データ分析用途だと、特定カラムに焦点をあてて高速に集計などを行うので、こういったカラムナ形式が採用されていたりします。
AthenaとかだとParquet形式で集計がすすめられたりするので結構重要
Athenaのパフォーマンスチューニング Tips トップ 10を一読すると幸せ?かも
5. Select a destinationの設定
Amazon S3に入れる予定なのでS3にチェック
6. S3のバケット設定、Prefix設定
S3のバケットは作ったバケット名を指定
Prefixはパスを決定できるのですが、これも"Athenaのパフォーマンスチューニング Tips トップ 10"を参考にパーティーションを意識して分けるとカスタムで設定します。
例:
s3://athena-examples/flight/parquet/year=1991/month=1/day=1/
s3://athena-examples/flight/parquet/year=1991/month=1/day=2/
細かいPrefixは公式を参考に
カスタムPrefixの場合はError Prefixも設定しないといけないことをお忘れなく。
7. S3 buffer conditionsと圧縮・暗号化設定
S3にPutされる1ファイルにどれだけの容量制限をかけるか、また、どれくらいの間隔(Interval)でファイル吐き出しをするか設定できる.
今回は5MB or 60 秒でファイル吐き出しを行うようにしました。
また圧縮方法はGZIP, Snappy, Zip様々あります、S3のencryptionも設定できます。
8. Roleの設定
S3にPutするので必要な権限設定を行います。
Create new Roleをクリックし、新しいIAMロールの作成で作ります。(すでに作っている場合は選択)
設定したら最後の確認画面になるので、"Create delivery stream"して設定終了
最後に流してみる
pip install boto3とかでawsのclientライブラリであるboto3を入れる
import boto3
import uuid
import json
client = boto3.client(
'firehose',
aws_access_key_id='xxxxxxxxxxxxxxx',
aws_secret_access_key='xxxxxxxxxxxxxxxxxxxx',
region_name='ap-northeast-1'
)
data = {
"musicId": 'xxxxxxxxxxxxxxxxxxxx',
"userId": 1,
"artistId": 1
}
response = client.put_record(
DeliveryStreamName='a-honda-test-kinesis-firehose',
Record={
'Data': f'{json.dumps(data)}\n'
}
)
print(response)
これで送信すると
a-honda@hoge:~
$ python test.py
{'RecordId': 'ls1DjmHBiOXPtNe/7zpYc/6Zk8o8j6JVz5QkBeyGC3I8aFjf7Dv/CAQ7JnRw913ovghFSIMuEn3MOkkR5GK8QVRJCjvH6AmRw0SPUqcTE8jm5kcDxWu+91HeFr+PJlyFCgIG259Ig+DH7rsXTfHRNMFPt3/G7GquA0WUmSXDTBIOzRxnsQ1bNyN0mEHFrhOkbiXElZ1rL6s1WvJLddHF+qDpggjUCX2i', 'Encrypted': False, 'ResponseMetadata': {'RequestId': 'e5724431-a382-2e82-b3c8-8bbaf8b786fe', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e5724431-a382-2e82-b3c8-8bbaf8b786fe', 'x-amz-id-2': 'zWNAiCZio6tiW5gGbNggHSLjA4EHKnu3RXE5umbYbdVx/ZraOY8bEj2H4AlF/C7R0rtk5fLt0Bm+aPyNiMFqZ9ihKmOxrCDO', 'content-type': 'application/x-amz-json-1.1', 'content-length': '257', 'date': 'Mon, 10 Feb 2020 03:26:03 GMT'}, 'RetryAttempts': 0}}
こんな感じで送られています。
ちゃんとS3に保存されているのは確認できました。
さいごに
fluentdとの使い分けについてもう少し調べたほうがいいのかなって正直思いましたが、
fluentdだと、aggregatorを別途自作で用意する必要があるので、そこがマネジメントで賄えるという利点はあるかなと思いました。
参考URL:ログ管理のベストプラクティス
fluentd -> kinesis firehose
Gunosyさんのアーキでは↑のようになっているのですが
この流れが全くよくわからないです(苦笑
なんでfluentd挟むんだろう...
また直接クライアントからStream データを放り投げることもできるのですが不正なことをされるとコストはかかりそう。
設定は意外と簡単なのですが、S3のパスは固定なので、S3のディレクトリを細かくきるとその分Delivery Streamを作らないといけないっぽいので厄介(汗
色々ありますが、ログ管理は深い(小並感