1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

試験用のfluentdサーバーを作る

Posted at

fluentdを使うシステムを作っていく過程で、そもそもfluentdにデータが送られていることをどうテストしようかと悩んだので色々調べてみた。

やりたいこと

fluentdを使うシステムを作っていく過程で、fluentdにデータが送られていることを確認するテストをサクッと書きたい。

方針

CIを行う上で出来る限りテストの時間を短くしたいので、FluentLoggerで投げたデータをなるべくリアルタイムでoutputに連携したい。

以前は自分でソケット開いて待ち受けるコードを書いていたけど、バージョン上がってSSLが絡んで面倒になったので、なるべくpluginとかでなんとかしたい。

CIを回す際にdockerを使って周囲のミドルウェアを立てることが多いので、以前に一度Elasticsearchをミドルウェアキットに入れて試してもみたが、直接呼んでないElasticsearchをわざわざ建ててテストのためにelasticsearch-ruby入れて繋ぎ込むのもだるい。。。
特に公式イメージだとシングルノードで起きないし、バージョン変わった時に全部のマイクロサービスで追従してアップデートするのが面倒なので、出来る限り追加のミドルウェアなしでテストしてたい。

方式検討

リアルタイム性

fluentdのバッファリングはいつもflush_intervalの値をチューニングしながらやってるけど、テストだったらflush_mode=immediateで行けそう

CIでの使い勝手

今回はActiveJob等でも使ってるRedisをうまく活用できないか考えたところ、公式のプラグインリストに良さげなプラグインがあったので試してみる。

MySQLのもあったけど、FluentLoggerから送るデータ構造が割とバラバラなので、どっちかというとそのまま入れてくれるredisのプラグインの方がマッチしてた。

実際にやってみた

方式検討を踏まえつつ必要なGemのインストールとfluentdの設定を進めていく。

0. 準備

gemは本体の他にfluent-plugin-redis-storeのプラグインがあればいいので、まとめて入れてしまう

command
gem install fluentd fluent-plugin-redis-store --no-document

Redisのインストールについては割愛するが、brewなりdockerなりを使ってredisは建てておく。

fluentdはforwardから受け取り、redis_storeに出していくので、fluent.confはこんな感じになる。

fluent.conf
<source>
  @type forward
  @id input1
  @label @mainstream
  port 24224
</source>

<label @mainstream>
  <filter **>
    @type record_transformer
    <record>
      tag ${tag}
    </record>
  </filter>
  <match **>
    @type copy
    <store>
      @type stdout
    </store>
    <store>
      @type redis_store
      key_path tag
      <buffer>
        flush_mode immediate
      </buffer>
    </store>
  </match>
</label>

やってることはこんな感じ。

  • sourceとしてforwardから受け取ったものに@mainstreamのラベルをつける
  • filterとして@mainstreamの全てのデータに対してtagというキーでタグの値をセットする
  • matchとして@mainstreamの全てのデータに対してstdoutとredis_storeの2つへアウトプットする
  • redis_storeの設定として、Redisのキーとして使う値に上記のfilterでセットしたタグを設定する
  • redis_storeのバッファ設定として、flush_mode immediateとしてバッファに入った後すぐにアウトプットする

stdoutに出すのはおまけだけど、出しておくと後々デバックしやすい。

これを./fluent.confに書き込み、fluentdを起動する

fluentd -c fluent.conf

これで準備が整ったので、データを投げ込んでみたいところだが、Redisをモニタリングしておいた方が動きがわかりやすいので、別窓を開いてモニタリングしておく

redis-cli monitor

1. fluentdにデータを投げる

fluentdにデータを送るところは、実際のアプリケーションは fluent-logger-rubyを使ってデータを送るが、今回はfluent-catを使って代用してみる

echo '{ "key" : "sample" }' | bundle exe fluent-cat debug.test

これは{ "key" : "sample" }というデータに debug.test というタグと現在時刻のタイムスタンプを付与してデータを送信してくれるコマンド。設定しない場合はlocalhost宛になるので、上記で建てたfluentdサーバーに対してデータが送信される。

そうするとfluentd側の標準出力に下記のような出力が出る。

fluentdの実行ウィンドウ
2020-11-03 09:37:28.016591000 +0900 debug.test: {"key":"sample","tag":"debug.test"}

これはfluent.confに設定した@type stdoutが出力しているもので、実際にデータを受け取った後にfilterがタグを付加してこの形になっていることが確認できる。

さらにRedisをモニタリングしている窓を見ると下記のように表示されている。

redis-cliの実行ウインドウ
1604363848.025602 [0 172.28.0.1:40998] "zadd" "debug.test" "1604363848.016591" "{\"key\":\"sample\",\"tag\":\"debug.test\"}"

pluginのreadmeに書いてあるとおりzaddされている。keyはkey_path tagの設定が効いていて、データに含まれているtagの値であるdebug.testがセットされている。

2. 投げたデータの確認

Redisからデータを取得してみる。

redis-cli zrange debug.test 0 -1 withscores
1) "{\"key\":\"sample\",\"tag\":\"debug.test\"}"
2) "1604363848.016591"

タイムスタンプがスコアになってるので大量に入れてもいい感じにソートされる。

ちなみにrubyでかくとこんな感じ。

require 'redis'

Redis.new.zrange 'debug.test', 0, -1, withscores: true
output
 => [["{\"key\":\"sample\",\"tag\":\"debug.test\"}", 1604363848.016591]]

とりあえずシンプルに値は取れるので、割と使い勝手は良さそう。

おまけ

並列テストでの使い勝手を考えてみる

parallel_teststest-queueを使ってテストの並列実行をした際、どのテストプロセスが入れたログなのか判定する必要が出てくる。その時には送るデータにプロセスIDを含めておいて、それを使ってキーを設定する形にしてしまえばいい。

プロセスIDや時間、リクエストIDはRailsのログファイルにも入れておきたいものなので、特に害にはならないと考える。

その場合送るデータは下記のようになる

{ "key" : "sample", "pid" : 123 }

これに伴ってfluent.confも下記のように変更する。

  <label @mainstream>
    <filter **>
      @type record_transformer
      <record>
        tag ${tag}
+       tag_with_pid '${tag}.${record["pid"]}'
      </record>
    </filter>
    <match **>
      @type copy
      <store>
        @type stdout
      </store>
      <store>
        @type redis_store
-       key_path tag
+       key_path tag_with_pid
        <buffer>
          flush_mode immediate
        </buffer>
      </store>
    </match>
  </label>

これでRedisに登録される時のキーにプロセスIDが入るので、あとはそのプロセスIDを使って自プロセスのログを特定し、それを評価することで実際にデータが送られたのかどうか判定することができるようになる。

Redisの宛先を変える

READMEに書いてあるように下記の形で設定を変えられるらしい

  <label @mainstream>
    <filter **>
      @type record_transformer
      <record>
        tag ${tag}
      </record>
    </filter>
    <match **>
      @type copy
      <store>
        @type stdout
      </store>
      <store>
        @type redis_store
        key_path tag
+       host 10.0.0.1
+       db   11
        <buffer>
          flush_mode immediate
        </buffer>
      </store>
    </match>
  </label>

RSpecやCucumberで使う時のヘルパー

実際にテストで使う時にはこんな感じのヘルパー作れば楽に使えそう

# frozen_string_literal: true

require 'redis'

module FluentdLogHelper

  def fetch_fluentd_log_by(tag:, pid: nil)
    redis_key = pid ? "#{tag}.#{pid}" : tag
    redis.zrange redis_key, 0, -1
  end

  def redis(options = {})
    options[:db] ||= 0
    @redis ||= Redis.new(options)
  end

end
シンプルな呼び方
fetch_fluentd_log_by tag: 'debug.test'
プロセスIDを伴う呼び方
fetch_fluentd_log_by tag: 'debug.test', pid: Process.pid
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?