はじめに
fluentdに関するメモ書きです。ここでは、設定ファイル(主にInput/Filter/Output関連)について調査/テストした内容を記載します。
関連記事
fluentdメモ - (1) インストール/簡易操作
fluentdメモ - (2) 設定ファイル概要
fluentdメモ - (3) 設定ファイル調査 Input/Fileter/Output編
fluentdメモ - (4) 設定ファイル調査 Buffer編
設定ファイルメモ
入力関連
tcp
特定のポートをListenしてTCPによりJSONデータを受け取ってみます。
<system>
log_level debug
</system>
<source>
@type tcp
port 8081
tag tcp.event
bind 0.0.0.0
<parse>
@type json
</parse>
</source>
<match tcp.**>
@type stdout
</match>
上のファイルを指定してfluentdを起動します。
[root@test08 /etc/td-agent]# td-agent -c td-agent-test01_tcp.conf
2020-04-29 09:46:27 +0900 [info]: parsing config file is succeeded path="td-agent-test01_tcp.conf"
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-mixin-config-placeholders' version '0.4.0'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-mixin-rewrite-tag-name' version '0.1.0'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-elasticsearch' version '4.0.7'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-elasticsearch' version '4.0.3'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-kafka' version '0.12.3'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-prometheus' version '1.7.3'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-prometheus_pushgateway' version '0.0.1'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-record-modifier' version '2.1.0'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-record_splitter' version '0.2.1'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '2.2.0'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-s3' version '1.3.0'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-systemd' version '1.0.2'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-td' version '1.1.0'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.4'
2020-04-29 09:46:27 +0900 [info]: gem 'fluent-plugin-webhdfs' version '1.2.4'
2020-04-29 09:46:27 +0900 [info]: gem 'fluentd' version '1.9.2'
2020-04-29 09:46:27 +0900 [debug]: No fluent logger for internal event
2020-04-29 09:46:27 +0900 [info]: using configuration file: <ROOT>
<system>
log_level debug
</system>
<source>
@type tcp
port 8081
tag "tcp.event"
bind "0.0.0.0"
<parse>
@type "json"
</parse>
</source>
<match tcp.**>
@type stdout
</match>
</ROOT>
2020-04-29 09:46:27 +0900 [info]: starting fluentd-1.9.2 pid=10242 ruby="2.4.9"
2020-04-29 09:46:27 +0900 [info]: spawn command to main: cmdline=["/opt/td-agent/embedded/bin/ruby", "-Eascii-8bit:ascii-8bit", "/usr/sbin/td-agent", "-c", "td-agent-test01_tcp.conf", "--under-supervisor"]
2020-04-29 09:46:28 +0900 [info]: adding match pattern="tcp.**" type="stdout"
2020-04-29 09:46:28 +0900 [info]: adding source type="tcp"
2020-04-29 09:46:28 +0900 [debug]: #0 No fluent logger for internal event
2020-04-29 09:46:28 +0900 [info]: #0 starting fluentd worker pid=10253 ppid=10242 worker=0
2020-04-29 09:46:28 +0900 [info]: #0 fluentd worker is now running worker=0
8081ポートがListenされていることを確認します。
[root@test08 /etc/td-agent]# netstat -an | grep 8081 | grep LISTEN
tcp 0 0 0.0.0.0:8081 0.0.0.0:* LISTEN
telnetコマンドで8081ポートに接続し、以下のようなJSONデータを送り込んでみます。
{"fieldA":"AAA","fieldB":"BBB","fieldC":"CCC","message":"XXX,YYY,ZZZ"}
[root@test08 ~]# telnet localhost 8081
Trying ::1...
Connected to localhost.
Escape character is '^]'.
{"fieldA":"AAA","fieldB":"BBB","fieldC":"CCC","message":"XXX,YYY,ZZZ"}
fluentd起動シェルに以下のようなメッセージが出力されます。
2020-04-29 09:56:24.909966194 +0900 tcp.event: {"fieldA":"AAA","fieldB":"BBB","fieldC":"CCC","message":"XXX,YYY,ZZZ"}
tcp.event としてJSONデータが認識されたことが分かります。
加工関連
record_transformer
新たにフィールド追加したり値を変更したりできます。いくつかのパターンをまとめて試してみます。
<system>
log_level debug
</system>
<source>
@type tcp
port 8081
tag tcp.event
bind 0.0.0.0
<parse>
@type json
</parse>
</source>
<filter tcp.**>
@type record_transformer
enable_ruby true
auto_typecast true
<record>
new01 ${tag}
new02 ${record["message"].slice(0,3)}
field01 ${record["field01"] * 2}
new03 ${(record["field01"].to_f + record["field02"].to_f) / record["field03"].to_f * 100}
new04 ${record["message"].gsub(/l/, "L")}
</record>
</filter>
<match tcp.**>
@type stdout
</match>
telnetから8081ポートに対して、以下のJSON投入してみます。
{"message":"hello world", "field01":11, "field02":30, "field03":55}
2020-04-29 09:30:45.995052598 +0900 tcp.event: {"message":"hello world","field01":22,"field02":30,"field03":55,"new01":"tcp.event","new02":"hel","new03":74.54545454545455,"new04":"heLLo worLd"}
new01: 新たにタグ名を値に持つnew01というフィールドを追加しています。
new02: messageの内容の先頭3文字を抜き出した値を持つnew02というフィールドを追加しています。
field01: field01の値を2倍した結果をfield01に上書きしています。
new03: (field01 + field02) / field03 * 100 の結果を持つnew03というフィールドを追加しています。
new04: messageの値に含まれる小文字"l"を大文字"L"に置き換えた値を持つnew04というフィールドを追加しています。
※補足
new03の計算に使われているfield01は変更前の値が使われていることに注意!(new03の定義の下にnew03を参照するような計算式を書くとエラーになりますので、参照できるのはオリジナルのイベントデータのみと言えそうです)
new03の計算式で、to_fというメソッドでfloat型に変換しています。これ付けてないとデフォルトでIntになってしまっているようで割算して1以下になったら切り捨てられて結果が0になってしまいました。
rewrite_tag_filter
参考: rewrite_tag_filter
特定のフィールドの値に応じてタグ名を変更する際に利用できます。
<system>
log_level debug
</system>
<source>
@type tcp
port 8081
tag tcp.event
bind 0.0.0.0
<parse>
@type json
</parse>
</source>
<match tcp.**>
@type rewrite_tag_filter
<rule>
key field01
pattern /(.*)/
tag test.$1
</rule>
</match>
<match **>
@type stdout
</match>
上の例は、TCPで受け取ったデータを一度tcp.event
というタグで受け取ります。それをrewrite_tag_filterで、test01.<field01の値>
というタグに置き換えています。
telnetから8081ポートに対して、以下の4つのJSON投入してみます。
{"message":"hello world", "field01":"ABC", "field02":30, "field03":55}
{"message":"hello world", "field01":"", "field02":30, "field03":55}
<=field01がNull
{"message":"hello world", "field02":30, "field03":55}
<=field01無し
{"message":"hello world", "field01":"TTT", "field02":30, "field03":55}
2020-04-29 09:58:30.979913021 +0900 test.ABC: {"message":"hello world","field01":"ABC","field02":30,"field03":55}
2020-04-29 10:00:26.916290451 +0900 test.TTT: {"message":"hello world","field01":"TTT","field02":30,"field03":55}
field01をkeyに指定して、その値を元にタグを変更していますが、fild01がNullのケースやfield01自体が無い場合、メッセージがロストしています。
参考:
With rewrite-tag-filter, logs are not forwarded. Why?
fluentdからログが転送・出力されてこない時に見てほしい記事 ( rewrite_tag_filter 使用上の注意 )
上の記述によるとタグの付け方によっては無限ループする可能性があるらしいです。今回、keyをfield01にしていますが、それが無い場合の挙動についてはfluentdのドキュメントに記載が見つかりません。
参考: Option to re-emit unmatched records #75
ただ、上のやりとりを見ていると、ruleに合致しなかったeventはタグの変更が行われず、eventは再生成されないように見えます。Keyとなるフィールドが無い場合のルール指定はこのプラグインではできなさそうなので、その辺やるには別で判定ロジックをかます必要がありそうです。
出力関連
stdout
参考: stdout
標準出力に出す!上の例でも示している通り。
<match **>
@type stdout
</match>
elasticsearch
index名/timestampの制御
参考:GitHub - uken/fluent-plugin-elasticsearch - Dynamic Configuration
If you want configurations to depend on information in messages, you can use elasticsearch_dynamic. This is an experimental variation of the Elasticsearch plugin allows configuration values to be specified in ways such as the below:
index名にフィールド名の参照などをしたい場合は、このDynamic Configuration(@type elasticsearch_dynamic
)を使用する必要があるらしい。
<system>
log_level debug
</system>
<source>
@type tcp
port 8081
tag tcp.event
bind 0.0.0.0
<parse>
@type json
</parse>
</source>
<match tcp.**>
@type rewrite_tag_filter
<rule>
key field01
pattern /^(.*)/
tag test04.$1
</rule>
</match>
<filter test04.**>
@type record_transformer
enable_ruby
<record>
indexname ${record["field01"]}-${record["field02"]}
field_date_time ${require "date"; DateTime.strptime(record["field_date"] + "T" + record["field_time"] + "+0900", "%Y-%m-%dT%H:%M:%S.%L%z").iso8601(3).to_s}
</record>
</filter>
<match test04.**>
@type elasticsearch_dynamic
host localhost
port 9200
logstash_format true
logstash_prefix test04-${record['indexname']}
logstash_dateformat %Y%m%d
time_key field_date_time
utc_index false
</match>
record_transformerで、<field01>-<field02>
という値を持つindexnameというフィールドを追加しています。
また、field_dateとfield_timeというフィールドからそのレコードのタイムスタンプを取得し、field_date_timeというフィールドを追加しています。field_dateには"YYYY-mm-dd"というフォーマット、field_timeには"HH:MM:SS.sss"というフォーマットのデータが入ってくる想定で型変換のRubyコードを埋め込んでいます。タイムゾーンとしては"+0900"を指定しているので、指定される時刻は日本のタイムゾーンでの時刻情報が入ってくる想定です。iso8601(3)でミリ秒単位の時刻情報が扱えます。(iso8601(6)だとマイクロ秒まで可。)
参考: instance method Time#iso8601
elasticsearch_dynamicの所では、上のfield_date_timeをそのレコードのタイムスタンプとし、test04-<indexname>-YYmmdd
という名前のindexに投入するようにしています。utc_index falseにすることでindex名につかわれる時刻はローカルタイムのものが使われます(utc_index trueにすると9時間ずれるので、0~9時までのレコードは前の日のindex名が付いてしまう)。
さて、上の設定を適用したfluentdを起動し、以下のJSONを投入してみます。
{"message":"hello world", "field01":"AAA", "field02":"BBB", "field03":55, "field_date":"2020-04-29", "field_time":"01:23:45.222"}
Kibanaで確認すると、以下のようなレコードがElasticsearchに投入されました!
index名: test04-aaa-bbb-20200429 (index名は小文字に変換されます)
@timestamp
: Apr 29, 2020 @ 01:23:45.222
同一キーでのデータの結合/更新
<system>
log_level debug
</system>
<source>
@type tcp
port 8081
tag tcp.event
bind 0.0.0.0
<parse>
@type json
</parse>
</source>
<match tcp.**>
@type rewrite_tag_filter
<rule>
key field01
pattern /^(.*)/
tag test05.$1
</rule>
</match>
<filter test05.**>
@type record_transformer
enable_ruby
<record>
field_date_time ${require "date"; DateTime.strptime(record["field_date"] + "T" + record["field_time"] + "+0900", "%Y-%m-%dT%H:%M:%S.%L%z").iso8601(3).to_s}
</record>
</filter>
#<match **>
# @type stdout
#</match>
<match test05.**>
@type elasticsearch_dynamic
host localhost
port 9200
logstash_format true
logstash_prefix test05
logstash_dateformat %Y%m%d
time_key field_date_time
utc_index false
id_key field01
write_operation upsert
</match>
この設定で起動したfluentdに以下のJSONを投入してみます。
{"message":"hello world", "field01":"AAA", "field02":"BBB", "field03":55, "field_date":"2020-04-29", "field_time":"01:23:45.222"}
最初のレコードなので普通に追加されます。投入された結果はこうなります(Kibana-Discoverで確認)。
次に、同じfield01の値を含む以下のJSONを投入してみます。(field03の値を変更)
{"message":"hello world", "field01":"AAA", "field02":"BBB", "field03":66, "field_date":"2020-04-29", "field_time":"02:23:45.222"}
レコードは追加されず、field03の値が変更されました。
続いて以下のJSONを投入してみます。(filed02に異なる値を指定、field03無し、新たにfield04追加)
{"message":"hello world", "field01":"AAA", "field02":"CCC", "field04":999,"field_date":"2020-04-29", "field_time":"01:23:45.222"}
field02は更新され、field03は前の値がそのまま残り、field04が新たに追加されました。
さらに、以下のJSONを投入してみます。(filed04に異なる型のデータを投入)
{"message":"hello world", "field01":"AAA", "field04":"xxx","field_date":"2020-04-29", "field_time":"01:23:45.222"}
これは以下のようなエラーで失敗しました。
2020-04-30 08:57:26 +0900 [error]: #0 Could not push log to Elasticsearch: {"took"=>1, "errors"=>true, "items"=>[{"update"=>{"_index"=>"test05-20200429", "_type"=>"_doc", "_id"=>"AAA", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [field04] of type [long] in document with id 'AAA'. Preview of field's value: 'xxx'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"For input string: \"xxx\""}}}}]}
id_key 指定と write_operation upsert の指定によって、特定キーで複数レコードのjoinのような操作が行えることが確認できました。ただ、Elasticsearchの場合、通常日次等でIndexを分けて管理することになりますが、Indexをまたぐ複数レコードの場合この方法は使えないので注意が必要です。