3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

fluentdメモ - (3) 設定ファイル調査 Input/Fileter/Output編

Last updated at Posted at 2020-05-01

はじめに

fluentdに関するメモ書きです。ここでは、設定ファイル(主にInput/Filter/Output関連)について調査/テストした内容を記載します。

関連記事

fluentdメモ - (1) インストール/簡易操作
fluentdメモ - (2) 設定ファイル概要
fluentdメモ - (3) 設定ファイル調査 Input/Fileter/Output編
fluentdメモ - (4) 設定ファイル調査 Buffer編

設定ファイルメモ

入力関連

tcp

特定のポートをListenしてTCPによりJSONデータを受け取ってみます。

設定ファイル例
<system>
  log_level debug
</system>

<source>
  @type tcp
  port 8081
  tag tcp.event
  bind 0.0.0.0
  <parse>
    @type json
  </parse>
</source>

<match tcp.**>
  @type stdout
</match>

上のファイルを指定してfluentdを起動します。

[root@test08 /etc/td-agent]# td-agent -c td-agent-test01_tcp.conf
 2020-04-29 09:46:27 +0900 [info]: parsing config file is succeeded path="td-agent-test01_tcp.conf"
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-mixin-config-placeholders' version '0.4.0'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-mixin-rewrite-tag-name' version '0.1.0'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-elasticsearch' version '4.0.7'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-elasticsearch' version '4.0.3'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-kafka' version '0.12.3'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-prometheus' version '1.7.3'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-prometheus_pushgateway' version '0.0.1'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-record-modifier' version '2.1.0'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-record_splitter' version '0.2.1'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '2.2.0'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-s3' version '1.3.0'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-systemd' version '1.0.2'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-td' version '1.1.0'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.4'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-webhdfs' version '1.2.4'
 2020-04-29 09:46:27 +0900 [info]: gem 'fluentd' version '1.9.2'
 2020-04-29 09:46:27 +0900 [debug]: No fluent logger for internal event
 2020-04-29 09:46:27 +0900 [info]: using configuration file: <ROOT>
  <system>
    log_level debug
  </system>
  <source>
    @type tcp
    port 8081
    tag "tcp.event"
    bind "0.0.0.0"
    <parse>
      @type "json"
    </parse>
  </source>
  <match tcp.**>
    @type stdout
  </match>
</ROOT>
 2020-04-29 09:46:27 +0900 [info]: starting fluentd-1.9.2 pid=10242 ruby="2.4.9"
 2020-04-29 09:46:27 +0900 [info]: spawn command to main:  cmdline=["/opt/td-agent/embedded/bin/ruby", "-Eascii-8bit:ascii-8bit", "/usr/sbin/td-agent", "-c", "td-agent-test01_tcp.conf", "--under-supervisor"]
 2020-04-29 09:46:28 +0900 [info]: adding match pattern="tcp.**" type="stdout"
 2020-04-29 09:46:28 +0900 [info]: adding source type="tcp"
 2020-04-29 09:46:28 +0900 [debug]: #0 No fluent logger for internal event
 2020-04-29 09:46:28 +0900 [info]: #0 starting fluentd worker pid=10253 ppid=10242 worker=0
 2020-04-29 09:46:28 +0900 [info]: #0 fluentd worker is now running worker=0

8081ポートがListenされていることを確認します。

[root@test08 /etc/td-agent]# netstat -an | grep 8081 | grep LISTEN
tcp        0      0 0.0.0.0:8081            0.0.0.0:*               LISTEN

telnetコマンドで8081ポートに接続し、以下のようなJSONデータを送り込んでみます。
{"fieldA":"AAA","fieldB":"BBB","fieldC":"CCC","message":"XXX,YYY,ZZZ"}

[root@test08 ~]# telnet localhost 8081
Trying ::1...
Connected to localhost.
Escape character is '^]'.
{"fieldA":"AAA","fieldB":"BBB","fieldC":"CCC","message":"XXX,YYY,ZZZ"}

fluentd起動シェルに以下のようなメッセージが出力されます。

2020-04-29 09:56:24.909966194 +0900 tcp.event: {"fieldA":"AAA","fieldB":"BBB","fieldC":"CCC","message":"XXX,YYY,ZZZ"}

tcp.event としてJSONデータが認識されたことが分かります。

加工関連

record_transformer

参考: record_transformer

新たにフィールド追加したり値を変更したりできます。いくつかのパターンをまとめて試してみます。

設定ファイル例
<system>
  log_level debug
</system>

<source>
  @type tcp
  port 8081
  tag tcp.event
  bind 0.0.0.0
  <parse>
    @type json
  </parse>
</source>

<filter tcp.**>
  @type record_transformer
  enable_ruby true
  auto_typecast true
  <record>
    new01 ${tag}
    new02 ${record["message"].slice(0,3)}
    field01 ${record["field01"] * 2}
    new03 ${(record["field01"].to_f + record["field02"].to_f) / record["field03"].to_f * 100}
    new04 ${record["message"].gsub(/l/, "L")}
  </record>
</filter>

<match tcp.**>
  @type stdout
</match>

telnetから8081ポートに対して、以下のJSON投入してみます。
{"message":"hello world", "field01":11, "field02":30, "field03":55}

出力結果
2020-04-29 09:30:45.995052598 +0900 tcp.event: {"message":"hello world","field01":22,"field02":30,"field03":55,"new01":"tcp.event","new02":"hel","new03":74.54545454545455,"new04":"heLLo worLd"}

new01: 新たにタグ名を値に持つnew01というフィールドを追加しています。
new02: messageの内容の先頭3文字を抜き出した値を持つnew02というフィールドを追加しています。
field01: field01の値を2倍した結果をfield01に上書きしています。
new03: (field01 + field02) / field03 * 100 の結果を持つnew03というフィールドを追加しています。
new04: messageの値に含まれる小文字"l"を大文字"L"に置き換えた値を持つnew04というフィールドを追加しています。

※補足
new03の計算に使われているfield01は変更前の値が使われていることに注意!(new03の定義の下にnew03を参照するような計算式を書くとエラーになりますので、参照できるのはオリジナルのイベントデータのみと言えそうです)
new03の計算式で、to_fというメソッドでfloat型に変換しています。これ付けてないとデフォルトでIntになってしまっているようで割算して1以下になったら切り捨てられて結果が0になってしまいました。

rewrite_tag_filter

参考: rewrite_tag_filter
特定のフィールドの値に応じてタグ名を変更する際に利用できます。

設定ファイル例
<system>
  log_level debug
</system>

<source>
  @type tcp
  port 8081
  tag tcp.event
  bind 0.0.0.0
  <parse>
    @type json
  </parse>
</source>

<match tcp.**>
  @type rewrite_tag_filter
  <rule>
    key field01
    pattern /(.*)/
    tag test.$1
  </rule>
</match>

<match **>
  @type stdout
</match>

上の例は、TCPで受け取ったデータを一度tcp.eventというタグで受け取ります。それをrewrite_tag_filterで、test01.<field01の値> というタグに置き換えています。

telnetから8081ポートに対して、以下の4つのJSON投入してみます。
{"message":"hello world", "field01":"ABC", "field02":30, "field03":55}
{"message":"hello world", "field01":"", "field02":30, "field03":55} <=field01がNull
{"message":"hello world", "field02":30, "field03":55} <=field01無し
{"message":"hello world", "field01":"TTT", "field02":30, "field03":55}

出力結果
2020-04-29 09:58:30.979913021 +0900 test.ABC: {"message":"hello world","field01":"ABC","field02":30,"field03":55}
2020-04-29 10:00:26.916290451 +0900 test.TTT: {"message":"hello world","field01":"TTT","field02":30,"field03":55}

field01をkeyに指定して、その値を元にタグを変更していますが、fild01がNullのケースやfield01自体が無い場合、メッセージがロストしています。
参考:
With rewrite-tag-filter, logs are not forwarded. Why?
fluentdからログが転送・出力されてこない時に見てほしい記事 ( rewrite_tag_filter 使用上の注意 )

上の記述によるとタグの付け方によっては無限ループする可能性があるらしいです。今回、keyをfield01にしていますが、それが無い場合の挙動についてはfluentdのドキュメントに記載が見つかりません。
参考: Option to re-emit unmatched records #75
ただ、上のやりとりを見ていると、ruleに合致しなかったeventはタグの変更が行われず、eventは再生成されないように見えます。Keyとなるフィールドが無い場合のルール指定はこのプラグインではできなさそうなので、その辺やるには別で判定ロジックをかます必要がありそうです。

出力関連

stdout

参考: stdout
標準出力に出す!上の例でも示している通り。

設定ファイル例
<match **>
  @type stdout
</match>

elasticsearch

参考:elasticsearch

index名/timestampの制御

参考:GitHub - uken/fluent-plugin-elasticsearch - Dynamic Configuration

If you want configurations to depend on information in messages, you can use elasticsearch_dynamic. This is an experimental variation of the Elasticsearch plugin allows configuration values to be specified in ways such as the below:

index名にフィールド名の参照などをしたい場合は、このDynamic Configuration(@type elasticsearch_dynamic)を使用する必要があるらしい。

設定ファイル例
<system>
  log_level debug
</system>

<source>
  @type tcp
  port 8081
  tag tcp.event
  bind 0.0.0.0
  <parse>
    @type json
  </parse>
</source>

<match tcp.**>
  @type rewrite_tag_filter
  <rule>
    key field01
    pattern /^(.*)/
    tag test04.$1
  </rule>
</match>

<filter test04.**>
  @type record_transformer
  enable_ruby
  <record>
    indexname ${record["field01"]}-${record["field02"]}
    field_date_time ${require "date"; DateTime.strptime(record["field_date"] + "T" + record["field_time"] + "+0900", "%Y-%m-%dT%H:%M:%S.%L%z").iso8601(3).to_s}
  </record>
</filter>

<match test04.**>
  @type elasticsearch_dynamic
  host localhost
  port 9200
  logstash_format true
  logstash_prefix test04-${record['indexname']}
  logstash_dateformat %Y%m%d
  time_key field_date_time
  utc_index false
</match>

record_transformerで、<field01>-<field02>という値を持つindexnameというフィールドを追加しています。
また、field_dateとfield_timeというフィールドからそのレコードのタイムスタンプを取得し、field_date_timeというフィールドを追加しています。field_dateには"YYYY-mm-dd"というフォーマット、field_timeには"HH:MM:SS.sss"というフォーマットのデータが入ってくる想定で型変換のRubyコードを埋め込んでいます。タイムゾーンとしては"+0900"を指定しているので、指定される時刻は日本のタイムゾーンでの時刻情報が入ってくる想定です。iso8601(3)でミリ秒単位の時刻情報が扱えます。(iso8601(6)だとマイクロ秒まで可。)
参考: instance method Time#iso8601

elasticsearch_dynamicの所では、上のfield_date_timeをそのレコードのタイムスタンプとし、test04-<indexname>-YYmmddという名前のindexに投入するようにしています。utc_index falseにすることでindex名につかわれる時刻はローカルタイムのものが使われます(utc_index trueにすると9時間ずれるので、0~9時までのレコードは前の日のindex名が付いてしまう)。

さて、上の設定を適用したfluentdを起動し、以下のJSONを投入してみます。
{"message":"hello world", "field01":"AAA", "field02":"BBB", "field03":55, "field_date":"2020-04-29", "field_time":"01:23:45.222"}

Kibanaで確認すると、以下のようなレコードがElasticsearchに投入されました!
image.png
index名: test04-aaa-bbb-20200429 (index名は小文字に変換されます)
@timestamp: Apr 29, 2020 @ 01:23:45.222

同一キーでのデータの結合/更新

設定ファイル例
<system>
  log_level debug
</system>

<source>
  @type tcp
  port 8081
  tag tcp.event
  bind 0.0.0.0
  <parse>
    @type json
  </parse>
</source>

<match tcp.**>
  @type rewrite_tag_filter
  <rule>
    key field01
    pattern /^(.*)/
    tag test05.$1
  </rule>
</match>


<filter test05.**>
  @type record_transformer
  enable_ruby
  <record>
    field_date_time ${require "date"; DateTime.strptime(record["field_date"] + "T" + record["field_time"] + "+0900", "%Y-%m-%dT%H:%M:%S.%L%z").iso8601(3).to_s}
  </record>
</filter>

#<match **>
#  @type stdout
#</match>

<match test05.**>
  @type elasticsearch_dynamic
  host localhost
  port 9200
  logstash_format true
  logstash_prefix test05
  logstash_dateformat %Y%m%d
  time_key field_date_time
  utc_index false
  id_key field01
  write_operation upsert
</match>

この設定で起動したfluentdに以下のJSONを投入してみます。
{"message":"hello world", "field01":"AAA", "field02":"BBB", "field03":55, "field_date":"2020-04-29", "field_time":"01:23:45.222"}
最初のレコードなので普通に追加されます。投入された結果はこうなります(Kibana-Discoverで確認)。
image.png

次に、同じfield01の値を含む以下のJSONを投入してみます。(field03の値を変更)
{"message":"hello world", "field01":"AAA", "field02":"BBB", "field03":66, "field_date":"2020-04-29", "field_time":"02:23:45.222"}
image.png
レコードは追加されず、field03の値が変更されました。

続いて以下のJSONを投入してみます。(filed02に異なる値を指定、field03無し、新たにfield04追加)
{"message":"hello world", "field01":"AAA", "field02":"CCC", "field04":999,"field_date":"2020-04-29", "field_time":"01:23:45.222"}
image.png
field02は更新され、field03は前の値がそのまま残り、field04が新たに追加されました。

さらに、以下のJSONを投入してみます。(filed04に異なる型のデータを投入)
{"message":"hello world", "field01":"AAA", "field04":"xxx","field_date":"2020-04-29", "field_time":"01:23:45.222"}
これは以下のようなエラーで失敗しました。

2020-04-30 08:57:26 +0900 [error]: #0 Could not push log to Elasticsearch: {"took"=>1, "errors"=>true, "items"=>[{"update"=>{"_index"=>"test05-20200429", "_type"=>"_doc", "_id"=>"AAA", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [field04] of type [long] in document with id 'AAA'. Preview of field's value: 'xxx'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"For input string: \"xxx\""}}}}]}

id_key 指定と write_operation upsert の指定によって、特定キーで複数レコードのjoinのような操作が行えることが確認できました。ただ、Elasticsearchの場合、通常日次等でIndexを分けて管理することになりますが、Indexをまたぐ複数レコードの場合この方法は使えないので注意が必要です。

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?