1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Fluentd: kubernetesでの複数行ログ対応

Last updated at Posted at 2020-10-04

はじめに

Kubernetesでのログ収集にfluentを使うのは結構メジャーな方法だと思います。
実際にやってみて分かったのですが、kubernetes上で動いているアプリケーションが複数業のログを出した場合、
kubernetes上ではログが複数に分割されてしまいます。
Fluentdではログの固定フォーマットに従って、ログ内容をパースできるのがメリットの一つだと思うのですが、
このままだとパースができなくなってしまいます。

具体的にはこんな感じ。

original-logs
2020-10-04T14:07:22.234Z ERR Authorization error because you have to be a member of following groups
XXXX
YYYY
ZZZZ

これがこんな感じに分割されてしまいます。

{"log":"2020-10-04T14:07:22.234Z ERR Authorization error because you have to be a member of following groups\n","stream":"stderr","time":"2020-10-04T14:07:22.234Z"}
{"log":"XXXX\n","stream":"stderr","time":"2020-10-04T14:07:22.434Z"}
{"log":"YYYY\n","stream":"stderr","time":"2020-10-04T14:07:22.634Z"}
{"log":"ZZZZ\n","stream":"stderr","time":"2020-10-04T14:07:22.639Z"}

最終的にログをDBなり、Elasticsearchなりに集約すると思いますが、これだと可読性が下がってしまいますよね。。。
色々調べてみましたが、ログの結合とパースを(私のやりたいように)両方やっている事例はなかったので、試行錯誤しなんとかして記事にしてみました。
どなたかのお役に立つと嬉しいです。

ちなみに、fluent内ではrubyによるコードを使うことができるのですが、
筆者はRubyの知識がありません。
最後のパース周りでRubyを使っているのですが行き当たりばったりベースでやっているので、
Rubyのエキスパートには改善点が見つかるかもしれません。

その際には是非、優しく有意義なご意見をいただけると幸いです。

環境

kubectl: v1.16.13
k8s server: AWS EKS
fluentd: 1.11.3

Fluend config

早速解決策です。
まず、使用するpluginです。

fluent-plugin-concat (2.4.0)
fluent-plugin-record-reformer (0.9.1)

調べる時間もなかったので、既存のイメージを編集して使いました。

Dockerfile
FROM fluent/fluentd-kubernetes-daemonset:v1.11.3-debian-kinesis-1.0
RUN fluent-gem install fluent-plugin-record-reformer

ここで作ったイメージをAWS ECRにアップロードし、Daemonsetとして使用しています。
このイメージはkubernetes上のログもデフォルトで収集対象としていますが、
上記の分割されてしまったログの結合対応はされておらず、1行ずつ出力されるようになっています。

これを変更します。

もともとのロジック

メインのコンフィグファイルは/fluentd/etc/fluent.confです。

/fluentd/etc/fluent.conf
@include "#{ENV['FLUENTD_SYSTEMD_CONF'] || 'systemd'}.conf"
@include "#{ENV['FLUENTD_PROMETHEUS_CONF'] || 'prometheus'}.conf"
@include kubernetes.conf

Kubernetes上のアプリログは、/var/log/containers/*.logに格納されます。
kubernets.confが上記に対応するコンフィグなので、こちらを上書きます。

元々のファイルに以下を追記してます。

kubernetes.conf

    <filter kubernetes.**>
      @type concat
      key log
      multiline_start_regexp /^\d{4}-\d{1,2}-\d{1,2}.*/
      separator ''
      flush_interval 5
      timeout_label @NORMAL
    </filter>

    <match **>
      @type relabel
      @label @NORMAL
    </match>

    <label @NORMAL>
      <match kubernetes.**>
        @type record_reformer
        tag ref.kubernetes
        enable_ruby true
          time ${begin log.scan(/^(\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,3}Z?)/).first.first rescue "" end}
          level ${begin log.scan(/^\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,3}Z (\w{3}?)/).first.first rescue "" end}
          message ${begin log.gsub(/\n/, 'NNN').scan(/^\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,3}Z \w{3} (.*?)$/).first.first.gsub(/NNN/, '\n') rescue log end}
      </match>
      <filter ref.kubernetes>
          @type record_transformer
          enable_ruby
          <record>
            container ${record.dig("kubernetes", "container_name")}
          </record>
      </filter>
      <match **>
          @type kinesis_streams
          @id out_kinesis_streams
          region "ap-northeast-1"
          stream_name "xxxxx"
          include_time_key "xxxxx"
          <buffer>
            @type file
            path /var/log
            flush_at_shutdown true
            flush_interval 1
            chunk_limit_size "1m"
            flush_thread_interval 0.1
            flush_thread_burst_interval 0.01
            flush_thread_count 15
          </buffer>
        </store>
      </match>
    </label>

以下、解説です。

1. @type concat

GithubのREADMEにある、kubernetesでの活用事例ほぼそのままです。
アプリケーションログに規則性(改行されたとしても、必ず日付がログの一番最初に来る)を利用し、multiline_start_regexpを使って、multilineの終了を判別しています。

また、例外によるエラーはこのフォーマットにのらないことが多いので、timeoutとtimeout_labelを設定し、このregexpに当てはまらないものも後続に流れるようにしています。

キーに設定してあるlogフィールドがコンカチされて、次のプロセスに流れます。

2. @type record_reformer

前プロセスでラベルがつけられているので、<label @NORMAL>内で処理を記載します。
ここでは、パースするための処理を記載しています。

Rubyのscanというメソッドを使い、regexpで値を抽出しています。
また、前プロセスのmultilineのregexpに合致しないものはそもそもこのscanの対象にならず、
scan(xxx)が値を持たないため、scan(xxx).first.firstが例外を発生させてします。
そこで、Rubyの例外対応である、begin rescue endをこの中に入れています。

最後、ログ内のメイン部分は改行されていたものを結合しているため、もちろん改行を含みます。
ここで、通常のregexpとscanで色々と試してみたのですが、どうしても改行部分で決裂してしまったり、
改行を無視しようとすると、終わりが判別できなくなったりうまく行きませんでした。

若干汚いですが、gsubを使い一旦改行をエスケープし、最後に改行を戻す(今回の例ではkinesisに改行として送りたいため。スペースでいい、などであればスペースに変更するだけで良い)ようなことをしています。

終わりに

以上です。
同じことをやろうとしている方は多いのかと思いつつ、日本語、英語共にダイレクトに当てはまるものがなかったので、
色々調べて試行錯誤した内容を記事にしてみました。

質問・改善点など、お待ちしております。

参考

concatenateについて:fluent-plugin-concat
concatenateしたあとのパースに関するヒント:Parse Ruby on Rails logs with FluentD

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?