LoginSignup
17
14

More than 1 year has passed since last update.

fluentdメモ - (4) 設定ファイル調査 Buffer編

Last updated at Posted at 2020-05-01

はじめに

fluentdに関するメモ書きです。ここでは、設定ファイル(主にBuffer関連)について調査/テストした内容を記載します。

関連記事

fluentdメモ - (1) インストール/簡易操作
fluentdメモ - (2) 設定ファイル概要
fluentdメモ - (3) 設定ファイル調査 Input/Fileter/Output編
fluentdメモ - (4) 設定ファイル調査 Buffer編

概要理解

まずはこの辺り。
参考:
Multi Process Worker
Performance Tuning

Input/Fileter/Outputの一連の処理は"Worker"と呼ばれるプロセスで実行されます。デフォルトでは1プロセス(1Worker)で処理されますが、<system>ディレクティブでこのプロセス数(Worker数)の制御が行えます。

各プロセスのOutputのところでBufferingが行われるようで、これがちょっと分かりにくい。
参考:
fluentd - Buffer Plugins
fluentd - Config: Buffer Section
BufferedOutput pluginの代表的なoptionについて
fluentdのbuffer周りで注意すべき点
fluentd の基礎知識
[fluentd] buffer pluginの理解

image.png
Outputプラグインのところで、各eventが出力される際にBufferingされますが、その実体としてMermoryを使うかFileを使うか選択できます(デフォルトはMemory)。
いずれにしても各eventは一旦"Chunk"という単位にまとめられてから実際の宛先に出力されます。この際、Chunkのステータスも2つあって、まずはstagedのステータスで、その後、queuedになって書き出しが行われる、という流れのようです。このようなフローのなかでそのタイミングなどに影響を与えるパラメーターが多数あります。

File type bufferでのテスト

File typeのbufferを使うとChunkの動きが分かりやすいので、File typeを使って、主にflush_mode: intervalの動作を中心に、気になるパラメーターの動きを確認してみました。

設定ファイル例
<system>
  log_level debug

</system>

<source>
  @type tcp
  port 8081
  tag tcp.event
  bind 0.0.0.0
  <parse>
    @type json
  </parse>
</source>

<match tcp.**>
  @type rewrite_tag_filter
  <rule>
    key field01
    pattern /^(.*)/
    tag test06.$1
  </rule>
</match>

<filter test06.**>
  @type record_transformer
  enable_ruby
  <record>
    field_date_time ${require "date"; DateTime.strptime(record["field_date"] + "T" + record["field_time"] + "+0900", "%Y-%m-%dT%H:%M:%S.%L%z"
).iso8601(3).to_s}
  </record>
</filter>

<match test06.**>
  @type elasticsearch_dynamic
  host localhost
  port 9200
  logstash_format true
  logstash_prefix test06
  logstash_dateformat %Y%m%d
  time_key field_date_time
  utc_index false
  <buffer>
    @type file
    path /etc/td-agent/buffer
    flush_thread_count 1
    flush_interval 30
    flush_thread_interval 1
    flush_thread_burst_interval 180
    chunk_limit_size 600
  </buffer>
</match>

通常ケース

1メッセージ投入直後に以下のようなファイルが作成され、約30秒後にファイルが削除されました。一瞬ファイル名がbuffer.bxxxからbuffer.qxxxに変わってから削除されるはずですが、書き出し処理が速いと確認できません。
(Chunkはbuffer.bxxx.log と buffer.bxxx.log.meta という2つのファイルのセットのようです。後者はメタデータ?)

-rw-r--r--.  1 root root  157  5月  1 10:30 buffer.b5a48c1ead047cf4a23582df3630e8daf.log
-rw-r--r--.  1 root root   80  5月  1 10:30 buffer.b5a48c1ead047cf4a23582df3630e8daf.log.meta

flush_thread_interval 1 なので、スレッドは1秒間隔でstagedのChunkをチェックしにいきます。その際、Chunkが作成されてからflush_intervalの時間(30s)経過したChunkがあれば、flush処理を行います。
この"flush"と言っているのが具体的に何のことなのかちゃんとした説明が見つけられないのですが、stagedのChunkを実際の宛先(ここではElasticsearch)へ書き出しを試みる処理のことを指していると思われます。書き出しまでのステップとしては、まずenqueueという処理によってChunkのステータスが"staged"=>"queued"に移行します(File type bufferだと、buffer.bxxx => buffer.qxxx にファイル名が変わる)。その後、実際の書き出し処理が行われます。

※補足
意図的にネットワーク遅延を発生させてElasticsearchの書き込みを遅くしてみると、buffer.bxxxというファイルがbuffer.pxxxという名前に変更され(iノード番号は同一)、その後削除される様子が確認できました。
今回はfluentdとElasticsearchを同一ノードで試しているので、Loopback用のデバイスに対してtcコマンドで意図的な遅延を発生させました。

2秒遅延設定例
# tc qdisc add dev lo root netem delay 2000ms
設定解除例
# tc qdisc del dev lo root

図で示すとこんな感じ
image.png

Elasticsearchダウン状態

書き出し先であるElasticsearchを意図的に落として、書き出しに失敗する状態を作って動きを見ていきます。

Step1

1メッセージ投入直後に以下のようなファイルが作成されます。

-rw-r--r--.  1 root root  157  5月  1 10:35 buffer.b5a48c308f6b093165c5cc67a9e8d050f.log
-rw-r--r--.  1 root root   80  5月  1 10:35 buffer.b5a48c308f6b093165c5cc67a9e8d050f.log.meta

約30秒後に、ファイル名がbuffer.bxxxx から buffeer.qxxxになりました。

-rw-r--r--.  1 root root  157  5月  1 10:35 buffer.q5a48c308f6b093165c5cc67a9e8d050f.log
-rw-r--r--.  1 root root   80  5月  1 10:35 buffer.q5a48c308f6b093165c5cc67a9e8d050f.log.meta

メッセージ投入から約30秒後から、以下のようなエラーが出始めました。

2020-05-01 10:35:32 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:35:32 +0900 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-05-01 10:35:33 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
  2020-05-01 10:35:32 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:35:33 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:35:33 +0900 [warn]: #0 failed to flush the buffer. retry_time=2 next_retry_seconds=2020-05-01 10:35:34 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
  2020-05-01 10:35:33 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:35:34 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:35:34 +0900 [warn]: #0 failed to flush the buffer. retry_time=3 next_retry_seconds=2020-05-01 10:35:38 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
  2020-05-01 10:35:34 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:35:38 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:35:38 +0900 [warn]: #0 failed to flush the buffer. retry_time=4 next_retry_seconds=2020-05-01 10:35:46 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
  2020-05-01 10:35:38 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:35:46 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:35:46 +0900 [warn]: #0 failed to flush the buffer. retry_time=5 next_retry_seconds=2020-05-01 10:36:03 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
  2020-05-01 10:35:46 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:36:03 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:36:03 +0900 [warn]: #0 failed to flush the buffer. retry_time=6 next_retry_seconds=2020-05-01 10:36:35 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
  2020-05-01 10:36:03 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:36:35 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:36:35 +0900 [warn]: #0 failed to flush the buffer. retry_time=7 next_retry_seconds=2020-05-01 10:37:41 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
  2020-05-01 10:36:35 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:37:41 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:37:41 +0900 [warn]: #0 failed to flush the buffer. retry_time=8 next_retry_seconds=2020-05-01 10:39:39 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
  2020-05-01 10:37:41 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:39:39 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:39:39 +0900 [warn]: #0 failed to flush the buffer. retry_time=9 next_retry_seconds=2020-05-01 10:44:22 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
  2020-05-01 10:39:39 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:44:22 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:44:22 +0900 [warn]: #0 failed to flush the buffer. retry_time=10 next_retry_seconds=2020-05-01 10:53:23 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
  2020-05-01 10:44:22 +0900 [warn]: #0 suppressed same stacktrace

リトライが繰り返されている様子が分かります(以下の記述の通り、リトライ間隔は1秒後、2秒後、4秒後、....)。
参考: How Exponential Backoff Works

ここで、もう一件メッセージを投入してみます。すると、以下のように新たなファイル(buffer.bxxx)が作成されます。

-rw-r--r--.  1 root root  157  5月  1 10:45 buffer.b5a48c544ba99f0abd59e5240b3af3aa7.log
-rw-r--r--.  1 root root   80  5月  1 10:45 buffer.b5a48c544ba99f0abd59e5240b3af3aa7.log.meta
-rw-r--r--.  1 root root  157  5月  1 10:35 buffer.q5a48c308f6b093165c5cc67a9e8d050f.log
-rw-r--r--.  1 root root   80  5月  1 10:35 buffer.q5a48c308f6b093165c5cc67a9e8d050f.log.meta
2020-05-01 10:45:00 +0900 [debug]: #0 Created new chunk chunk_id="5a48c544ba99f0abd59e5240b3af3aa7" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test06.AAA", variables=nil>

30秒経過後も、ファイルの状況は変わりません。queued状態のchunkがあるため、flush_intervalを経過したstagedのchunkがあってもqueuedにはならないようです。

image.png

Step2

継続してデータを投入してみます。

今回一律で以下のようなJSONデータを投入していますが、Chunkファイルをみると1レコード157byteのようです。
{"message":"hello world", "field01":"AAA", "field02":"BBB", "field03":55, "field_date":"2020-04-29", "field_time":"01:23:45.222"}

今回のテストでは、chunk_limit_sizeを600byteに指定しているので、1つのChunkには3レコードしか入らない状態にしています。

buffer.b5a48d8ab9aed3ea69f6befa249b10f82.log というchunkにはレコードが3つ溜まった状態です。(157*3=471byte)

  3299465 -rw-r--r--.  1 root root  471  5月  1 12:11 buffer.b5a48d8ab9aed3ea69f6befa249b10f82.log
  3299478 -rw-r--r--.  1 root root   80  5月  1 12:11 buffer.b5a48d8ab9aed3ea69f6befa249b10f82.log.meta
  1971918 -rw-r--r--.  1 root root  157  5月  1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log
  3297661 -rw-r--r--.  1 root root   80  5月  1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log.meta

この状態で、さらにもう一つメッセージを投入すると、staged=>queuedになり、新たなstagedのChunkが作成されたことが分かります。

  3299479 -rw-r--r--.  1 root root  157  5月  1 12:12 buffer.b5a48d8cede6f990568333b0c69777d2e.log
  3301137 -rw-r--r--.  1 root root   80  5月  1 12:12 buffer.b5a48d8cede6f990568333b0c69777d2e.log.meta
  1971918 -rw-r--r--.  1 root root  157  5月  1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log
  3297661 -rw-r--r--.  1 root root   80  5月  1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log.meta
  3299465 -rw-r--r--.  1 root root  471  5月  1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log
  3299478 -rw-r--r--.  1 root root   80  5月  1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log.meta

さらに3つ投入すると同じようにChunkが増えます。

  3301138 -rw-r--r--.  1 root root  157  5月  1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log
  3301378 -rw-r--r--.  1 root root   80  5月  1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log.meta
  1971918 -rw-r--r--.  1 root root  157  5月  1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log
  3297661 -rw-r--r--.  1 root root   80  5月  1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log.meta
  3299465 -rw-r--r--.  1 root root  471  5月  1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log
  3299478 -rw-r--r--.  1 root root   80  5月  1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log.meta
  3299479 -rw-r--r--.  1 root root  471  5月  1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log
  3301137 -rw-r--r--.  1 root root   80  5月  1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log.meta

image.png

step3

この間、最初のqueuedのChunkの書き出しリトライが繰り返されています。
ここでElasticsearch 復旧させると、その後のリトライが成功してqueuedのChunkが1つ無くなります。

  3301138 -rw-r--r--.  1 root root  157  5月  1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log
  3301378 -rw-r--r--.  1 root root   80  5月  1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log.meta
  3299465 -rw-r--r--.  1 root root  471  5月  1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log
  3299478 -rw-r--r--.  1 root root   80  5月  1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log.meta
  3299479 -rw-r--r--.  1 root root  471  5月  1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log
  3301137 -rw-r--r--.  1 root root   80  5月  1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log.meta

約180s(flush_thread_burst_interval)の後、queuedのchunkが1つ消えます。

  3301138 -rw-r--r--.  1 root root  157  5月  1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log
  3301378 -rw-r--r--.  1 root root   80  5月  1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log.meta
  3299479 -rw-r--r--.  1 root root  471  5月  1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log
  3301137 -rw-r--r--.  1 root root   80  5月  1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log.meta

さらに、約180s(flush_thread_burst_interval)の後、queuedのchunkが消えます。これでqueuedのChunkが無くなったので、直後(恐らflush_thread_intervalのタイミング)にstagedのchunkがqueuedになって書き出されるので全部なくなりました。

image.png

まとめ

主要なパラメーターを整理するとこんなイメージなんじゃないかと思います。

image.png

また、enqueue, writeのタイミングは、ざっくりこんな感じでとらえるとよいかと思います。

  • enqueueされるタイミング:flush_modeに応じたタイミング or Chunkサイズがthresholdを超えたタイミング
  • writeされるタイミング: queuedのchunkが無ければ即時、queuedのChunkが残っていてキューイングされる場合はflush_thuread_burst_intervalで処理されるのを待つ
17
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
14