fluentdを使う時にまず知っておいたほうがよさそうなこと
はじめに
- 朝からElasticsearchへのデータの投げ込み方を考えていました。
- データベースやメッセージキューなどにデータを投げ込んでおいて、ニアリアルなバッチでElasticsearchに投げ込むよりも、fluentdを使う方が圧倒的に簡単で信頼性が高いものができますね。自分で作りこむのがバカらしくなりますね。
- ということで、fluentd利用時に気を付けておきたいことについて調べてみました。内容は公式ドキュメントの内容をベースに自身で調べたことを追記しています。公式ドキュメントへのリンクも貼ってありますので適宜そちらをご覧いただければと。
環境
- CentOS6.7
- td-agent 0.12.19
- Ruby2.2.2(リストアスクリプトで利用)
- Fluent-Logger(0.5.1)
- Elasticsearch2.1.1
fluentd vs logstash
- Fluentd vs. Logstash: A Comparison of Log Collectors
- Fluentd vs Logstash
- Fluentdの現実装のPros/Cons
- これ結構悩みますよね。
- 私がやりたかったことはSQSとElasticsesarch,S3にデータを流し込めれば良かったのですが、両者ともプラグインは公開されていました。また、それ以外のプラグインもたくさんあるようでして、プラグインの数などは甲乙付け難い。
- 構成面で言うとActive-Standby構成はどちらも取れますので、私が満たしたい要件は揃っているように思えました。
- 結局触ってみた結果、バッファリング機能とロードバランス機能の部分の差でfluentdを使うことにしました。
fluentdのインストール
- いつも通りansible playbookにおとしてあります。 - ansible-td-agent
NTP
- 当然ですけど設定しておきましょう。ログでタイムスタンプズレは話にならないので。
File Descriptors
- File Descriptorsが足りなくなる恐れがあるので増やしておく。ただし、最新のバージョンだと起動スクリプト内に以下の記述があるため特に設定は不要
ulimit -n 65536 1>/dev/null 2>&1 || true
ネットワークカーネルパラメータの変更
- マニュアルにはTCP_WAITが問題になっている場合は設定しなさいという注意書きがある。
- ソケットの再利用設定を行い効率的に通信する設定やローカルポートが枯渇するような場合に備えてレンジを増やしている。反映は再起動か、sysctl -w。
/etc/sysctl.conf
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.ip_local_port_range = 10240 65535
監視 - Monitoring Fluentd
monitor_agent
- pluginの状態の監視が可能です。buffer_queue_lengthとかbuffer_total_queued_sizeとかわかるのはありがたいですね。設定はtd-agent.confに以下を追記して再起動するだけ。
/etc/td-agent/td-agent.conf
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
</source>
- httpのポートがlistenするので監視サービスからのhttp監視として利用しても良いと思われます。
- curlでアクセスすることによってpluginの情報が抽出できます。.jsonを付けないとtsv形式で抽出可能です。
[root@es1 restore]# curl -s http://localhost:24220/api/plugins.json | jq .
{
"plugins": [
・・・・
{
"retry_count": 0,
"buffer_total_queued_size": 0,
"buffer_queue_length": 0,
"output_plugin": true,
"config": {
"retry_wait": "1s",
"retry_limit": "16",
"flush_interval": "1s",
"buffer_queue_limit": "10",
"type": "elasticsearch",
"host": "192.168.1.41",
"port": "9200",
"type_name": "access_log",
"logstash_format": "true",
"logstash_prefix": "test-apache-access",
"buffer_type": "memory",
"buffer_chunk_limit": "10m"
},
"type": "elasticsearch",
"plugin_category": "output",
"plugin_id": "object:3fb52c489614"
}
]
}
- これらをmonitoringするサービスをTreasureData社が無料で提供しているようでしてちょっと試してみましたがすごく簡単にできそうです。 - Treasure Agent Monitoring Service
プロセス監視
- 2つのrubyプロセスが上がっていることを監視しておく
[root@es1 restore]# ps w -C ruby -C td-agent --no-heading
23228 ? Sl 0:00 /opt/td-agent/embedded/bin/ruby /usr/sbin/td-agent --log /var/log/td-agent/td-agent.log --use-v1-config -
23231 ? Sl 0:00 /opt/td-agent/embedded/bin/ruby /usr/sbin/td-agent --log /var/log/td-agent/td-agent.log --use-v1-config -
ポート
- 24220(defaultの場合)は監視しておく。
- fluent debugなるものがあって、このポートはトラブルシューティングの為にローカルからのアクセスだけでも良いので空けておいた方がいざという時によさそうです。
<source>
@type debug_agent
bind 127.0.0.1
port 24230
</source>
- 具体的な使い方はfluent-debugと戯れるを参照。
flowcounter
- どのくらいの流量が流れているかってモニタリングしたいですよね。そんなときはflowcounter pluginを使うとよさそうです。
- インストール方法
td-agent-gem install fluent-plugin-flowcounter
- 設定はaggregator側でcopyを使って設定します。今回はElasticsearchへの投入とfileへの出力を同時に行うような設定になります。
- unitは単位で、muniteを設定すると分単位での設定になります。aggregateは何を取るかで、今回は全てを取得するような設定になっています。ちなみにtag名はflowcountと設定してありますが省略した場合もflowcountになります。
<match test.apache.access>
@type copy
<store>
type elasticsearch
host 192.168.1.41
port 9200
・・・ Elasticsearchへデータ投入する設定
</store>
<store>
@type flowcounter
count_keys *
unit minute
aggregate all
tag flowcount
</store>
</match>
<match flowcount>
type file
path /var/log/td-agent/traffic
time_slice_format %Y%m%d%H%M
</match>
- 出力結果
[root@es1 td-agent]# tail -f /var/log/td-agent/traffic.20160127_0.log
2016-01-27T04:56:05+00:00 flowcount {"count":5,"bytes":995,"count_rate":0.08,"bytes_rate":16.58}
- Fluentd Meetup #2 @外道父 Fluentdを優しく見守る監視事例を参考にさせていただくと1時間に1回、月別のファイルを作成するように運用されているそうです。ここは想定されるトラフィックなどに合わせて考慮が必要ですね。
シグナルとAPIを使ったハンドリング
- シグナル or APIを叩くことで停止やバッファのflushなんかを行えます。
- APIを叩けるようにするにはtd-agent.confに設定して再起動するだけ。APIとシグナルは同じ動きをするので外部からAPIを叩くようなアプリケーションを作ろうとしない限りシグナルだけで十分だと思われます。
<system> rpc_endpoint 127.0.0.1:24444 </system>
gracefulに停止
- 1回だけバッファのflushを試みたあと、gracefulに停止させる
- SIGINT or SIGTERM
- /api/processes.interruptWorkers
- /api/processes.killWorkers
バッファのflush
- flush_intervalを維持したまますぐにバッファをflushする
- SIGUSR1
- /api/plugins.flushBuffers
設定のリロード
- SIGHUP
- /api/config.reload
ログの設定
td-agent.logの設定
- td-agent.logのログレベルの変更などは/etc/td-agent/td-agent.confに記載します。
- ログレベルはdefaultでinfo.trace,debug,info,warn,error,fatalから設定可能です。
- suppress_repeated_stacktrace:連続して同一エラーが発生した場合抑制します。
- emit_error_log_interval: 指定時間内の同一エラーを抑制します
- suppress_config_dump: 起動時に設定ファイルがtd-agent.logに出力されますが、それを抑制します。
<system>
log_level trace
suppress_repeated_stacktrace true
emit_error_log_interval 60s
suppress_config_dump false
</system>
td-agent.logのローテート
- logrotateでローテートします。TreasureDataのGithubにサンプルがありました。↓
- td-agent/td-agent.logrotate
ディスクバッファを使うかメモリバッファを使うか
- 参考:td-agent のメモリバッファとファイルバッファでどんな違いが発生するか観察してみた
- ディスクバッファの方が遅いんじゃないかと勝手に思い込んでいたのですがどうやらそうでもないようです。データロストする可能性も考慮するとディスクバッファを使っておいた方が安全ですね。ディスクバッファを使っておけば次回はそこから再送してくれますし、キューがあふれる可能性もなくなります。
- 念のため性能テストして要件が満たせることだけは確認しておけばよさそうです。
High Availability
- fluentdは単体でも信頼性の高い再送処理を行ってくれるほか、active-standby構成や、負荷分散構成、さらにactive-standby構成でも救えなかった場合にべつの媒体(ファイルなど)に保管しておくことが可能です。これを実際に見ていきたいと思います。
どんな構成で試したのか
- apacheのアクセスログを①のサーバのtd-agentが拾い、②のサーバで受け取りElasticsearchに投入する構成としました。②と③のサーバはクラスタが組まれており、それぞれのサーバにtd-agentをインストールしてあります。
|番号|host|IPアドレス|説明|
|:--|:--|:------------|:-----------|:-----------|
|①|client|192.168.1.10|httpd,td-agent(forwarder)|
|②|es01|192.168.1.41|Elasticsearch,td-agent(aggregator)|
|③|es02|192.168.1.42|Elasticsearch,td-agent(aggregator)|
インストールについての補足
- すべて自前のansible playbookで入れました。2,3分あれば3台分入れられます。
- td-agent
- Elasticsearch
なぜローカルのtd-agentに一度送るのか?
- 直接aggregatorノードに送信することは可能ですが、ネットワーク断などが発生したときに再送処理が必要になり、考慮することが増えてしまいます。なので一度ローカルのtd-agentに送信してキューイングさせてからaggregatorに送るがよいかと思います。
- これは非同期でメッセージングする時などのアーキテクチャでは良くあるパターンですね
①のサーバから②のサーバに送るだけの設定
- 下記の設定はapacheのアクセスログをtailで拾って、es01(192.168.1.41 24224)に送信する設定です。
- 実際に送信するときに②のサーバがダウンしていたらどうなるかというと、retry_limitで設定された回数分リトライし続け、その間にes01が復旧したら自動的に再送してくれます。
- retryする秒数は指数関数的に増えていくのですがそれはこちらの記事「BufferedOutput pluginの代表的なoptionについて」のretry_wait,retry_limit,max_retry_waitあたりがわかりやすいので読んでいただければと思います。
- 復旧するまでの時間バッファリングし続けることは可能でして、buffer_chunk_limit x buffer_queue_limitで決まります。メモリバッファを使う場合はこの量バッファリングできるくらいメモリの確保は必要でしょう。
/etc/td-agent/td-agent.conf
<source>
type tail
format apache2
path /var/log/httpd/access_log
tag test.apache.access
pos_file /var/log/td-agent/position/access_log.pos
types code:integer,size:integer
</source>
<match **>
type forward
send_timeout 60s
recover_wait 10s
heartbeat_interval 1s
phi_threshold 8
hard_timeout 60s
retry_limit 2
retry_wait 2s
max_retry_wait 30s
<server>
name es01
host 192.168.1.41
port 24224
</server>
</match>
- リトライしている間にノードがダウンしてしまったら・・・とか、大量のデータを再送するのが怖い場合は、ある一定のリトライ回数を超えたらファイルに落としてしまうのもありだと思います。
- ファイルに落とすにはの下にを追記します。
<secondary>
type file
path /var/log/td-agent/forward-failed
</secondary>
- このようにしておくと/var/log/td-agent/forward-failed.tag名_0.logなどというファイルができ、ここから復旧することが可能です。Elasticsearchに大量のデータを投入し直すときはrefresh_intervalを長めに設定しておくこともお忘れなく。
Active-Standby構成
- es01がダウンした場合に、es02に送りたい。という場合はタグを追加し、standby側にstandbyと記述します。
- es01へのヘルスチェック間隔はheartbeat_intervalで指定されており、hard timeoutで指定された秒数が経過するとダウンしたと見なされes02に送ろうとします。
- standbyが落ちた時のことを考慮して、でファイルに落としておくと尚良いと思われます。
/etc/td-agent/td-agent.conf
<source>
type tail
format apache2
path /var/log/httpd/access_log
tag test.apache.access
pos_file /var/log/td-agent/position/access_log.pos
types code:integer,size:integer
</source>
<match **>
type forward
send_timeout 60s
recover_wait 10s
heartbeat_interval 1s
phi_threshold 8
hard_timeout 60s
retry_limit 2
retry_wait 2s
max_retry_wait 30s
<server>
name es01
host 192.168.1.41
port 24224
</server>
<server>
name es02
host 192.168.1.42
port 24224
standby
</server>
</match>
負荷分散構成
- es01とes02に負荷分散したい場合は以下のように設定します。
- weightで偏りを設定します。合計で100である必要はなく、割合を記載すれば良いです。ちなみにデフォルトは60。
<server>
name es01
host 192.168.1.41
port 24224
weight 60
</server>
<server>
name es02
host 192.168.1.42
port 24224
weight 60
</server>
パフォーマンスチューニング
- 公式ドキュメントを見るとtopコマンドとかでCPU,Memory,DiskIOどれがボトルネックなのかは見ておくと良いよ。と書かれています。
- CPUが多くの場合ボトルネックになるようで、複数のCPUコアを利用するためin_multiprocessプラグインというのを使うと良いとのこと。
- まぁ、私が今仕事でやろうとしているユースケースでは必要になることはなさそうなのですが、こちらの記事「ロードバランス + マルチプロセス化で Fluentd のログ収集を効率化する」に詳しく記載されています。
- 使うか使わないかはおいといて、いざという時の為に始めから入れておいても良いかもしれません。
UI
- bootstrapで作られたUIも用意されているようです。日本語です。ちょっとびっくりしました。
- 起動するにはこんな感じ
[root@es1 td-agent]# td-agent-ui start
Puma 2.11.1 starting...
* Min threads: 0, max threads: 16
* Environment: production
* Listening on tcp://0.0.0.0:9292
- 下記のURLでアクセスし、アカウントはadmin、パスワードはchangemeでログイン可能です。利用する場合は初期パスワードを変更しておきます。
http://host:9292/
- UIからセットアップできたり、インストールされているプラグインの一覧などを見ることができます。運用ではあまり使わないのかな。