KPL fluentd 設定
いつも通りの作業メモ
KPLについて
効率的にKinesis Streamsに転送してくれるライブラリ。
ざっくり以下の様なメリットがある
- パフォーマンスの向上
- コンシューマー側実装の簡素化
- プロデューサーのモニタリング
公式による説明
http://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-producers-with-kpl.html#d0e4758
複数のStreamに転送
自動的で設定可能な再試行メカニズムにより 1 つ以上の Amazon Kinesis stream へ書き込む
複数レコードを各シャードにまとめて転送
レコードを収集し、PutRecords を使用して、リクエストごとに複数シャードへ複数レコードを書き込む
レコードを集約し、メタデータを減らすことで効率的に転送
ユーザーレコードを集約し、ペイロードサイズを増加させ、スループットを改善する
コンシューマ側でKCLを使うと集約レコードを簡単に扱える
コンシューマーで Amazon Kinesis Client Library(KCL)とシームレスに統合して、バッチ処理されたレコードを集約解除する
CloudWatchのメトリクスに必要なデータを転送
Amazon CloudWatch メトリックスをユーザーに代わって送信し、プロデューサーのパフォーマンスを確認可能にする
KPLが適さない場面
レコードをバッファにためてまとめて転送するので、最大 RecordMaxBufferedTime まで追加の処理遅延が生じる。これが許容できないような場合にはNGで。
KPLの設定
どんな設定があるかも一通り確認
設定値をいい感じに検討
ConnectTimeout (connect_timeout)
- Default: 6000
- Minimum: 100
- Maximum (inclusive): 300000
TLS接続のタイムアウト
FailIfThrottled (fail_if_throttled)
- default: false
putの書き込み速度にシャードの書き込みスループットが追いつかない場合、シャード側はputリクエストを抑止する。
その際に再実行せずに即時エラーとして扱う。
エラーとして扱うことで、別のパーティションキーで別シャードに送ったりとかそういうことができそうな感じ?
ちゃんとパーティションキーが散らされてるなら、触らないほうが良さげ。
falseの場合、再実行の回数(時間)とかは気をつけたほうがいい。
LogLevel (log_level)
- Default: info
- Expected pattern: info|warning|error
MaxConnections (max_connections)
- Default: 24
- Minimum: 1
- Maximum (inclusive): 256
最大接続数
上げすぎるとリソースを結構使うかもしれないので、別のアプリケーションと同居しているような場合は程々に。
MetricsGranularity (metrics_granularity)
- Default: shard
- Expected pattern: global|stream|shard
CloudWatchに転送するメトリクスの粒度
MetricsLevel (metrics_level)
- Default: detailed
- Expected pattern: none|summary|detailed
有効にするメトリクス
- none:無し
- summary:UserRecordsPut、KinesisRecordsPut、ErrorsByCode、AllErrors、BufferingTime
- detailed:ぜんぶ
MetricsNamespace (metrics_namespace)
- Default: KinesisProducerLibrary
- Expected pattern: (?!AWS/).{1,255}
CloudWatchネームスペース名 同じAWS内で複数のKPLが動く場合はそれぞれ名前を変更
MetricsUploadDelay (metrics_upload_delay)
- Default: 60000
- Minimum: 1
- Maximum (inclusive): 60000
メトリクスアップロード間隔 (ms)
RateLimit (rate_limit)
- Default: 150
- Minimum: 1
- Maximum (inclusive): 9223372036854775807
KPL にはレート制限機能があり、1 つのプロデューサーからの送信されるシャード単位のスループットを制限できます。
このしきい値は設定できますが、デフォルトでは実際のシャード制限より 50% 大きく設定され、単一のプロデューサーによるシャードの飽和が許されています。
この制限を小さくすることにより、過剰な再試行による大量送信を抑制できます。
ただし、ベストプラクティスは、各プロデューサーについて、最大スループットまで積極的に再試行することと、ストリームの容量を拡大し、適切なパーティションキー戦略を実装することにより、結果的に過剰と判断されたスロットリングを適切に処理することです。
同一シャードに転送するプロデューサがたくさん存在するなら調整したほうがいいのかな?
RecordMaxBufferedTime (record_max_buffered_time)
- Default: 100
- Maximum (inclusive): 9223372036854775807
バッファリングに使用される最大時間
かならずこの数値になるわけじゃないけど、リソースに余裕があればだいたいこの時間内で転送を完了するらしい。
遅延が許容されない場合はRecordTtlを使って調整してね
RecordTtl (record_ttl)
- default: 30000
- Minimum: 100
- Maximum (inclusive): 9223372036854775807
レコードTTL (milliseconds)
この時間内に再投入が成功しなかったレコードはロストする
レコードをロストしたくない場合、超デカイ値にすればいいけど、当然その分いろいろ問題も起こる。ちゃんとメトリクスを監視しよう
RequestTimeout (request_timeout)
- default: 6000
- Minimum: 100
- Maximum (inclusive): 600000
HTTPリクエストの最大要求
この時間が経過した場合はタイムアウト扱いとなる。
当然、ストリーム側では転送できている可能性があるので、その場合に再実行するとデータ重複が発生する
なので、あまり低すぎる値にすると重複率があがってくるよ
変更しなくて良さげ
AggregationEnabled
- default: true
集約のON/OFF
AggregationMaxCount
- default: 4294967295
- Minimum: 1
- Maximum (inclusive): 9223372036854775807
集約レコードの最大アイテム数
バッファリング周りのチューニングはrecord_max_buffered_timeをいじってね
AggregationMaxSize
- default: 51200
- Minimum: 64
- Maximum (inclusive): 1048576
集約レコードの最大バイト数
CollectionMaxCount
- Default: 500
- Minimum: 1
- Maximum (inclusive): 500
putRecordsに送る際の最大アイテム数
CollectionMaxSize
- Default: 5242880
- Minimum: 52224
- Maximum (inclusive): 9223372036854775807
putRecordsに送る際の最大バイト数
MinConnections
- Default: 1
- Minimum: 1
- Maximum (inclusive): 16
開きっぱなしにするコネクション数
Port
- Default: 443
- Minimum: 1
- Maximum (inclusive): 65535
VerifyCertificate
- default: true
証明書チェック
fluentdプラグイン
KPLで実装されているkinesis-stream(firehose)fluentdのプラグイン
コレ => https://github.com/awslabs/aws-fluent-plugin-kinesis
使い方
インストール
gem install fluent-plugin-kinesis
僕はTreasureDataを使っているのでtd-agentでやります。
td-agentのインストールはこちらの方の記事にまとまっています。
sudo td-agent-gem install fluent-plugin-kinesis
fluentdプラグインの設定
kinesis_producer KPLを使うのでkinesis_producerの設定をする
<match delivery_logs>
@type kinesis_producer
region ap-northeast-1
stream_name dev-delivery-logs
partition_key nil # Otherwise, use random partition key
</match>
KPLのプロパティ設定方法
KPLの設定はコレ => https://github.com/awslabs/aws-fluent-plugin-kinesis#configuration-kinesis_producer
色々設定を入れてrequest_timeoutとrecord_max_buffered_timeだけいじってとりあえずこれでうごかしてみよう
/etc/td-agent/td-agent.conf
<match delivery_logs.*>
type copy
<store>
@type kinesis_producer
region ap-northeast-1
remove_tag_prefix delivery_logs
stream_name dev-delivery-logs
partition_key nil # Otherwise, use random partition key
connect_timeout 6000
fail_if_throttled false
log_level info
max_connections 24
metrics_granularity shard
metrics_level detailed
metrics_namespace dev-streams
metrics_upload_delay 60000
rate_limit 150
record_max_buffered_time 100
record_ttl 30000
request_timeout 60000
<secondary>
type file
path /var/log/td-agent/failed/delivery-logs/forward-failed
</secondary>
</store>
</match>
はい、なんかエラー出てきた。
config error file="/etc/td-agent/td-agent.conf" error="'stream_name' or 'stream_name_prefix' is required"
設定しているつもりなんだがなんでだろう?_?
ということで調べ中
追記
なんかいろいろ謎なままでもやもやするけど、これで転送できるようになった。
1.typeの@を削除
@type kinesis_producer => type kinesis_producer
2.credentialの指定も追加
aws_key_id
aws_sec_key
td-agentのバージョンとか最新を使っているつもりではあるのですが、もう少し調べて直す
amazonlinux上で動作チェックしつつ、これでいくことに。
<kinesis_producer>
で囲わないといかんのですね。
あとpluginのオススメ設定を入れた。
<match delivery_logs.test>
@type copy
<store>
type kinesis_producer
region ap-northeast-1
stream_name lcl-delivery-log-raw
partition_key uuid
aws_key_id xxxx
aws_sec_key xxxx
<kinesis_producer>
connect_timeout 6000
fail_if_throttled false
log_level info
max_connections 24
metrics_granularity shard
metrics_level detailed
metrics_namespace dev-streams
metrics_upload_delay 60000
rate_limit 150
record_max_buffered_time 100
record_ttl 30000
request_timeout 60000
</kinesis_producer>
buffer_type file
buffer_path /var/log/td-agent/buffer/td
buffer_chunk_limit 1m
flush_interval 1
try_flush_interval 0.1
queued_chunk_flush_interval 0.01
num_threads 15
detach_process 5
<secondary>
type file
path /var/log/td-agent/failed/delivery-logs/forward-failed
</secondary>
</store>
</match>