29
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Kinesis Data Firehoseを使ってみた話

Last updated at Posted at 2020-02-10

はじめに

新規プロジェクトでのログ集計をKinesis Data Firehoseで設計してみたものの
まだ使ったことがなかったので使ってみた話

Kinesis Data Firehoseとは

そもそもKinesisとはという話なんですが。
Kinesisでできることは、ストリーミングデータをリアルタイムで収集、処理、分析することです。
その中でも大きく四つあり

  • Kinesis Data Streams : ストリームデータを受けるサービス
  • Kinesis Data Firehose : ストリームデータをS3やRedshiftなどの分析ツールに送るサービス
  • Kinesis Data Analytics: ストリームデータを分析してくれるサービス
  • Kinesis Video Streams: 動画を簡単かつ安全にストリーミングするサービス

Kinesis Data StreamsとFirehoseの違い

Streamsは、雑に言えばストリームデータを受けるだけのサービス。
受けた後はLambdaか何かのイベント発火してストリームデータを受け取ってもらうようにするだけです。

Firehoseは、データを受けた後、S3やRedsiftまで流してくれます。

Streamsは受けるだけなので、例えばStreams -> Firehose -> S3もできる
また、ユースケースにもよりますが、Firehoseは1分間後にS3に吐き出しなど、"ある程度データが溜まったら"という閾値を儲けることができるのに対して、Streamsは1秒以下でデータロードができます。

Firehoseはエンドポイント(S3のパス)とデータ(ダンプしたjsonとか)を指定するだけで流してくれるので設定はGUI上で収まるくらい非常に簡単です。

つべこべ言わずに作ってみる

1. コンソール画面でKinesisと検索

スクリーンショット 2020-02-10 15.48.27.png

2. ダッシュボードで「配信ストリームの作成」を押す

picture1.png

3. Delivery stream nameを任意の名前にして、Choose a soueceのタイプを選ぶ

Streams -> Firehoseの流れではないので、"Direct Put or other sources"にチェック
pythonからboto3とかでFirehoseにデータを投入する場合はother sourcesになります。
スクリーンショット 2020-02-10 11.21.16.png

4. Transform source records with AWS Lambda, Convert record formatの設定

Transform source records with AWS Lambda: ストリームデータを保存する前に、Lambdaで編集することができます。
例えば、helloという文字列をkinesis firehoseで送ったとしてlambdaで受け取って"world"と追記して一レコードとして保存することが可能です。
今回はそのまま保存したいのでDisableにチェック

Convert record formatは、Enableにすると出力形式を決めることができて、二種類あります

  • Apache Parquet
  • Apache ORC

それぞれの違いは↓の画像を参考

Apache ParquetとApache ORCの違い

参考URL: https://speakerdeck.com/chie8842/karamunahuomatutofalsekihon-2

データ分析用途だと、特定カラムに焦点をあてて高速に集計などを行うので、こういったカラムナ形式が採用されていたりします。
AthenaとかだとParquet形式で集計がすすめられたりするので結構重要

Athenaのパフォーマンスチューニング Tips トップ 10を一読すると幸せ?かも

今回はテスト的にデータを送るので、そのままDISABLEで
スクリーンショット 2020-02-10 11.23.38.png

5. Select a destinationの設定

Amazon S3に入れる予定なのでS3にチェック

Select a destinationの設定でS3にチェック

6. S3のバケット設定、Prefix設定

S3のバケットは作ったバケット名を指定
Prefixはパスを決定できるのですが、これも"Athenaのパフォーマンスチューニング Tips トップ 10"を参考にパーティーションを意識して分けるとカスタムで設定します。

例:

s3://athena-examples/flight/parquet/year=1991/month=1/day=1/
s3://athena-examples/flight/parquet/year=1991/month=1/day=2/

S3のバケット設定とPrefix設定

細かいPrefixは公式を参考に
カスタムPrefixの場合はError Prefixも設定しないといけないことをお忘れなく。

7. S3 buffer conditionsと圧縮・暗号化設定

S3にPutされる1ファイルにどれだけの容量制限をかけるか、また、どれくらいの間隔(Interval)でファイル吐き出しをするか設定できる.
今回は5MB or 60 秒でファイル吐き出しを行うようにしました。
また圧縮方法はGZIP, Snappy, Zip様々あります、S3のencryptionも設定できます。

S3に吐き出す条件設定

8. Roleの設定

S3にPutするので必要な権限設定を行います。
Create new Roleをクリックし、新しいIAMロールの作成で作ります。(すでに作っている場合は選択)
スクリーンショット 2020-02-10 16.16.21.png
スクリーンショット 2020-02-10 16.16.45.png

設定したら最後の確認画面になるので、"Create delivery stream"して設定終了

最後に流してみる

pip install boto3とかでawsのclientライブラリであるboto3を入れる

import boto3
import uuid
import json

client = boto3.client(
    'firehose',
    aws_access_key_id='xxxxxxxxxxxxxxx',
    aws_secret_access_key='xxxxxxxxxxxxxxxxxxxx',
    region_name='ap-northeast-1'
)
data = {
    "musicId": 'xxxxxxxxxxxxxxxxxxxx',
    "userId": 1,
    "artistId": 1
}
response = client.put_record(
    DeliveryStreamName='a-honda-test-kinesis-firehose',
    Record={
        'Data': f'{json.dumps(data)}\n'
    }
)
print(response)

これで送信すると

a-honda@hoge:~
$ python test.py
{'RecordId': 'ls1DjmHBiOXPtNe/7zpYc/6Zk8o8j6JVz5QkBeyGC3I8aFjf7Dv/CAQ7JnRw913ovghFSIMuEn3MOkkR5GK8QVRJCjvH6AmRw0SPUqcTE8jm5kcDxWu+91HeFr+PJlyFCgIG259Ig+DH7rsXTfHRNMFPt3/G7GquA0WUmSXDTBIOzRxnsQ1bNyN0mEHFrhOkbiXElZ1rL6s1WvJLddHF+qDpggjUCX2i', 'Encrypted': False, 'ResponseMetadata': {'RequestId': 'e5724431-a382-2e82-b3c8-8bbaf8b786fe', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e5724431-a382-2e82-b3c8-8bbaf8b786fe', 'x-amz-id-2': 'zWNAiCZio6tiW5gGbNggHSLjA4EHKnu3RXE5umbYbdVx/ZraOY8bEj2H4AlF/C7R0rtk5fLt0Bm+aPyNiMFqZ9ihKmOxrCDO', 'content-type': 'application/x-amz-json-1.1', 'content-length': '257', 'date': 'Mon, 10 Feb 2020 03:26:03 GMT'}, 'RetryAttempts': 0}}

こんな感じで送られています。
ちゃんとS3に保存されているのは確認できました。

さいごに

fluentdとの使い分けについてもう少し調べたほうがいいのかなって正直思いましたが、
fluentdだと、aggregatorを別途自作で用意する必要があるので、そこがマネジメントで賄えるという利点はあるかなと思いました。
参考URL:ログ管理のベストプラクティス

fluentd -> kinesis firehose
Gunosyさんのアーキでは↑のようになっているのですが
この流れが全くよくわからないです(苦笑
なんでfluentd挟むんだろう...

また直接クライアントからStream データを放り投げることもできるのですが不正なことをされるとコストはかかりそう。
設定は意外と簡単なのですが、S3のパスは固定なので、S3のディレクトリを細かくきるとその分Delivery Streamを作らないといけないっぽいので厄介(汗

色々ありますが、ログ管理は深い(小並感

29
24
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
29
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?