こんにちは、こちらはFluentd Advent Calendar 6日目の記事となります。
このところBufferedOutput系のpluginのoptionについて質問されることが多かったので、せっかくですのでつらつらとここで紹介していこうかなと思います。なお、コードはfluentd v0.10.55です。
optionの挙動を把握するためにまずはfluentdが受け取ったレコードをどう処理していくのかを紹介し、その後fluentdの処理方法を踏まえて各optionを紹介していきます。
レコードの流れ
はじめに、レコードを受け取ってからどう処理されてoutputされていくか簡単にレコードの処理を順を追って紹介いたします。
① レコードを受け取りbufferingする
input pluginがレコードを受け取ると、Engine
を介してそのレコードのtagにマッチするoutput pluginインスタンスのemit
メソッドが呼ばれます。output pluginがBufferedOutput
を継承している場合、bufferingされます。
② レコードをbufferに振り分ける
レコードからkeyが算出され、どのbufferにレコードを貯めていくか決定されます。fluentdではこのレコードの塊をchunkと呼んでいます。ObjectedBufferedOutput
pluginならtag、TimeSlicedOutput
pluginならtimeを元にkeyを生成します。(plugin毎に様々です)
③ レコードをchunkに追記する
②で生成したkeyを持つchunkが存在する場合はそこへ追記され、存在しない場合は新しくchunkが作られそこに追記されていきます。なお、buffer_type file
の場合にbufferingされている状態だと、chunkはhoge.bxxxxxxxxxxx
といったようなbから始まる文字列がついたファイル名となっているはずです。
④ chunkをenqueueする
output pluginのthread、つまり送信側の処理を行うthreadは、chunkを監視しており一定周期でchunkをenqueueし送信処理の準備をします。buffer_type file
だとchunkのファイル名がhoge.bxxxxxxxx
から、hoge.qxxxxxxxx
といった名前に変更されているはずです。enqueueされたchunkにはレコードが追記されることは無く、新しいレコードを受け取った場合は③と同様に新しいchunkが作られそこに追記されていきます。
⑤ chunkを送信する
enqueueした状態のchunkをoutput pluginの処理を行うthreadが1つづつ取り出しwriteメソッドを実行していきます。送信に失敗した場合はretryを行います。
以上がfluentdがレコードを受け取ってから送信されるまでの簡単な流れです。こうしたfluentdの処理の流れを踏まえてpluginのoptionについて紹介していきます。
BufferedOutput Pluginのoptionの紹介
今回は、BufferedOutput
系のpluginの基本的なoptionをターゲットとします。これらのpluginは、ストレージの前段に位置する集約サーバで利用する機会が多く、重要なoptionが多い部分です。基本となるBufferedOutput
以外に、このクラスを継承したObjectBufferedOutput
,TimeSlicedOutput
クラスがfluentdでは提供されています。
BufferedOutput
系pluginには代表的なものとして、
- out_foward (ForwardOutput < ObjectBufferedOutput)
- out_file (FileOutput < TimeSlicedOutput)
- fluent-plugin-td (TreasureDataLogOutput < BufferedOutput)
- fluent-plugin-webhdfs (WebHDFSOutput < TimeSlicedOutput)
等があり、ストレージや用途に合わせて数多く存在します。
それでは基本的なoptionを紹介していきます。
buffer_type
# file or memory
buffer_type file
bufferingする際に、fileにレコードを保存していくのか、memory上に保持するのかを選択します。流量も流速も多い場合はbuffer_type memory
の方が相性が良いですが、fluentdが落ちてしまうと保持していたデータが喪失してしまいます。私はログ欠損しないためにbuffer_type file
を選択することが多いです。
なお、buffer_type file
を選択した場合は、bufferファイルを置くためのpathを指定する必要があります。
buffer_type file
buffer_path /var/log/td-agent/buffer
このディレクトリはfleutndが起動した際に無ければ作成されます。
flush_interval
flush_interval 60s
output pluginのthreadは、flush_interval
毎にBufferのchunkをすべてenqueueします。ざっくりとした送信間隔だと捉えても良いです。ただし、この処理はQueueにchunkが存在しない場合のみ実行されます。そのためflush_interval
を短くしたからといってQueueに入っているchunkが爆発的に増えてしまうことはありません。
上のfluentdの処理の流れの図では、右上の**"output pluginのthreadが監視"**の部分にあたります。
try_flush_interval
try_flush_interval 0.1
output pluginのthreadがBufferを監視する頻度を指定できます。デフォルトでは1secで、floatに対応しています。流量も流速も大きい場合にはこの値を小さくして対応することができるようです。なお、内部的には#try_flush
メソッドを呼び出す間隔を調整するoptionで、#try_flush
メソッド内でflush_interval
のチェックをしています。(あまりまだ推奨されていないoptionのようですが)
上のfluentdの処理の流れの図では、flush_intervalと同じ部分にあたります。
retry_limit
retry_limit 18
BufferedOutput
は、Queueから取り出したchunkの送信に失敗した場合にretryを行います。retry_limit
でその上限値を設定できます。送信失敗が続きretry回数がこの値を超えてしまうと、送信しようとしていたchunkを破棄してしまうのである程度大きい値にしておいたほうが良いです。18ぐらいにしておけば1日以上はretryしてくれます。ただし、retry間隔がどんどん大きくなっていくので、送信先の復旧後もなかなか送信されない状態が続きます。ですので、他のoptionと組み合わせてretry間隔が大きくならないよう調整するか、USR1シグナルを送り強制的にflushさせるなどの処置が必要になるかと思います。
上のfluentdの処理の流れの図では、⑤の部分にあたります。
disable_retry_limit
# true or false
disable_retry_limit true
trueにしておくと、retry回数がretry_limit
を超えてもchunkを破棄せずretryし続けるようになります。送信が成功しないままだと、レコードがどんどんサーバに溜まってしまい他のoptionの設定によっては結局破棄することになる、もしくはサーバのストレージサイズの上限に達してしまうので注意が必要です。
retry_wait
retry_wait 30s
retryする間隔は、デフォルトだとretry回数に対して指数関数的に増えていきます。その間隔を一律でretry_wait
にします。
max_retry_wait
max_retry_wait 300s
こちらはretry_wait
と違い、retry間隔の上限値を設定します。retry回数が増えてもretry間隔がmax_retry_wait
以上になることはなくなります。
num_threads
num_threads 8
送信処理を行うoutput pluginのthread数を設定できます。デフォルトだと1threadが多いので、Queueに入ったchunkは1つづつ送信されます。num_threads
を多くすれば、Queueに入ったchunkがthread数ずつ送信されていきます。マシンの性能と、ネットワーク帯域、送信先の性能を鑑みて調整します。
上のfluentdの処理の流れの図では、**"output pluginのthreadが監視"**と⑤の部分にあたります。
queued_chunk_flush_interval
queued_chunk_flush_interval 20s
Queueに入っているchunkを送信する間隔。flush_interval
より短い値にしていることが多く、ログのはき方によってはたくさんのkeyが生成される可能性があるため、chunk数によっても調整が必要です。複数thread立っている場合は、queued_chunk_flush_interval
毎に全threadが一斉に動き、Queueからchunkを取り出していくことになります。
上のfluentdの処理の流れの図では、⑤の部分にあたります。
time_slice_format
time_slice_format %Y%m%d_%H%M
TimeSlicedOutput
クラスは、recordのtimeや現在時刻を使ってkeyを生成しています。つまりは、recordをどの時間の単位でかたまりとするかを指定することになります。例えば、上の利用例の場合なら1分毎のかたまりになるようにchunkが生成されます。(e.g. file.20141205_1137_0.log)
formatの指定方法はこちらを参考に。
time_slice_wait
time_slice_wait 60s
TimeSlicedOutput
では、enqueueする際にtime_slice_wait
時間だけ待ってからenqueueしています。つまり、buffering状態のchunkから確定状態のchunkになるまでにtime_slice_wait
だけラグがあることになります。recordが送られてくる際にどうしても遅延が発生してしまうので、その遅延を考慮して設定します。
Bufferのoptionの紹介
BufferedOutput
pluginを利用する際についてまわるBuffer
に関するoptionについても紹介しておきます。
buffer_chunk_limit
# 32k -> 32kbyte
buffer_chunk_limit 32m
chunkの最大サイズ。emit時、追記対象のchunkに書き込む際にchunkサイズがbuffer_chunk_limit
を超えてしまう場合は、そのchunkをenqueueします。その後、新しいchunkを作成しそこへrecordを書き込んでいきます。chunkの最大サイズを設定できるので、この値とflush_interval
, queued_chunk_flush_interval
の値を調整することで、精度は高くありませんが擬似的に帯域制限をすることができます。
buffer_queue_limit
# 個数
buffer_queue_limit 2048
Queueに存在できるchunkの最大数です。これに達するとログを受け取れなくなります。buffer_chunk_limit
を小さい値にするとQueue内のchunkが多くなる可能性があるので、この値を上げておきます。
どの程度この値を上げておけばよいかは、サーバの利用できるストレージサイズを参考にすると良いと思います。例えば、送信先のサーバで障害が起きた際に、ログの欠損を少しでも減らすためになるべく多くのログを一時保持しておき、障害が復旧した後に送信したいかと思います。そうした場合貯めておける最大値がサーバのストレージのサイズなので、buffer_chunk_limit
x buffer_queue_limit
の値が空いているストレージのサイズくらいになるように調整しています。(buffer_type fileなのでストレージサイズを考慮しています。)
まとめ
fluentdがレコードを受け取ってから送信するまでの簡単な流れと、BufferedOutput
とBasicBuffer
の基本となるoptionの紹介を致しました。送られてきたログを次に送信するまでの間隔を制御するためのoptionや、chunkの大きさを決めるためのoptionの組み合わせで細かい設定ができます。
fluentdで起きる障害は、inputに不正なレコードが送られてきた時やネットワーク周りのトラブルが多いです。しかし意外と陥りがちなのが、WebHDFSやRedisなどの送信先のサービスの特性に合わせたoptionを設定ができていないケースです。こうしたトラブルの解消に、少しでもこの記事が参考になれば幸いです。
また、現在fluentdのmaster branchで開発中の修正が加わると多少メソッドの呼出し関係が変わるかと思いますので、チェックしておきたいところです。というわけでFluentd Advent Calendar 6日目の記事でしたmm