0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

KPLとfluentdプラグインのメモ

Last updated at Posted at 2016-12-12

KPL fluentd 設定

いつも通りの作業メモ

KPLについて

効率的にKinesis Streamsに転送してくれるライブラリ。
ざっくり以下の様なメリットがある

  • パフォーマンスの向上
  • コンシューマー側実装の簡素化
  • プロデューサーのモニタリング

公式による説明
http://docs.aws.amazon.com/ja_jp/streams/latest/dev/developing-producers-with-kpl.html#d0e4758

複数のStreamに転送

自動的で設定可能な再試行メカニズムにより 1 つ以上の Amazon Kinesis stream へ書き込む

複数レコードを各シャードにまとめて転送

レコードを収集し、PutRecords を使用して、リクエストごとに複数シャードへ複数レコードを書き込む

レコードを集約し、メタデータを減らすことで効率的に転送

ユーザーレコードを集約し、ペイロードサイズを増加させ、スループットを改善する

コンシューマ側でKCLを使うと集約レコードを簡単に扱える

コンシューマーで Amazon Kinesis Client Library(KCL)とシームレスに統合して、バッチ処理されたレコードを集約解除する

CloudWatchのメトリクスに必要なデータを転送

Amazon CloudWatch メトリックスをユーザーに代わって送信し、プロデューサーのパフォーマンスを確認可能にする

KPLが適さない場面

レコードをバッファにためてまとめて転送するので、最大 RecordMaxBufferedTime まで追加の処理遅延が生じる。これが許容できないような場合にはNGで。

KPLの設定

どんな設定があるかも一通り確認

設定値をいい感じに検討

ConnectTimeout (connect_timeout)

  • Default: 6000
  • Minimum: 100
  • Maximum (inclusive): 300000

TLS接続のタイムアウト

FailIfThrottled (fail_if_throttled)

  • default: false

putの書き込み速度にシャードの書き込みスループットが追いつかない場合、シャード側はputリクエストを抑止する。
その際に再実行せずに即時エラーとして扱う。
エラーとして扱うことで、別のパーティションキーで別シャードに送ったりとかそういうことができそうな感じ?
ちゃんとパーティションキーが散らされてるなら、触らないほうが良さげ。
falseの場合、再実行の回数(時間)とかは気をつけたほうがいい。

LogLevel (log_level)

  • Default: info
  • Expected pattern: info|warning|error

MaxConnections (max_connections)

  • Default: 24
  • Minimum: 1
  • Maximum (inclusive): 256

最大接続数
上げすぎるとリソースを結構使うかもしれないので、別のアプリケーションと同居しているような場合は程々に。

MetricsGranularity (metrics_granularity)

  • Default: shard
  • Expected pattern: global|stream|shard

CloudWatchに転送するメトリクスの粒度

MetricsLevel (metrics_level)

  • Default: detailed
  • Expected pattern: none|summary|detailed

有効にするメトリクス

  • none:無し
  • summary:UserRecordsPut、KinesisRecordsPut、ErrorsByCode、AllErrors、BufferingTime
  • detailed:ぜんぶ

MetricsNamespace (metrics_namespace)

  • Default: KinesisProducerLibrary
  • Expected pattern: (?!AWS/).{1,255}

CloudWatchネームスペース名 同じAWS内で複数のKPLが動く場合はそれぞれ名前を変更

MetricsUploadDelay (metrics_upload_delay)

  • Default: 60000
  • Minimum: 1
  • Maximum (inclusive): 60000

メトリクスアップロード間隔 (ms)

RateLimit (rate_limit)

  • Default: 150
  • Minimum: 1
  • Maximum (inclusive): 9223372036854775807

KPL にはレート制限機能があり、1 つのプロデューサーからの送信されるシャード単位のスループットを制限できます。
このしきい値は設定できますが、デフォルトでは実際のシャード制限より 50% 大きく設定され、単一のプロデューサーによるシャードの飽和が許されています。
この制限を小さくすることにより、過剰な再試行による大量送信を抑制できます。

ただし、ベストプラクティスは、各プロデューサーについて、最大スループットまで積極的に再試行することと、ストリームの容量を拡大し、適切なパーティションキー戦略を実装することにより、結果的に過剰と判断されたスロットリングを適切に処理することです。

同一シャードに転送するプロデューサがたくさん存在するなら調整したほうがいいのかな?

RecordMaxBufferedTime (record_max_buffered_time)

  • Default: 100
  • Maximum (inclusive): 9223372036854775807

バッファリングに使用される最大時間
かならずこの数値になるわけじゃないけど、リソースに余裕があればだいたいこの時間内で転送を完了するらしい。

遅延が許容されない場合はRecordTtlを使って調整してね

RecordTtl (record_ttl)

  • default: 30000
  • Minimum: 100
  • Maximum (inclusive): 9223372036854775807

レコードTTL (milliseconds)
この時間内に再投入が成功しなかったレコードはロストする
レコードをロストしたくない場合、超デカイ値にすればいいけど、当然その分いろいろ問題も起こる。ちゃんとメトリクスを監視しよう

RequestTimeout (request_timeout)

  • default: 6000
  • Minimum: 100
  • Maximum (inclusive): 600000

HTTPリクエストの最大要求
この時間が経過した場合はタイムアウト扱いとなる。

当然、ストリーム側では転送できている可能性があるので、その場合に再実行するとデータ重複が発生する
なので、あまり低すぎる値にすると重複率があがってくるよ

変更しなくて良さげ

AggregationEnabled

  • default: true

集約のON/OFF

AggregationMaxCount

  • default: 4294967295
  • Minimum: 1
  • Maximum (inclusive): 9223372036854775807

集約レコードの最大アイテム数
バッファリング周りのチューニングはrecord_max_buffered_timeをいじってね

AggregationMaxSize

  • default: 51200
  • Minimum: 64
  • Maximum (inclusive): 1048576

集約レコードの最大バイト数

CollectionMaxCount

  • Default: 500
  • Minimum: 1
  • Maximum (inclusive): 500

putRecordsに送る際の最大アイテム数

CollectionMaxSize

  • Default: 5242880
  • Minimum: 52224
  • Maximum (inclusive): 9223372036854775807

putRecordsに送る際の最大バイト数

MinConnections

  • Default: 1
  • Minimum: 1
  • Maximum (inclusive): 16

開きっぱなしにするコネクション数

Port

  • Default: 443
  • Minimum: 1
  • Maximum (inclusive): 65535

VerifyCertificate

  • default: true
    証明書チェック

fluentdプラグイン

KPLで実装されているkinesis-stream(firehose)fluentdのプラグイン

コレ => https://github.com/awslabs/aws-fluent-plugin-kinesis

使い方

インストール

gem install fluent-plugin-kinesis

僕はTreasureDataを使っているのでtd-agentでやります。
td-agentのインストールはこちらの方の記事にまとまっています。

sudo td-agent-gem install fluent-plugin-kinesis

fluentdプラグインの設定

kinesis_producer KPLを使うのでkinesis_producerの設定をする


<match delivery_logs>
  @type kinesis_producer
  region ap-northeast-1
  stream_name dev-delivery-logs
  partition_key nil # Otherwise, use random partition key
</match>

KPLのプロパティ設定方法

KPLの設定はコレ => https://github.com/awslabs/aws-fluent-plugin-kinesis#configuration-kinesis_producer

色々設定を入れてrequest_timeoutとrecord_max_buffered_timeだけいじってとりあえずこれでうごかしてみよう

/etc/td-agent/td-agent.conf

<match delivery_logs.*>
    type copy
    <store>
        @type kinesis_producer
        region ap-northeast-1
        remove_tag_prefix delivery_logs
        stream_name dev-delivery-logs
        partition_key nil # Otherwise, use random partition key
        connect_timeout 6000
        fail_if_throttled false
        log_level info
        max_connections 24 
        metrics_granularity shard
        metrics_level detailed
        metrics_namespace dev-streams
        metrics_upload_delay 60000
        rate_limit 150
        record_max_buffered_time 100
        record_ttl 30000
        request_timeout 60000
        <secondary>
            type file
            path /var/log/td-agent/failed/delivery-logs/forward-failed
        </secondary>
    </store>
</match>

はい、なんかエラー出てきた。

config error file="/etc/td-agent/td-agent.conf" error="'stream_name' or 'stream_name_prefix' is required"

設定しているつもりなんだがなんでだろう?_?

ということで調べ中

追記

なんかいろいろ謎なままでもやもやするけど、これで転送できるようになった。

1.typeの@を削除

@type kinesis_producer => type kinesis_producer

2.credentialの指定も追加

aws_key_id 
aws_sec_key 

td-agentのバージョンとか最新を使っているつもりではあるのですが、もう少し調べて直す

amazonlinux上で動作チェックしつつ、これでいくことに。
<kinesis_producer>で囲わないといかんのですね。

あとpluginのオススメ設定を入れた。


<match delivery_logs.test>
  @type copy
  <store>
    type kinesis_producer

    region ap-northeast-1
    stream_name lcl-delivery-log-raw
    partition_key uuid
    aws_key_id xxxx
    aws_sec_key xxxx

    <kinesis_producer>
      connect_timeout 6000
      fail_if_throttled false
      log_level info
      max_connections 24
      metrics_granularity shard
      metrics_level detailed
      metrics_namespace dev-streams
      metrics_upload_delay 60000
      rate_limit 150
      record_max_buffered_time 100
      record_ttl 30000
      request_timeout 60000
    </kinesis_producer>

    buffer_type file
    buffer_path /var/log/td-agent/buffer/td
    buffer_chunk_limit 1m
    flush_interval 1
    try_flush_interval 0.1
    queued_chunk_flush_interval 0.01
    num_threads 15
    detach_process 5

    <secondary>
      type file
      path /var/log/td-agent/failed/delivery-logs/forward-failed
    </secondary>
  </store>
</match>

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?