fluent-plugin-beatsを作った

  • 47
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

最近Elastic社が力を入れているBeatsと呼ばれる,各種イベントに特化したエージェントがあります.

Beats: Data Shippers for Elasticsearch | Elastic

現状だとシステムのメトリクスを集めるtopbeats,ネットワーク周りの情報を集めるpacketbeat,ファイルを監視してログを集めるfilebeat(fluentdのin_tailみたいなもの)があります.

これらは単体でElasticsearchに投げることが出来るんですが,フィールドを加工したりとか,様々なデータストアに投げるとか,あまり複雑なことは出来ません.
ですが,そういう時のためにlumberjackというプロトコルを使って外部に投げることが出来るようになってます.ログを集めて加工したり転送したりといえば,軽量でロバストなFluentdというビンゴなソフトウェアがあるので,これらとやりとり出来たら便利だなぁとfluent-plugin-beatsを作りました!

fluent-plugin-beats

ということで,以下プラグインの説明をします

インストール

いつも通りgem経由です

$ gem install fluent-plugin-beats --no-document

基本設定

<source>
  @type beats
  tag beats.event
</source>

複雑な設定はありません.デフォルトで0.0.0.0:5044で起動し,topbeat,packetbeat,filebeatすべてのイベントを受け付けます.この設定だと固定タグですが,tagの代わりにmetadata_as_tagを設定すると,各イベントが持っているメタデータの設定を使うようになり,topbeatならtopbeatというタグが付くようになります.以下の"Elasticsearchに投げる設定例"を参照してください.

filebeatが送ってくるイベントにはログそのものが含まれているので,これをbeatsプラグインの中でパースしたい場合には,in_tailのようにformatを指定すれば処理してくれるようになります.

その他書いてない設定に関してはREADMEの設定セクションを参照してください.

Elasticsearchに投げる設定例

各beatはlogstashのインデックスと同じ仕組みでインデックスを作るので(xxxbeat-YYYY.MM.DD),用意されているテンプレートを使うためには,それらに合わせる必要があります.以下が設定の実例です.

<source>
  @type beats
  metadata_as_tag
</source>

# xxxbeatのログを全部ESに投げる
<match *beat>
  @type elasticsearch_dynamic
  logstash_format true
  logstash_prefix ${tag_parts[0]}  # metadata_as_tagによりタグに各beatの名前が入っているので,それを使う
  type_name ${record['type']}      # レコードにインデックスのtypeが含まれているので,それを利用する
  flush_interval 10s
</match>

一つにまとめるためにelasticsearch_dynamicを使ってますが,filebeatはデフォルトで常にtype_namelogなので,流量が大きければ通常のelasticsearchにするのも手です.

もちろん,copyプラグインを使ってS3やTreasure Dataみたいなサービスにバックアップ用途で投げることが出来ます.topbeatのデータは長期に保存する必要性は薄いですが,filebeatの結果はログで結構膨れるので,その手のサービスに入れておくのは有りだと思います.

以下は各beatの出力をfluentd経由で保存した結果になります.beatから直接送ったのとちゃんと同じ結果になっていると思います.

注意点

コネクション毎にスレッドを作る

このプラグインは,logstash-input-beatsプラグインが使っているlumberjackモジュールを流用していますが,このモジュールが1コネクション毎にスレッドを作る前提の実装になっているので,たくさんのbeatからアクセスが来るとそれだけスレッドが立ち上がります.
max_connectionsという設定で最大数を制御できるので,問題になりそうな方は設定してください.一応スレッドプールとかを使ってはいますが,解決するにはCool.ioとかのモジュールで根本から修正しないと行けない.

filebeatとの相性

lumberjackモジュールがバルクでデータを取得するというAPIを提供してくれていないので(受け取ったデータをばらして渡してくる),実は流量が多いとそんなに効率がよくありません.これはlumberjackが1レコードずつackを送信元に返すプロトコルであるのにも起因しています.

10万行のnginxログを食わせてflowcounter_simpleを使って手元のMBPでパフォーマンスを計測してみたところ,fluentdのin_tailがformat noneで80,000/sec,fluent-agent-hydraが100,000+/sec,filebeatが18,000/secと,同一マシン内ですら遅い結果になっています.実環境だとネットワークを跨ぐので,1レコードずつのackによる速度低下がもっと顕著になると思います(Filebeat performance when sending to Logstasというissueで同じくパフォーマンス問題が指摘されている).
パフォーマンスを気にするなら,fluentdを直接使うか,末端ではよりエコなfluent-agent-hydraを使う方が良いと思います.

logstash-input-beatsの実装

fluent-plugin-beatsはいくつかの部分でlogstash-input-beatsの実装を参考にしました.で,色々と気づいたことがあったので,logstash-input-beatsを使う方のためにメモ代わりに書いて起きます.

  • logstashの内部キューが詰まるとレスポンスを返さなくなる

logstashの内部キューがブロッキングキューなのは有名ですが,それらで問題があった時のために,コネクションをacceptするまえにCircuitBreakerで処理するかどうか判断しています.で,CircuitBreakerが働いている限りはひたすらsleep(0.5)をループし続けるだけの処理になっていて,だんまりしてしまいます.
設定でそのタイムアウトを調整できるようなので,気になる方は調整しましょう.

  • beatsプラグイン自体もオンメモリキューを持っている

logstashはFluentdや他のログコレクタと違い,データをためる内部キューやOutputのバッファが現在オンメモリでしか持てないという非常に厳しい制限があり,かつブロッキングなのは上で述べました.で,実はこのキューはブロッキング時のタイムアウトすら設定出来ないという実装になっているらしく,上記のタイムアウトを実現するために,さらにbeatsプラグインの中で別のタイムアウト可能なオンメモリキューをもつことでタイムアウトを実現しています.
その結果,オンメモリのブロッキングキューを二つ持つことになり,処理がブロックしやすく,データの欠損がさらに起きやすくなっているという感じです.気をつけましょう.

まとめ

個人的にBeats自体はかなり使いやすさそうだなぁと感じていて,組み込み向けエージェントならfluent-bit,通常のサーバとかならBeats,ログ集約・転送にはfluentdと,それぞれ分けて使えそうだなぁと思っています.topbeat/packetbeatを使えば,Elasticsearchを運用出来るリソースがあれば,Kibanaなどと合わせて簡易的なモニタリングシステムを構築できるので,今後この辺のユーザは増えそうですね.

追記

Elasticsearch勉強会#14で話したスライド: Fluentd meets Beats