はじめに
タイトルにELK連携とあるように、基本的にはElasticsearch/Logstash/Kibanaをベースに考えてきましたが、Logstashは大量データを扱う場合など、パフォーマンス上ちょっと厳しいケースが出てきます。
ここではLogstashの代わりにfluentdを利用してCDPzのデータをElasticsearchに取り込む方法について記載します。
関連記事
CDPzを利用したz/OS-ELK連携 - (1)セットアップ編
CDPzを利用したz/OS-ELK連携 - (2)SYSLOG取得編
CDPzを利用したz/OS-ELK連携 - (3)SMF-バッチ編
CDPzを利用したz/OS-ELK連携 - (4)SMF-リアルタイム編
CDPzを利用したz/OS-ELK連携 - (5)カスタマイズ-共通
CDPzを利用したz/OS-ELK連携 - (6)カスタマイズ-SYSLOG
CDPzを利用したz/OS-ELK連携 - (7)カスタマイズ-CICS
CDPzを利用したz/OS-ELK連携 - (8)カスタマイズ-RMF Monitor III
CDPzを利用したz/OS-ELK連携 - (9)カスタマイズ-fluentdの利用
fluentdインストール
この辺を参考に...
fluentdメモ - (1) インストール/簡易操作
Fluentd - Installation
プラグインの設定
CDPzから送信されてくるデータは、以下のように、messageの項目内にいくつかのレコードが配列としてまとめられて送付されてきます。以下はSYSLOGの例ですがSMFのデータでも同様です。
{
"seq" => {
"w" => "1",
"c" => "2"
},
"sysplexName" => "EPLEX",
"@timestamp" => 2020-01-30T10:36:29.289Z,
"systemName" => "ZOS1",
"message" => [
[0] "NE,007C,20022 15.02.43.770 +0900,ZOS1 ,STC00809, ,00000000000000000000000000000000,00000290,CICS0041,00,\" IEA630I OPERATOR CICS004$ NOW ACTIVE, SYSTEM=ZOS1 , LU=0944FEF3\"\n",
[1] "NE,007C,20022 15.02.43.770 +0900,ZOS1 ,STC00809,CICS004$,00000000000000000000000000000000,00000290,CICS0041,80,\" D T\"\n",
[2] "NE,007C,20022 15.02.43.770 +0900,ZOS1 ,STC00809,CICS004$,00000000000000000000000000000000,00000090,CICS0041,40,\" IEE136I LOCAL: TIME=15.02.43 DATE=2020.022 UTC: TIME=06.02.43 DATE=2020.022\"\n",
[3] "NE,007C,20022 15.02.44.790 +0900,ZOS1 ,STC00809, ,00000000000000000000000000000000,00000290,CICS0041,00,\" IEA631I OPERATOR CICS004$ NOW INACTIVE, SYSTEM=ZOS1 , LU=0944FEF3\"\n"
],
"host" => "EPLEX1",
"port" => 46768,
"sourceType" => "zOS-SYSLOG-Console",
"path" => "SYSLOG",
"timeZone" => "+0900",
"inputsequence" => "20200122060248560:000000",
"@version" => "1",
"sourceName" => "ZOS1-SYSLOG"
}
Logstashでは、splitというfilterプラグインを使うことでこれらの配列を個別のレコードに分解できます。
参考: Split filter plugin
しかし、fluentdだと出来合いのfilterでは同じような動作をさせるのが難しいようです。そこで、Logstashのsplitと同じような動作をさせる独自の機能を用意し、fluentdのプラグインとして設定しておきます。
具体的には、fluentdのプラグイン"json_array_split"として動作する以下のrubyコードを用意し、fluentdのプラグインのパスに配置します(デフォルトだと/etc/td-agent/plugin)。
#
# Copyright 2020- TODO: Write your name
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'fluent/plugin/filter'
module Fluent
module Plugin
class JsonArraySplitFilter < Fluent::Plugin::Filter
Fluent::Plugin.register_filter("json_array_split", self)
config_param :target_key, :string, default: 'message'
config_param :keep_keys, :bool, default: true
def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
target_key_value = record[@target_key]
if target_key_value.instance_of?(Array) then
target_key_value.each { |r|
result = {}
result[@target_key] = r
if @keep_keys then
record.each do |key, value|
if key != @target_key then
result[key] = value
end
end
end
new_es.add(time, result)
}
end
end
new_es # return
end
end
end
end
これで、Logstashのsplitと類似の動きをする"json_array_split"が利用できるようになります。
fluentd設定ファイルの準備
Input/Filter/Output部分をそれぞれ別ファイルとして用意します。
最後に、各Input/Filter/OutputをIncludeする親の設定ファイルを作成することにします。
Input
ここは取得するData Streamに依存しない共通部分です。
TCPでデータを受け取ってデータをJSONとして解釈します。
取得するデータの種類によらず共通です。必要に応じてポート番号などは修正します。
<source>
@type tcp
port 8081
tag tcp.event
bind 0.0.0.0
<parse>
@type json
</parse>
</source>
Filter(共通)
以下の処理を行います。
- 上で定義したプラグイン"json_array_split"を使ってmessage部分の配列を分割
- Elasticsearchに取り込むIndex名追加
- sourceTypによるタグ付け
このFilterの定義も共通なので、上のInput用の定義と同じディレクトリに配置することにします。
<filter tcp.**>
@type json_array_split
target_key message
</filter>
<filter tcp.**>
@type record_transformer
enable_ruby
<record>
indexname ${record["sourceType"]}-${record["sysplexName"]}
</record>
</filter>
<match tcp.**>
@type rewrite_tag_filter
<rule>
key sourceType
pattern /(.*)/
tag cdp-$1
</rule>
</match>
Filter (Data Stream別)
Data StreamごとにCSVをフィールド別に解釈したり、タイムスタンプ情報を拾ってきたりする部分ですが、これはCDPzが提供するLogstash用の設定ファイル(H_xxx.lsh, N_xxx.lsh)をスクリプトにより一括変換します。
変換用シェルスクリプト
#!/bin/bash
logstash_conf_dir=$1
fluentd_conf_dir=$2
cd ${logstash_conf_dir}
find ./ -name "H_*" | sed -r -e "s/\.\/(.*).lsh/\\1/g" | xargs -I{} gawk -v output=${fluentd_conf_dir}/{}.conf '
/sourceType/ {
str = sprintf("<filter cdp-%s>\n", gensub(/^.*"(.+)".*$/, "\\1", "g") )
str = str " @type parser\n"
str = str " key_name message\n"
str = str " reserve_data true\n"
str = str " <parse>\n"
str = str " @type csv"
print str > output
}
/columns/ {
str = gensub(/^.*\[[ ]+(.*)[ ]+\]$/, "\\1", "g")
gsub(/[" ]/, "", str)
str = sprintf(" keys %s\n", str)
str = str " </parse>\n"
str = str "</filter>"
print str >> output
}
' {}.lsh
find ./ -name "N_*" | sed -r -e "s/\.\/(.*).lsh/\\1/g" | xargs -I{} gawk -v output=${fluentd_conf_dir}/{}.conf '
/sourceType/ {
str = sprintf("<filter cdp-%s>\n", gensub(/^.*"(.+)".*$/, "\\1", "g") )
str = str " @type record_transformer\n"
str = str " enable_ruby\n"
str = str " <record>"
print str > output
}
/"\[\@metadata\]\[timestamp\]" \=>/ {
date = gensub(/^ +"\[@metadata\]\[timestamp\]" \=> "\%\{([0-9A-Za-z]+)\}( \%\{([0-9A-Za-z]+)\})?"$/, "\\1", "g")
time = gensub(/^ +"\[@metadata\]\[timestamp\]" \=> "\%\{([0-9A-Za-z]+)\}( \%\{([0-9A-Za-z]+)\})?"$/, "\\3", "g")
if (time == "" && gensub(/^.*(IMS).*$/, "\\1", "g", output) == "IMS")
str = sprintf(" time_key ${require \"date\"; DateTime.strptime(record[\"%s\"] + record[\"timeZone\"], \"%%Y-%%m-%%d %%H:%%M:%%S.%%L%%z\").iso8601.to_s}\n", date)
else if (time == "")
str = sprintf(" time_key ${require \"date\"; DateTime.strptime(record[\"%s\"], \"%%y%%j %%H.%%M.%%S.%%L %%z\").iso8601(6).to_s}\n", date)
else
str = sprintf(" time_key ${require \"date\"; DateTime.strptime(record[\"%s\"] + \"T\" + record[\"%s\"] + record[\"timeZone\"], \"%%Y-%%m-%%dT%%H:%%M:%%S:%%L%%z\").iso8601.to_s}\n", date, time)
str = str " </record>\n"
str = str "</filter>"
print str >> output
}
' {}.lsh
使い方: convert.sh <logstash_dir> <flutend_dir>
<logstash_dir>
: 変換元のH_xxx.lsh, N_xxx.lshが格納されているディレクトリ
<fluentd_dir>
: 変換後のfluentd設定ファイルを出力するディレクトリ
これにより、<logstash_dir>
以下にあるH_xxx.lsh,N_xxx.lshが全てfluentd用の設定ファイルに変換されて、<fluentd_dir>
以下に、H_xxx.conf, N_xxx.conf という名前で出力されます。
取得するData Streamに合わせて、必要な定義を適当なディレクトリに配置します。
ここでは、SYSLOGとRMF Monitor IIIの情報を取得する想定で、/etc/td-agent/cdp02_syslog_rmfというディレクトリを作成し、変換後のSYSLOG関連とRMF Monitor III関連のファイルを配置することにします。
H_RMF_CPC.conf
H_RMF_DELAY.conf
H_RMF_PROCU.conf
H_RMF_SYSINFO.conf
H_SYSLOG_or_OPERLOG.conf
N_RMF_CPC.conf
N_RMF_DELAY.conf
N_RMF_PROCU.conf
N_RMF_SYSINFO.conf
N_SYSLOG_or_OPERLOG.conf
※上の変換スクリプトだと一部不備があるので、RMF Monitor III関連の DELAY, PROCU, SYSINFOに関してはH_XXX.conf, N_XXX.confを以下のようにカスタマイズします。
<filter cdp-zOS-RMF_PROCU>
@type record_transformer
enable_ruby
<record>
message ${record["message"].gsub(/N\/A/,'0')}
</record>
</filter>
<filter cdp-zOS-RMF_PROCU>
@type parser
key_name message
reserve_data true
<parse>
@type csv
keys LOCALSTART,LOCALEND,PRUPJOB,PRUPASI,PRUPCLA,PRUPCLAX,PRUPSVCL,PRUPCLP,PRUPCPT,PRUPAACT,PRUPCBCT,PRUPIICT,PRUPCPE,PRUPAAPE,PRUPCBPE,PRUPIIPE,PRUPTOTC,PRUPTOTE,PRUPTCB,PRUPSRB,PRUPPCS,PRUP
EPS,PRUTCPUT
</parse>
</filter>
H_RMF_PROCU.confのの先頭にfileterを1つ追加します。数値フィールドにN/Aが返される場合があるので、N/Aを一律0に置き換えています。H_RMF_DELAY.conf, H_RMF_SYSINFO.confも同様に修正します。
<filter cdp-zOS-RMF_PROCU>
@type record_transformer
enable_ruby
<record>
time_key ${require "date"; DateTime.strptime(record["LOCALEND"] + record["timeZone"], "%Y-%m-%d %H:%M:%S:%L%z").iso8601(6).to_s}
</record>
</filter>
time_keyの指定を上のように変更します。N_RMF_DELAY.conf,N_RMF_SYSINFO.confも同様に修正します。
また、PROCUのPRUPCLP(Service class period)というフィールドには数値と"*"が混在するので、このフィールドは明示的に文字列フィールドとしてElasticsearch のIndex Templateを作成しておくことにします。
PUT _template/cdp_procu
{
"index_patterns": ["cdp-zos-rmf_procu-*"],
"order" : 1,
"settings": {
"number_of_shards": 1,
"number_of_replicas" : 0
},
"mappings": {
"numeric_detection": true,
"properties": {
"PRUPCLP": {
"type": "text"
}
}
}
}
上の例に従って、事前にElasticsearch側でIndex Templateを作成しておきます。
Output
ここも取得するData Streamに依存しない共通部分です。
Elasticsearchへの出力に関する設定を行います。
<match cdp-**>
@type elasticsearch_dynamic
host localhost
port 9200
logstash_format true
logstash_prefix cdp-${record['indexname']}
logstash_dateformat %Y%m%d
time_key time_key
utc_index false
</match>
設定ファイルの統合
ここまでで、/etc/td-agentディレクトリ以下のファイル構造を見てみると、こんな感じになっています。
.
|-- cdp01_input
| |-- B_CDPz_Input.conf
| `-- E_CDPz_Index.conf
|-- cdp02_syslog_rmf
| |-- H_RMF_CPC.conf
| |-- H_RMF_DELAY.conf
| |-- H_RMF_PROCU.conf
| |-- H_RMF_SYSINFO.conf
| |-- H_SYSLOG_or_OPERLOG.conf
| |-- N_RMF_CPC.conf
| |-- N_RMF_DELAY.conf
| |-- N_RMF_PROCU.conf
| |-- N_RMF_SYSINFO.conf
| `-- N_SYSLOG_or_OPERLOG.conf
|-- cdp03_output_to_elastic
| `-- Q_CDPz_Elastic.conf
|-- plugin
| `-- filter_json_array_split.rb
...
さて、上で用意した各種設定ファイルを取りまとめる親玉の設定ファイル"td-agent-syslog-rmf.conf"を作成します。
@include /etc/td-agent/cdp01_input/*.conf
@include /etc/td-agent/cdp02_syslog_rmf/*.conf
@include /etc/td-agent/cdp03_output_to_elastic/*.conf
はい、これで設定ファイルはできあがりです。
fluentd実行
上で作成した設定ファイルを指定して、fluentdを起動すればOKです。
td-agent -c td-agent-syslog-rmf.conf
[root@test11 /etc/td-agent]# td-agent -c td-agent-syslog-rmf.conf
2020-10-01 15:59:20 +0900 [info]: parsing config file is succeeded path="td-agent-syslog-rmf.conf"
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-elasticsearch' version '4.2.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-elasticsearch' version '4.0.9'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-kafka' version '0.13.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-prometheus' version '1.8.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-prometheus_pushgateway' version '0.0.2'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-record-modifier' version '2.1.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '2.3.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-s3' version '1.3.2'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-systemd' version '1.0.2'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-td' version '1.1.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.4'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-webhdfs' version '1.2.5'
2020-10-01 15:59:20 +0900 [info]: gem 'fluentd' version '1.11.1'
2020-10-01 15:59:20 +0900 [info]: adding rewrite_tag_filter rule: sourceType [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00000000025005a8 @keys="sourceType">, /(.*)/, "", "cdp-$1"]
2020-10-01 15:59:20 +0900 [info]: using configuration file: <ROOT>
<source>
@type tcp
port 8081
tag "tcp.event"
bind "0.0.0.0"
<parse>
@type "json"
</parse>
</source>
<filter tcp.**>
@type json_array_split
target_key "message"
</filter>
<filter tcp.**>
@type record_transformer
enable_ruby
<record>
indexname ${record["sourceType"]}-${record["sysplexName"]}
</record>
</filter>
<match tcp.**>
@type rewrite_tag_filter
<rule>
key "sourceType"
pattern /(.*)/
tag "cdp-$1"
</rule>
</match>
<filter cdp-zOS-RMF_CPC>
@type parser
key_name "message"
reserve_data true
<parse>
@type "csv"
keys LOCALSTART,LOCALEND,CPCHPNAM,CPCHMOD,CPCHMDL,CPCHBSTT,CPCHCMSU,CPCHWF,CPCHLMSU,CPCHGNAM,CPCHIMSU,CPCHCAP,CPCHLMAX,CPCHGLIM,CPCHGL4H,CPCHMTMS,CPCHPRDS,CPCHAMSU,CPCHRMSU,CPCHRGRP,CPCHGAUN,CPCHCPU,CPCHCPCN,CPCHCPNO,CPCHICNO,CPCHIFAN,CPCHCBPN,CPCHICFN,CPCHIFLN,CPCHSUPN,CPCHPANO,CPCHWAIT,CPCHPMSU,CPCHDEDC,CPCHDEDA,CPCHDEDO,CPCHDEDI,CPCHSHRC,CPCHSHRA,CPCHSHRO,CPCHSHRI,CPCHVCPU,CPCHWMGT,CPCHCCAI,CPCHCCCR,CPCHPRD,CPCHMCFS,CPCHMCF,CPCHCFS,CPCHCF,CPCHATDS,CPCHATD,CPCHMTM,CPCHMDLX,CPCHBSTC,CPCHCUTL,CPCHAUTL,CPCHOUTL,CPCHUUTL,CPCHLUTL,CPCHFUTL,CPCHATDI,CPCHCFI,CPCHMCFI,CPCHMTMI,CPCHPRDI,CPCPPNAM,CPCPDMSU,CPCPAMSU,CPCPCAPD,CPCPLPNO,CPCPLEFU,CPCPLTOU,CPCPPLMU,CPCPPEFU,CPCPPTOU,CPCPIND,CPCPLPND,CPCPDEDP,CPCPWGHT,CPCPLPSH,CPCPVCMH,CPCPVCMM,CPCPVCML,CPCPOSNM,CPCPLPCN,CPCPLCIW,CPCPLCMW,CPCPLCXW,CPCPCGNM,CPCPCGLT,CPCPCGEM,CPCPCGEX,CPCPCSMB,CPCPUPID,CPCPCAPI,CPCPHWCC,CPCPHGNM,CPCPHWGC,CPCPBIIP,CPCPBSPD
</parse>
</filter>
<filter cdp-zOS-RMF_DELAY>
@type record_transformer
enable_ruby
<record>
message ${record["message"].gsub(/N\/A/,'0')}
</record>
</filter>
<filter cdp-zOS-RMF_DELAY>
@type parser
key_name "message"
reserve_data true
<parse>
@type "csv"
keys LOCALSTART,LOCALEND,JDELDAN,JDELASI,JDETYPX,JDEPSVCL,JDEGMIP,JDELWFL,JDELUSG,JDELDEL,JDELIDL,JDELUKN,JDELPROC,JDELDEV,JDELSTOR,JDELSUBS,JDELOPER,JDELENQ,JDELJES,JDELHSM,JDELXCF,JDELMNT,JDELMES,JDELQUI,JDELCAP,JDELCP,JDELCBP,JDELSUP,JDELIFA,JDELREAS
</parse>
</filter>
<filter cdp-zOS-RMF_PROCU>
@type record_transformer
enable_ruby
<record>
message ${record["message"].gsub(/N\/A/,'0')}
</record>
</filter>
<filter cdp-zOS-RMF_PROCU>
@type parser
key_name "message"
reserve_data true
<parse>
@type "csv"
keys LOCALSTART,LOCALEND,PRUPJOB,PRUPASI,PRUPCLA,PRUPCLAX,PRUPSVCL,PRUPCLP,PRUPCPT,PRUPAACT,PRUPCBCT,PRUPIICT,PRUPCPE,PRUPAAPE,PRUPCBPE,PRUPIIPE,PRUPTOTC,PRUPTOTE,PRUPTCB,PRUPSRB,PRUPPCS,PRUPEPS,PRUTCPUT
</parse>
</filter>
<filter cdp-zOS-RMF_SYSINFO>
@type record_transformer
enable_ruby
<record>
message ${record["message"].gsub(/N\/A/,'0')}
</record>
</filter>
<filter cdp-zOS-RMF_SYSINFO>
@type parser
key_name "message"
reserve_data true
<parse>
@type "csv"
keys LOCALSTART,LOCALEND,SYSPARVC,SYSMODVC,SYSMDLVC,SYSTSVVC,SYSPOLVC,SYSPRVVC,SYSCUVVC,SYSTSEVC,SYSPADVC,SYSPROVC,SYSLCPVC,SYSAPOVC,SYSPATVC,SYSPRTVC,SYSAPTVC,SYSAOCVC,SYSATCVC,SYSLOAVG,SYSCVAVC,SYSCUAVC,SYSMUAVC,SYSCUOVC,SYSMUOVC,SYSCUIVC,SYSMUIVC,SYSAHPVC,SYSIHPVC,SYSPKCVC,SYSPKAVC,SYSPKOVC,SYSPKIVC,SYSPRIVC,SYSAPIVC,SYSICVVC,SYSAICVC,SYSIPVVC,SYSOPVVC,SYSVEPVC,SYSTCTVC,SYSUCTVC,SYSCCTVC,SYSCULVC,SYSNAMVC,SYSTYPVC,SYSWFLVC,SYSTUSVC,SYSAUSVC,SYSTRSVC,SYSAFCVC,SYSRSPM,SYSAUPVC,SYSAUDVC,SYSADPVC,SYSADDVC,SYSADSVC,SYSADUVC,SYSADOVC,SYSADEVC,SYSADJVC,SYSADHVC,SYSADXVC,SYSADNVC,SYSADMVC,SYSCPUVC,SYSSRBVC,SYSTCBVC,SYSIFAVC,SYSCPVC,SYSIFCVC,SYSRSPVC,SYSVELVC,SYSUGMVC,SYSUGPVC,SYSUGDVC,SYSWGDVC,SYSWGPVC,SYSDGMVC,SYSUJMVC,SYSDJMVC,SYSDGEVC,SYSDGHVC,SYSDGDVC,SYSDGJVC,SYSDGOVC,SYSDGPVC,SYSDGSVC,SYSDGUVC,SYSDGXVC,SYSDDSIN,SYSDDSIT,SYSDDSIP,SYSRCTNT,SYSEAPVC,SYSLPIVC,SYSSUPVC,SYSSUCVC,SYSPDPVC,SYSTODVC,SYSCPDVC,SYSAPDVC,SYSIPDVC,SYSRGCVC,SYSDLYVC,SYSMEMUS,SYSCBPVC,SYSCBCVC,SYSCBDVC,SYSVECVC
</parse>
</filter>
<filter cdp-zOS-SYSLOG-Console>
@type parser
key_name "message"
reserve_data true
<parse>
@type "csv"
keys rcd,ASID,TIMESTAMP,SMFID,JOBNUM,CONSOLE,ROUTECODE,DESCRIPTOR,JOBNAME,FLAGS,TEXT
</parse>
</filter>
<filter cdp-zOS-RMF_CPC>
@type record_transformer
enable_ruby
<record>
time_key ${require "date"; DateTime.strptime(record["LOCALEND"], "%y%j %H.%M.%S.%L %z").iso8601(6).to_s}
</record>
</filter>
<filter cdp-zOS-RMF_DELAY>
@type record_transformer
enable_ruby
<record>
time_key ${require "date"; DateTime.strptime(record["LOCALEND"] + record["timeZone"], "%Y-%m-%d %H:%M:%S:%L%z").iso8601(6).to_s}
</record>
</filter>
<filter cdp-zOS-RMF_PROCU>
@type record_transformer
enable_ruby
<record>
time_key ${require "date"; DateTime.strptime(record["LOCALEND"] + record["timeZone"], "%Y-%m-%d %H:%M:%S:%L%z").iso8601(6).to_s}
</record>
</filter>
<filter cdp-zOS-RMF_SYSINFO>
@type record_transformer
enable_ruby
<record>
time_key ${require "date"; DateTime.strptime(record["LOCALEND"] + record["timeZone"], "%Y-%m-%d %H:%M:%S:%L%z").iso8601(6).to_s}
</record>
</filter>
<filter cdp-zOS-SYSLOG-Console>
@type record_transformer
enable_ruby
<record>
time_key ${require "date"; DateTime.strptime(record["TIMESTAMP"], "%y%j %H.%M.%S.%L %z").iso8601(6).to_s}
</record>
</filter>
<match cdp-**>
@type elasticsearch_dynamic
host "localhost"
port 9200
logstash_format true
logstash_prefix "cdp-${record[\'indexname\']}"
logstash_dateformat "%Y%m%d"
time_key "time_key"
utc_index true
</match>
</ROOT>
2020-10-01 15:59:20 +0900 [info]: starting fluentd-1.11.1 pid=26763 ruby="2.4.10"
2020-10-01 15:59:20 +0900 [info]: spawn command to main: cmdline=["/opt/td-agent/embedded/bin/ruby", "-Eascii-8bit:ascii-8bit", "/usr/sbin/td-agent", "-c", "td-agent-syslog-rmf.conf", "--under-supervisor"]
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="tcp.**" type="json_array_split"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="tcp.**" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding match pattern="tcp.**" type="rewrite_tag_filter"
2020-10-01 15:59:21 +0900 [info]: #0 adding rewrite_tag_filter rule: sourceType [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000002b2d278 @keys="sourceType">, /(.*)/, "", "cdp-$1"]
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_CPC" type="parser"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_DELAY" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_DELAY" type="parser"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_PROCU" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_PROCU" type="parser"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_SYSINFO" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_SYSINFO" type="parser"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-SYSLOG-Console" type="parser"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_CPC" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_DELAY" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_PROCU" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_SYSINFO" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-SYSLOG-Console" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding match pattern="cdp-**" type="elasticsearch_dynamic"
2020-10-01 15:59:21 +0900 [warn]: #0 Detected ES 7.x: `_doc` will be used as the document `_type`.
2020-10-01 15:59:21 +0900 [info]: adding source type="tcp"
2020-10-01 15:59:21 +0900 [info]: #0 starting fluentd worker pid=26775 ppid=26763 worker=0
2020-10-01 15:59:21 +0900 [info]: #0 fluentd worker is now running worker=0
CDPz側は特にfluentdに特化した意識をする必要はありません。Logstashにデータを送信するのと同じように設定/実行すればOKです。
補足
上の例は、あくまでCDPz提供のものを一律fluentdに置き換える場合の手順例です。
これまでの記事で、CDPz提供のLogstashの設定ファイルを個別にカスタマイズする例をいくつか挙げています。fluentdに置き換えるとそれらの対応はそのままは適用できないので、必要に応じてfluentd用のカスタマイズを行う必要があります。