はじめに
fluentdに関するメモ書きです。ここでは、設定ファイル(主にBuffer関連)について調査/テストした内容を記載します。
関連記事
fluentdメモ - (1) インストール/簡易操作
fluentdメモ - (2) 設定ファイル概要
fluentdメモ - (3) 設定ファイル調査 Input/Fileter/Output編
fluentdメモ - (4) 設定ファイル調査 Buffer編
概要理解
まずはこの辺り。
参考:
Multi Process Worker
Performance Tuning
Input/Fileter/Outputの一連の処理は"Worker"と呼ばれるプロセスで実行されます。デフォルトでは1プロセス(1Worker)で処理されますが、<system>
ディレクティブでこのプロセス数(Worker数)の制御が行えます。
各プロセスのOutputのところでBufferingが行われるようで、これがちょっと分かりにくい。
参考:
fluentd - Buffer Plugins
fluentd - Config: Buffer Section
BufferedOutput pluginの代表的なoptionについて
fluentdのbuffer周りで注意すべき点
fluentd の基礎知識
[fluentd] buffer pluginの理解
Outputプラグインのところで、各eventが出力される際にBufferingされますが、その実体としてMermoryを使うかFileを使うか選択できます(デフォルトはMemory)。
いずれにしても各eventは一旦"Chunk"という単位にまとめられてから実際の宛先に出力されます。この際、Chunkのステータスも2つあって、まずはstagedのステータスで、その後、queuedになって書き出しが行われる、という流れのようです。このようなフローのなかでそのタイミングなどに影響を与えるパラメーターが多数あります。
File type bufferでのテスト
File typeのbufferを使うとChunkの動きが分かりやすいので、File typeを使って、主にflush_mode: intervalの動作を中心に、気になるパラメーターの動きを確認してみました。
<system>
log_level debug
</system>
<source>
@type tcp
port 8081
tag tcp.event
bind 0.0.0.0
<parse>
@type json
</parse>
</source>
<match tcp.**>
@type rewrite_tag_filter
<rule>
key field01
pattern /^(.*)/
tag test06.$1
</rule>
</match>
<filter test06.**>
@type record_transformer
enable_ruby
<record>
field_date_time ${require "date"; DateTime.strptime(record["field_date"] + "T" + record["field_time"] + "+0900", "%Y-%m-%dT%H:%M:%S.%L%z"
).iso8601(3).to_s}
</record>
</filter>
<match test06.**>
@type elasticsearch_dynamic
host localhost
port 9200
logstash_format true
logstash_prefix test06
logstash_dateformat %Y%m%d
time_key field_date_time
utc_index false
<buffer>
@type file
path /etc/td-agent/buffer
flush_thread_count 1
flush_interval 30
flush_thread_interval 1
flush_thread_burst_interval 180
chunk_limit_size 600
</buffer>
</match>
通常ケース
1メッセージ投入直後に以下のようなファイルが作成され、約30秒後にファイルが削除されました。一瞬ファイル名がbuffer.bxxxからbuffer.qxxxに変わってから削除されるはずですが、書き出し処理が速いと確認できません。
(Chunkはbuffer.bxxx.log と buffer.bxxx.log.meta という2つのファイルのセットのようです。後者はメタデータ?)
-rw-r--r--. 1 root root 157 5月 1 10:30 buffer.b5a48c1ead047cf4a23582df3630e8daf.log
-rw-r--r--. 1 root root 80 5月 1 10:30 buffer.b5a48c1ead047cf4a23582df3630e8daf.log.meta
flush_thread_interval 1 なので、スレッドは1秒間隔でstagedのChunkをチェックしにいきます。その際、Chunkが作成されてからflush_intervalの時間(30s)経過したChunkがあれば、flush処理を行います。
この"flush"と言っているのが具体的に何のことなのかちゃんとした説明が見つけられないのですが、stagedのChunkを実際の宛先(ここではElasticsearch)へ書き出しを試みる処理のことを指していると思われます。書き出しまでのステップとしては、まずenqueueという処理によってChunkのステータスが"staged"=>"queued"に移行します(File type bufferだと、buffer.bxxx => buffer.qxxx にファイル名が変わる)。その後、実際の書き出し処理が行われます。
※補足
意図的にネットワーク遅延を発生させてElasticsearchの書き込みを遅くしてみると、buffer.bxxxというファイルがbuffer.pxxxという名前に変更され(iノード番号は同一)、その後削除される様子が確認できました。
今回はfluentdとElasticsearchを同一ノードで試しているので、Loopback用のデバイスに対してtcコマンドで意図的な遅延を発生させました。
# tc qdisc add dev lo root netem delay 2000ms
# tc qdisc del dev lo root
Elasticsearchダウン状態
書き出し先であるElasticsearchを意図的に落として、書き出しに失敗する状態を作って動きを見ていきます。
Step1
1メッセージ投入直後に以下のようなファイルが作成されます。
-rw-r--r--. 1 root root 157 5月 1 10:35 buffer.b5a48c308f6b093165c5cc67a9e8d050f.log
-rw-r--r--. 1 root root 80 5月 1 10:35 buffer.b5a48c308f6b093165c5cc67a9e8d050f.log.meta
約30秒後に、ファイル名がbuffer.bxxxx から buffeer.qxxxになりました。
-rw-r--r--. 1 root root 157 5月 1 10:35 buffer.q5a48c308f6b093165c5cc67a9e8d050f.log
-rw-r--r--. 1 root root 80 5月 1 10:35 buffer.q5a48c308f6b093165c5cc67a9e8d050f.log.meta
メッセージ投入から約30秒後から、以下のようなエラーが出始めました。
2020-05-01 10:35:32 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:35:32 +0900 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-05-01 10:35:33 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
2020-05-01 10:35:32 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:35:33 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:35:33 +0900 [warn]: #0 failed to flush the buffer. retry_time=2 next_retry_seconds=2020-05-01 10:35:34 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
2020-05-01 10:35:33 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:35:34 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:35:34 +0900 [warn]: #0 failed to flush the buffer. retry_time=3 next_retry_seconds=2020-05-01 10:35:38 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
2020-05-01 10:35:34 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:35:38 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:35:38 +0900 [warn]: #0 failed to flush the buffer. retry_time=4 next_retry_seconds=2020-05-01 10:35:46 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
2020-05-01 10:35:38 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:35:46 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:35:46 +0900 [warn]: #0 failed to flush the buffer. retry_time=5 next_retry_seconds=2020-05-01 10:36:03 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
2020-05-01 10:35:46 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:36:03 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:36:03 +0900 [warn]: #0 failed to flush the buffer. retry_time=6 next_retry_seconds=2020-05-01 10:36:35 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
2020-05-01 10:36:03 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:36:35 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:36:35 +0900 [warn]: #0 failed to flush the buffer. retry_time=7 next_retry_seconds=2020-05-01 10:37:41 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
2020-05-01 10:36:35 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:37:41 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:37:41 +0900 [warn]: #0 failed to flush the buffer. retry_time=8 next_retry_seconds=2020-05-01 10:39:39 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
2020-05-01 10:37:41 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:39:39 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:39:39 +0900 [warn]: #0 failed to flush the buffer. retry_time=9 next_retry_seconds=2020-05-01 10:44:22 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
2020-05-01 10:39:39 +0900 [warn]: #0 suppressed same stacktrace
2020-05-01 10:44:22 +0900 [debug]: #0 taking back chunk for errors. chunk="5a48c308f6b093165c5cc67a9e8d050f"
2020-05-01 10:44:22 +0900 [warn]: #0 failed to flush the buffer. retry_time=10 next_retry_seconds=2020-05-01 10:53:23 +0900 chunk="5a48c308f6b093165c5cc67a9e8d050f" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"localhost\", :port=>9200, :scheme=>\"http\"}): Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
2020-05-01 10:44:22 +0900 [warn]: #0 suppressed same stacktrace
リトライが繰り返されている様子が分かります(以下の記述の通り、リトライ間隔は1秒後、2秒後、4秒後、....)。
参考: How Exponential Backoff Works
ここで、もう一件メッセージを投入してみます。すると、以下のように新たなファイル(buffer.bxxx)が作成されます。
-rw-r--r--. 1 root root 157 5月 1 10:45 buffer.b5a48c544ba99f0abd59e5240b3af3aa7.log
-rw-r--r--. 1 root root 80 5月 1 10:45 buffer.b5a48c544ba99f0abd59e5240b3af3aa7.log.meta
-rw-r--r--. 1 root root 157 5月 1 10:35 buffer.q5a48c308f6b093165c5cc67a9e8d050f.log
-rw-r--r--. 1 root root 80 5月 1 10:35 buffer.q5a48c308f6b093165c5cc67a9e8d050f.log.meta
2020-05-01 10:45:00 +0900 [debug]: #0 Created new chunk chunk_id="5a48c544ba99f0abd59e5240b3af3aa7" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test06.AAA", variables=nil>
30秒経過後も、ファイルの状況は変わりません。queued状態のchunkがあるため、flush_intervalを経過したstagedのchunkがあってもqueuedにはならないようです。
Step2
継続してデータを投入してみます。
今回一律で以下のようなJSONデータを投入していますが、Chunkファイルをみると1レコード157byteのようです。
{"message":"hello world", "field01":"AAA", "field02":"BBB", "field03":55, "field_date":"2020-04-29", "field_time":"01:23:45.222"}
今回のテストでは、chunk_limit_sizeを600byteに指定しているので、1つのChunkには3レコードしか入らない状態にしています。
buffer.b5a48d8ab9aed3ea69f6befa249b10f82.log というchunkにはレコードが3つ溜まった状態です。(157*3=471byte)
3299465 -rw-r--r--. 1 root root 471 5月 1 12:11 buffer.b5a48d8ab9aed3ea69f6befa249b10f82.log
3299478 -rw-r--r--. 1 root root 80 5月 1 12:11 buffer.b5a48d8ab9aed3ea69f6befa249b10f82.log.meta
1971918 -rw-r--r--. 1 root root 157 5月 1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log
3297661 -rw-r--r--. 1 root root 80 5月 1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log.meta
この状態で、さらにもう一つメッセージを投入すると、staged=>queuedになり、新たなstagedのChunkが作成されたことが分かります。
3299479 -rw-r--r--. 1 root root 157 5月 1 12:12 buffer.b5a48d8cede6f990568333b0c69777d2e.log
3301137 -rw-r--r--. 1 root root 80 5月 1 12:12 buffer.b5a48d8cede6f990568333b0c69777d2e.log.meta
1971918 -rw-r--r--. 1 root root 157 5月 1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log
3297661 -rw-r--r--. 1 root root 80 5月 1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log.meta
3299465 -rw-r--r--. 1 root root 471 5月 1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log
3299478 -rw-r--r--. 1 root root 80 5月 1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log.meta
さらに3つ投入すると同じようにChunkが増えます。
3301138 -rw-r--r--. 1 root root 157 5月 1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log
3301378 -rw-r--r--. 1 root root 80 5月 1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log.meta
1971918 -rw-r--r--. 1 root root 157 5月 1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log
3297661 -rw-r--r--. 1 root root 80 5月 1 12:11 buffer.q5a48d896ab9f51b7dcd4468c4ff308fa.log.meta
3299465 -rw-r--r--. 1 root root 471 5月 1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log
3299478 -rw-r--r--. 1 root root 80 5月 1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log.meta
3299479 -rw-r--r--. 1 root root 471 5月 1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log
3301137 -rw-r--r--. 1 root root 80 5月 1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log.meta
step3
この間、最初のqueuedのChunkの書き出しリトライが繰り返されています。
ここでElasticsearch 復旧させると、その後のリトライが成功してqueuedのChunkが1つ無くなります。
3301138 -rw-r--r--. 1 root root 157 5月 1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log
3301378 -rw-r--r--. 1 root root 80 5月 1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log.meta
3299465 -rw-r--r--. 1 root root 471 5月 1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log
3299478 -rw-r--r--. 1 root root 80 5月 1 12:12 buffer.q5a48d8ab9aed3ea69f6befa249b10f82.log.meta
3299479 -rw-r--r--. 1 root root 471 5月 1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log
3301137 -rw-r--r--. 1 root root 80 5月 1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log.meta
約180s(flush_thread_burst_interval)の後、queuedのchunkが1つ消えます。
3301138 -rw-r--r--. 1 root root 157 5月 1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log
3301378 -rw-r--r--. 1 root root 80 5月 1 12:14 buffer.b5a48d93034899149b1a0c7747a07a998.log.meta
3299479 -rw-r--r--. 1 root root 471 5月 1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log
3301137 -rw-r--r--. 1 root root 80 5月 1 12:14 buffer.q5a48d8cede6f990568333b0c69777d2e.log.meta
さらに、約180s(flush_thread_burst_interval)の後、queuedのchunkが消えます。これでqueuedのChunkが無くなったので、直後(恐らflush_thread_intervalのタイミング)にstagedのchunkがqueuedになって書き出されるので全部なくなりました。
まとめ
主要なパラメーターを整理するとこんなイメージなんじゃないかと思います。
また、enqueue, writeのタイミングは、ざっくりこんな感じでとらえるとよいかと思います。
- enqueueされるタイミング:flush_modeに応じたタイミング or Chunkサイズがthresholdを超えたタイミング
- writeされるタイミング: queuedのchunkが無ければ即時、queuedのChunkが残っていてキューイングされる場合はflush_thuread_burst_intervalで処理されるのを待つ