0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

CDPzを利用したz/OS-ELK連携 - (9)カスタマイズ-fluentdの利用

Last updated at Posted at 2020-06-28

はじめに

タイトルにELK連携とあるように、基本的にはElasticsearch/Logstash/Kibanaをベースに考えてきましたが、Logstashは大量データを扱う場合など、パフォーマンス上ちょっと厳しいケースが出てきます。
ここではLogstashの代わりにfluentdを利用してCDPzのデータをElasticsearchに取り込む方法について記載します。

関連記事

CDPzを利用したz/OS-ELK連携 - (1)セットアップ編
CDPzを利用したz/OS-ELK連携 - (2)SYSLOG取得編
CDPzを利用したz/OS-ELK連携 - (3)SMF-バッチ編
CDPzを利用したz/OS-ELK連携 - (4)SMF-リアルタイム編
CDPzを利用したz/OS-ELK連携 - (5)カスタマイズ-共通
CDPzを利用したz/OS-ELK連携 - (6)カスタマイズ-SYSLOG
CDPzを利用したz/OS-ELK連携 - (7)カスタマイズ-CICS
CDPzを利用したz/OS-ELK連携 - (8)カスタマイズ-RMF Monitor III
CDPzを利用したz/OS-ELK連携 - (9)カスタマイズ-fluentdの利用

fluentdインストール

この辺を参考に...
fluentdメモ - (1) インストール/簡易操作
Fluentd - Installation

プラグインの設定

CDPzから送信されてくるデータは、以下のように、messageの項目内にいくつかのレコードが配列としてまとめられて送付されてきます。以下はSYSLOGの例ですがSMFのデータでも同様です。

{
      "seq" => {
        "w" => "1",
        "c" => "2"
      },
      "sysplexName" => "EPLEX",
       "@timestamp" => 2020-01-30T10:36:29.289Z,
       "systemName" => "ZOS1",
       "message" => [
        [0] "NE,007C,20022 15.02.43.770 +0900,ZOS1    ,STC00809,        ,00000000000000000000000000000000,00000290,CICS0041,00,\" IEA630I  OPERATOR CICS004$ NOW ACTIVE,   SYSTEM=ZOS1    , LU=0944FEF3\"\n",
        [1] "NE,007C,20022 15.02.43.770 +0900,ZOS1    ,STC00809,CICS004$,00000000000000000000000000000000,00000290,CICS0041,80,\" D T\"\n",
        [2] "NE,007C,20022 15.02.43.770 +0900,ZOS1    ,STC00809,CICS004$,00000000000000000000000000000000,00000090,CICS0041,40,\" IEE136I LOCAL: TIME=15.02.43 DATE=2020.022  UTC: TIME=06.02.43 DATE=2020.022\"\n",
        [3] "NE,007C,20022 15.02.44.790 +0900,ZOS1    ,STC00809,        ,00000000000000000000000000000000,00000290,CICS0041,00,\" IEA631I  OPERATOR CICS004$ NOW INACTIVE, SYSTEM=ZOS1    , LU=0944FEF3\"\n"
       ],
       "host" => "EPLEX1",
       "port" => 46768,
       "sourceType" => "zOS-SYSLOG-Console",
       "path" => "SYSLOG",
       "timeZone" => "+0900",
       "inputsequence" => "20200122060248560:000000",
       "@version" => "1",
       "sourceName" => "ZOS1-SYSLOG"
}

Logstashでは、splitというfilterプラグインを使うことでこれらの配列を個別のレコードに分解できます。
参考: Split filter plugin
しかし、fluentdだと出来合いのfilterでは同じような動作をさせるのが難しいようです。そこで、Logstashのsplitと同じような動作をさせる独自の機能を用意し、fluentdのプラグインとして設定しておきます。

具体的には、fluentdのプラグイン"json_array_split"として動作する以下のrubyコードを用意し、fluentdのプラグインのパスに配置します(デフォルトだと/etc/td-agent/plugin)。

/etc/td-agent/plugin/filter_json_array_split.rb
#
# Copyright 2020- TODO: Write your name
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


require 'fluent/plugin/filter'

module Fluent
  module Plugin
    class JsonArraySplitFilter < Fluent::Plugin::Filter
      Fluent::Plugin.register_filter("json_array_split", self)

      config_param :target_key, :string, default: 'message'
      config_param :keep_keys, :bool, default: true

      def filter_stream(tag, es)

        new_es = MultiEventStream.new

        es.each do |time, record|

          target_key_value = record[@target_key]

          if target_key_value.instance_of?(Array) then
            target_key_value.each { |r|
              result = {}
              result[@target_key] = r

              if @keep_keys then
                record.each do |key, value|
                  if key != @target_key then
                    result[key] = value
                  end
                end
              end

              new_es.add(time, result)
            }
          end
        end

        new_es # return
      end
    end
  end
end

これで、Logstashのsplitと類似の動きをする"json_array_split"が利用できるようになります。

fluentd設定ファイルの準備

Input/Filter/Output部分をそれぞれ別ファイルとして用意します。
最後に、各Input/Filter/OutputをIncludeする親の設定ファイルを作成することにします。

Input

ここは取得するData Streamに依存しない共通部分です。
TCPでデータを受け取ってデータをJSONとして解釈します。
取得するデータの種類によらず共通です。必要に応じてポート番号などは修正します。

/etc/td-agent/cdp01_input/B_CDPz_Input.conf
<source>
  @type tcp
  port 8081
  tag tcp.event
  bind 0.0.0.0
  <parse>
    @type json
  </parse>
</source>

Filter(共通)

以下の処理を行います。

  • 上で定義したプラグイン"json_array_split"を使ってmessage部分の配列を分割
  • Elasticsearchに取り込むIndex名追加
  • sourceTypによるタグ付け

このFilterの定義も共通なので、上のInput用の定義と同じディレクトリに配置することにします。

/etc/td-agent/cdp01_input/E_CDPz_Index.conf
<filter tcp.**>
  @type json_array_split
  target_key message
</filter>

<filter tcp.**>
  @type record_transformer
  enable_ruby
  <record>
    indexname ${record["sourceType"]}-${record["sysplexName"]}
  </record>
</filter>

<match tcp.**>
  @type rewrite_tag_filter
  <rule>
    key sourceType
    pattern /(.*)/
    tag cdp-$1
  </rule>
</match>

Filter (Data Stream別)

Data StreamごとにCSVをフィールド別に解釈したり、タイムスタンプ情報を拾ってきたりする部分ですが、これはCDPzが提供するLogstash用の設定ファイル(H_xxx.lsh, N_xxx.lsh)をスクリプトにより一括変換します。

変換用シェルスクリプト

convert.sh
#!/bin/bash

logstash_conf_dir=$1
fluentd_conf_dir=$2

cd ${logstash_conf_dir}

find ./ -name "H_*" |  sed -r -e "s/\.\/(.*).lsh/\\1/g" | xargs -I{} gawk -v output=${fluentd_conf_dir}/{}.conf '
/sourceType/ {
str = sprintf("<filter cdp-%s>\n", gensub(/^.*"(.+)".*$/, "\\1", "g") )
str = str "  @type parser\n"
str = str "  key_name message\n"
str = str "  reserve_data true\n"
str = str "  <parse>\n"
str = str "    @type csv"
print str > output
}
/columns/ {
str = gensub(/^.*\[[ ]+(.*)[ ]+\]$/, "\\1", "g")
gsub(/[" ]/, "", str)
str = sprintf("    keys %s\n", str)
str = str "  </parse>\n"
str = str "</filter>"
print str >> output
}
' {}.lsh


find ./ -name "N_*" |  sed -r -e "s/\.\/(.*).lsh/\\1/g" | xargs -I{} gawk -v output=${fluentd_conf_dir}/{}.conf '
/sourceType/ {
str = sprintf("<filter cdp-%s>\n", gensub(/^.*"(.+)".*$/, "\\1", "g") )
str = str "  @type record_transformer\n"
str = str "  enable_ruby\n"
str = str "  <record>"
print str > output
}
/"\[\@metadata\]\[timestamp\]" \=>/ {
date = gensub(/^ +"\[@metadata\]\[timestamp\]" \=> "\%\{([0-9A-Za-z]+)\}( \%\{([0-9A-Za-z]+)\})?"$/, "\\1", "g")
time = gensub(/^ +"\[@metadata\]\[timestamp\]" \=> "\%\{([0-9A-Za-z]+)\}( \%\{([0-9A-Za-z]+)\})?"$/, "\\3", "g")
if (time == "" && gensub(/^.*(IMS).*$/, "\\1", "g", output) == "IMS")
  str = sprintf("    time_key ${require \"date\"; DateTime.strptime(record[\"%s\"] + record[\"timeZone\"], \"%%Y-%%m-%%d %%H:%%M:%%S.%%L%%z\").iso8601.to_s}\n", date)
else if (time == "")
  str = sprintf("    time_key ${require \"date\"; DateTime.strptime(record[\"%s\"], \"%%y%%j %%H.%%M.%%S.%%L %%z\").iso8601(6).to_s}\n", date)
else
  str = sprintf("    time_key ${require \"date\"; DateTime.strptime(record[\"%s\"] + \"T\" + record[\"%s\"] + record[\"timeZone\"], \"%%Y-%%m-%%dT%%H:%%M:%%S:%%L%%z\").iso8601.to_s}\n", date, time)
str = str "  </record>\n"
str = str "</filter>"
print str >> output
}
' {}.lsh

使い方: convert.sh <logstash_dir> <flutend_dir>
<logstash_dir>: 変換元のH_xxx.lsh, N_xxx.lshが格納されているディレクトリ
<fluentd_dir>: 変換後のfluentd設定ファイルを出力するディレクトリ

これにより、<logstash_dir>以下にあるH_xxx.lsh,N_xxx.lshが全てfluentd用の設定ファイルに変換されて、<fluentd_dir>以下に、H_xxx.conf, N_xxx.conf という名前で出力されます。

取得するData Streamに合わせて、必要な定義を適当なディレクトリに配置します。
ここでは、SYSLOGとRMF Monitor IIIの情報を取得する想定で、/etc/td-agent/cdp02_syslog_rmfというディレクトリを作成し、変換後のSYSLOG関連とRMF Monitor III関連のファイルを配置することにします。
H_RMF_CPC.conf
H_RMF_DELAY.conf
H_RMF_PROCU.conf
H_RMF_SYSINFO.conf
H_SYSLOG_or_OPERLOG.conf
N_RMF_CPC.conf
N_RMF_DELAY.conf
N_RMF_PROCU.conf
N_RMF_SYSINFO.conf
N_SYSLOG_or_OPERLOG.conf

※上の変換スクリプトだと一部不備があるので、RMF Monitor III関連の DELAY, PROCU, SYSINFOに関してはH_XXX.conf, N_XXX.confを以下のようにカスタマイズします。

H_RMF_PROCU.conf
<filter cdp-zOS-RMF_PROCU>
  @type record_transformer
  enable_ruby
  <record>
    message ${record["message"].gsub(/N\/A/,'0')}
  </record>
</filter>

<filter cdp-zOS-RMF_PROCU>
  @type parser
  key_name message
  reserve_data true
  <parse>
    @type csv
    keys LOCALSTART,LOCALEND,PRUPJOB,PRUPASI,PRUPCLA,PRUPCLAX,PRUPSVCL,PRUPCLP,PRUPCPT,PRUPAACT,PRUPCBCT,PRUPIICT,PRUPCPE,PRUPAAPE,PRUPCBPE,PRUPIIPE,PRUPTOTC,PRUPTOTE,PRUPTCB,PRUPSRB,PRUPPCS,PRUP
EPS,PRUTCPUT
  </parse>
</filter>

H_RMF_PROCU.confのの先頭にfileterを1つ追加します。数値フィールドにN/Aが返される場合があるので、N/Aを一律0に置き換えています。H_RMF_DELAY.conf, H_RMF_SYSINFO.confも同様に修正します。

N_RMF_PROCU.conf
<filter cdp-zOS-RMF_PROCU>
  @type record_transformer
  enable_ruby
  <record>
    time_key ${require "date"; DateTime.strptime(record["LOCALEND"] + record["timeZone"], "%Y-%m-%d %H:%M:%S:%L%z").iso8601(6).to_s}
  </record>
</filter>

time_keyの指定を上のように変更します。N_RMF_DELAY.conf,N_RMF_SYSINFO.confも同様に修正します。

また、PROCUのPRUPCLP(Service class period)というフィールドには数値と"*"が混在するので、このフィールドは明示的に文字列フィールドとしてElasticsearch のIndex Templateを作成しておくことにします。

PUT _template/cdp_procu
{
  "index_patterns": ["cdp-zos-rmf_procu-*"],
  "order" : 1,
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas" : 0
  },
  "mappings": {
    "numeric_detection": true,
    "properties": {
      "PRUPCLP": {
        "type": "text"
      }
    }
  }
}

上の例に従って、事前にElasticsearch側でIndex Templateを作成しておきます。

Output

ここも取得するData Streamに依存しない共通部分です。
Elasticsearchへの出力に関する設定を行います。

/etc/td-agent/cdp03_output_to_elastic/Q_CDPz_Elastic.conf
<match cdp-**>
  @type elasticsearch_dynamic
  host localhost
  port 9200
  logstash_format true
  logstash_prefix cdp-${record['indexname']}
  logstash_dateformat %Y%m%d
  time_key time_key
  utc_index false
</match>

設定ファイルの統合

ここまでで、/etc/td-agentディレクトリ以下のファイル構造を見てみると、こんな感じになっています。

.
|-- cdp01_input
|   |-- B_CDPz_Input.conf
|   `-- E_CDPz_Index.conf
|-- cdp02_syslog_rmf
|   |-- H_RMF_CPC.conf
|   |-- H_RMF_DELAY.conf
|   |-- H_RMF_PROCU.conf
|   |-- H_RMF_SYSINFO.conf
|   |-- H_SYSLOG_or_OPERLOG.conf
|   |-- N_RMF_CPC.conf
|   |-- N_RMF_DELAY.conf
|   |-- N_RMF_PROCU.conf
|   |-- N_RMF_SYSINFO.conf
|   `-- N_SYSLOG_or_OPERLOG.conf
|-- cdp03_output_to_elastic
|   `-- Q_CDPz_Elastic.conf
|-- plugin
|   `-- filter_json_array_split.rb
...

さて、上で用意した各種設定ファイルを取りまとめる親玉の設定ファイル"td-agent-syslog-rmf.conf"を作成します。

/etc/td-agent/td-agent-syslog-rmf.conf
@include /etc/td-agent/cdp01_input/*.conf
@include /etc/td-agent/cdp02_syslog_rmf/*.conf
@include /etc/td-agent/cdp03_output_to_elastic/*.conf

はい、これで設定ファイルはできあがりです。

fluentd実行

上で作成した設定ファイルを指定して、fluentdを起動すればOKです。
td-agent -c td-agent-syslog-rmf.conf

起動例
[root@test11 /etc/td-agent]# td-agent -c td-agent-syslog-rmf.conf
2020-10-01 15:59:20 +0900 [info]: parsing config file is succeeded path="td-agent-syslog-rmf.conf"
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-elasticsearch' version '4.2.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-elasticsearch' version '4.0.9'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-kafka' version '0.13.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-prometheus' version '1.8.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-prometheus_pushgateway' version '0.0.2'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-record-modifier' version '2.1.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '2.3.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-s3' version '1.3.2'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-systemd' version '1.0.2'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-td' version '1.1.0'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.4'
2020-10-01 15:59:20 +0900 [info]: gem 'fluent-plugin-webhdfs' version '1.2.5'
2020-10-01 15:59:20 +0900 [info]: gem 'fluentd' version '1.11.1'
2020-10-01 15:59:20 +0900 [info]: adding rewrite_tag_filter rule: sourceType [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00000000025005a8 @keys="sourceType">, /(.*)/, "", "cdp-$1"]
2020-10-01 15:59:20 +0900 [info]: using configuration file: <ROOT>
  <source>
    @type tcp
    port 8081
    tag "tcp.event"
    bind "0.0.0.0"
    <parse>
      @type "json"
    </parse>
  </source>
  <filter tcp.**>
    @type json_array_split
    target_key "message"
  </filter>
  <filter tcp.**>
    @type record_transformer
    enable_ruby
    <record>
      indexname ${record["sourceType"]}-${record["sysplexName"]}
    </record>
  </filter>
  <match tcp.**>
    @type rewrite_tag_filter
    <rule>
      key "sourceType"
      pattern /(.*)/
      tag "cdp-$1"
    </rule>
  </match>
  <filter cdp-zOS-RMF_CPC>
    @type parser
    key_name "message"
    reserve_data true
    <parse>
      @type "csv"
      keys LOCALSTART,LOCALEND,CPCHPNAM,CPCHMOD,CPCHMDL,CPCHBSTT,CPCHCMSU,CPCHWF,CPCHLMSU,CPCHGNAM,CPCHIMSU,CPCHCAP,CPCHLMAX,CPCHGLIM,CPCHGL4H,CPCHMTMS,CPCHPRDS,CPCHAMSU,CPCHRMSU,CPCHRGRP,CPCHGAUN,CPCHCPU,CPCHCPCN,CPCHCPNO,CPCHICNO,CPCHIFAN,CPCHCBPN,CPCHICFN,CPCHIFLN,CPCHSUPN,CPCHPANO,CPCHWAIT,CPCHPMSU,CPCHDEDC,CPCHDEDA,CPCHDEDO,CPCHDEDI,CPCHSHRC,CPCHSHRA,CPCHSHRO,CPCHSHRI,CPCHVCPU,CPCHWMGT,CPCHCCAI,CPCHCCCR,CPCHPRD,CPCHMCFS,CPCHMCF,CPCHCFS,CPCHCF,CPCHATDS,CPCHATD,CPCHMTM,CPCHMDLX,CPCHBSTC,CPCHCUTL,CPCHAUTL,CPCHOUTL,CPCHUUTL,CPCHLUTL,CPCHFUTL,CPCHATDI,CPCHCFI,CPCHMCFI,CPCHMTMI,CPCHPRDI,CPCPPNAM,CPCPDMSU,CPCPAMSU,CPCPCAPD,CPCPLPNO,CPCPLEFU,CPCPLTOU,CPCPPLMU,CPCPPEFU,CPCPPTOU,CPCPIND,CPCPLPND,CPCPDEDP,CPCPWGHT,CPCPLPSH,CPCPVCMH,CPCPVCMM,CPCPVCML,CPCPOSNM,CPCPLPCN,CPCPLCIW,CPCPLCMW,CPCPLCXW,CPCPCGNM,CPCPCGLT,CPCPCGEM,CPCPCGEX,CPCPCSMB,CPCPUPID,CPCPCAPI,CPCPHWCC,CPCPHGNM,CPCPHWGC,CPCPBIIP,CPCPBSPD
    </parse>
  </filter>
  <filter cdp-zOS-RMF_DELAY>
    @type record_transformer
    enable_ruby
    <record>
      message ${record["message"].gsub(/N\/A/,'0')}
    </record>
  </filter>
  <filter cdp-zOS-RMF_DELAY>
    @type parser
    key_name "message"
    reserve_data true
    <parse>
      @type "csv"
      keys LOCALSTART,LOCALEND,JDELDAN,JDELASI,JDETYPX,JDEPSVCL,JDEGMIP,JDELWFL,JDELUSG,JDELDEL,JDELIDL,JDELUKN,JDELPROC,JDELDEV,JDELSTOR,JDELSUBS,JDELOPER,JDELENQ,JDELJES,JDELHSM,JDELXCF,JDELMNT,JDELMES,JDELQUI,JDELCAP,JDELCP,JDELCBP,JDELSUP,JDELIFA,JDELREAS
    </parse>
  </filter>
  <filter cdp-zOS-RMF_PROCU>
    @type record_transformer
    enable_ruby
    <record>
      message ${record["message"].gsub(/N\/A/,'0')}
    </record>
  </filter>
  <filter cdp-zOS-RMF_PROCU>
    @type parser
    key_name "message"
    reserve_data true
    <parse>
      @type "csv"
      keys LOCALSTART,LOCALEND,PRUPJOB,PRUPASI,PRUPCLA,PRUPCLAX,PRUPSVCL,PRUPCLP,PRUPCPT,PRUPAACT,PRUPCBCT,PRUPIICT,PRUPCPE,PRUPAAPE,PRUPCBPE,PRUPIIPE,PRUPTOTC,PRUPTOTE,PRUPTCB,PRUPSRB,PRUPPCS,PRUPEPS,PRUTCPUT
    </parse>
  </filter>
  <filter cdp-zOS-RMF_SYSINFO>
    @type record_transformer
    enable_ruby
    <record>
      message ${record["message"].gsub(/N\/A/,'0')}
    </record>
  </filter>
  <filter cdp-zOS-RMF_SYSINFO>
    @type parser
    key_name "message"
    reserve_data true
    <parse>
      @type "csv"
      keys LOCALSTART,LOCALEND,SYSPARVC,SYSMODVC,SYSMDLVC,SYSTSVVC,SYSPOLVC,SYSPRVVC,SYSCUVVC,SYSTSEVC,SYSPADVC,SYSPROVC,SYSLCPVC,SYSAPOVC,SYSPATVC,SYSPRTVC,SYSAPTVC,SYSAOCVC,SYSATCVC,SYSLOAVG,SYSCVAVC,SYSCUAVC,SYSMUAVC,SYSCUOVC,SYSMUOVC,SYSCUIVC,SYSMUIVC,SYSAHPVC,SYSIHPVC,SYSPKCVC,SYSPKAVC,SYSPKOVC,SYSPKIVC,SYSPRIVC,SYSAPIVC,SYSICVVC,SYSAICVC,SYSIPVVC,SYSOPVVC,SYSVEPVC,SYSTCTVC,SYSUCTVC,SYSCCTVC,SYSCULVC,SYSNAMVC,SYSTYPVC,SYSWFLVC,SYSTUSVC,SYSAUSVC,SYSTRSVC,SYSAFCVC,SYSRSPM,SYSAUPVC,SYSAUDVC,SYSADPVC,SYSADDVC,SYSADSVC,SYSADUVC,SYSADOVC,SYSADEVC,SYSADJVC,SYSADHVC,SYSADXVC,SYSADNVC,SYSADMVC,SYSCPUVC,SYSSRBVC,SYSTCBVC,SYSIFAVC,SYSCPVC,SYSIFCVC,SYSRSPVC,SYSVELVC,SYSUGMVC,SYSUGPVC,SYSUGDVC,SYSWGDVC,SYSWGPVC,SYSDGMVC,SYSUJMVC,SYSDJMVC,SYSDGEVC,SYSDGHVC,SYSDGDVC,SYSDGJVC,SYSDGOVC,SYSDGPVC,SYSDGSVC,SYSDGUVC,SYSDGXVC,SYSDDSIN,SYSDDSIT,SYSDDSIP,SYSRCTNT,SYSEAPVC,SYSLPIVC,SYSSUPVC,SYSSUCVC,SYSPDPVC,SYSTODVC,SYSCPDVC,SYSAPDVC,SYSIPDVC,SYSRGCVC,SYSDLYVC,SYSMEMUS,SYSCBPVC,SYSCBCVC,SYSCBDVC,SYSVECVC
    </parse>
  </filter>
  <filter cdp-zOS-SYSLOG-Console>
    @type parser
    key_name "message"
    reserve_data true
    <parse>
      @type "csv"
      keys rcd,ASID,TIMESTAMP,SMFID,JOBNUM,CONSOLE,ROUTECODE,DESCRIPTOR,JOBNAME,FLAGS,TEXT
    </parse>
  </filter>
  <filter cdp-zOS-RMF_CPC>
    @type record_transformer
    enable_ruby
    <record>
      time_key ${require "date"; DateTime.strptime(record["LOCALEND"], "%y%j %H.%M.%S.%L %z").iso8601(6).to_s}
    </record>
  </filter>
  <filter cdp-zOS-RMF_DELAY>
    @type record_transformer
    enable_ruby
    <record>
      time_key ${require "date"; DateTime.strptime(record["LOCALEND"] + record["timeZone"], "%Y-%m-%d %H:%M:%S:%L%z").iso8601(6).to_s}
    </record>
  </filter>
  <filter cdp-zOS-RMF_PROCU>
    @type record_transformer
    enable_ruby
    <record>
      time_key ${require "date"; DateTime.strptime(record["LOCALEND"] + record["timeZone"], "%Y-%m-%d %H:%M:%S:%L%z").iso8601(6).to_s}
    </record>
  </filter>
  <filter cdp-zOS-RMF_SYSINFO>
    @type record_transformer
    enable_ruby
    <record>
      time_key ${require "date"; DateTime.strptime(record["LOCALEND"] + record["timeZone"], "%Y-%m-%d %H:%M:%S:%L%z").iso8601(6).to_s}
    </record>
  </filter>
  <filter cdp-zOS-SYSLOG-Console>
    @type record_transformer
    enable_ruby
    <record>
      time_key ${require "date"; DateTime.strptime(record["TIMESTAMP"], "%y%j %H.%M.%S.%L %z").iso8601(6).to_s}
    </record>
  </filter>
  <match cdp-**>
    @type elasticsearch_dynamic
    host "localhost"
    port 9200
    logstash_format true
    logstash_prefix "cdp-${record[\'indexname\']}"
    logstash_dateformat "%Y%m%d"
    time_key "time_key"
    utc_index true
  </match>
</ROOT>
2020-10-01 15:59:20 +0900 [info]: starting fluentd-1.11.1 pid=26763 ruby="2.4.10"
2020-10-01 15:59:20 +0900 [info]: spawn command to main:  cmdline=["/opt/td-agent/embedded/bin/ruby", "-Eascii-8bit:ascii-8bit", "/usr/sbin/td-agent", "-c", "td-agent-syslog-rmf.conf", "--under-supervisor"]
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="tcp.**" type="json_array_split"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="tcp.**" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding match pattern="tcp.**" type="rewrite_tag_filter"
2020-10-01 15:59:21 +0900 [info]: #0 adding rewrite_tag_filter rule: sourceType [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x0000000002b2d278 @keys="sourceType">, /(.*)/, "", "cdp-$1"]
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_CPC" type="parser"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_DELAY" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_DELAY" type="parser"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_PROCU" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_PROCU" type="parser"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_SYSINFO" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_SYSINFO" type="parser"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-SYSLOG-Console" type="parser"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_CPC" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_DELAY" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_PROCU" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-RMF_SYSINFO" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding filter pattern="cdp-zOS-SYSLOG-Console" type="record_transformer"
2020-10-01 15:59:21 +0900 [info]: adding match pattern="cdp-**" type="elasticsearch_dynamic"
2020-10-01 15:59:21 +0900 [warn]: #0 Detected ES 7.x: `_doc` will be used as the document `_type`.
2020-10-01 15:59:21 +0900 [info]: adding source type="tcp"
2020-10-01 15:59:21 +0900 [info]: #0 starting fluentd worker pid=26775 ppid=26763 worker=0
2020-10-01 15:59:21 +0900 [info]: #0 fluentd worker is now running worker=0

CDPz側は特にfluentdに特化した意識をする必要はありません。Logstashにデータを送信するのと同じように設定/実行すればOKです。

補足

上の例は、あくまでCDPz提供のものを一律fluentdに置き換える場合の手順例です。
これまでの記事で、CDPz提供のLogstashの設定ファイルを個別にカスタマイズする例をいくつか挙げています。fluentdに置き換えるとそれらの対応はそのままは適用できないので、必要に応じてfluentd用のカスタマイズを行う必要があります。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?