0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

CDPzを利用したz/OS-ELK連携 - (5)カスタマイズ-共通

Last updated at Posted at 2020-06-28

はじめに

CDPzを利用して各種情報を取り込む際に、その情報源となるミドルウェアやOSごとに、それぞれカスタマイズなどを行う必要が出てきます。が、それらのコンポーネントに限らず、CDPzを使う上で共通の考慮点もありますので、ここでは共通するカスタマイズ、考慮点、Tipsなど整理しておきます。
※ちなみにCDPzにあまり関連しないELKスタックそのものの一般的な考慮点には触れていませんので、念のため...。

関連記事

CDPzを利用したz/OS-ELK連携 - (1)セットアップ編
CDPzを利用したz/OS-ELK連携 - (2)SYSLOG取得編
CDPzを利用したz/OS-ELK連携 - (3)SMF-バッチ編
CDPzを利用したz/OS-ELK連携 - (4)SMF-リアルタイム編
CDPzを利用したz/OS-ELK連携 - (5)カスタマイズ-共通
CDPzを利用したz/OS-ELK連携 - (6)カスタマイズ-SYSLOG
CDPzを利用したz/OS-ELK連携 - (7)カスタマイズ-CICS
CDPzを利用したz/OS-ELK連携 - (8)カスタマイズ-RMF Monitor III
CDPzを利用したz/OS-ELK連携 - (9)カスタマイズ-fluentdの利用

Elasticsearch関連

index template

参考: Put index template API
参考: Dynamic Mapping

CDPz提供のLogstash設定ファイルのElasticsearchへの取り込み部分は以下のようになっています。

Q_CDPz_Elastic.lsh
output {
        elasticsearch {
                hosts => [ "localhost" ]
                action => "index"
                index => "cdp-%{[@metadata][indexname]}-%{+yyyyMMdd}"
        }
}

この定義に従ってElasticsearchにデータを取り込む場合、"cdp-"ではじまるindex名が付けられることになります。
また、index名はData Stream単位、日付単位に別の名前が付けられることになります。

Elasticsearchでは、index単位にフィールドの型のマッピング情報や、設定情報を持たせることができますが、上のように動的にindex名が割り当てられるケースでは、index作成時に適用されるテンプレート「index template」を作成しておくことができます。

ここでは、共通設定として、以下のindex templateを作成することにします。

共通設定
PUT _template/cdp_common
{
  "index_patterns": ["cdp-*"],
  "order" : 0,
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas" : 0
  },
  "mappings": {
    "numeric_detection": true
  }
}

<設定内容補足>

"index_patterns": ["cdp-*"]" : このテンプレート適用対象となるindex名のパターンを指定しています。"cdp-"ではじまる任意のindexが対象です。

"number_of_shards": 1,"number_of_replicas" : 0 : Elasticsearchをクラスター構成していると、データは分散して管理され、冗長化のためにレプリカも保持されます。ここでは、クラスター構成をとっていない簡易的なテスト環境(1ノード)を想定しているため、シャードの数:1, レプリカ数0を指定しています。(デフォルトではそれぞれnumber_of_shards:5, number_of_replicas:1となっており、1ノード構成だとレプリカが作成できないため、ステータスがyellowになってしまいます。)

"numeric_detection": true : これはDynamic Mappingと呼ばれる機能の1つです。Elasticsearchはスキーマレスのデータストアですので、取り込むデータの構造や各フィールドの型を事前に定義しておく必要がありません。最初にデータが取り込まれた段階で動的に構造や型を判断してくれます。型の判断もある一定のルールに従って実施してくれます。その中で、文字列として指定されている数値型のフィールドがあった場合、(例えば、JSONデータ中に、"field01": "+1.3" というようにダブルクォーテーションで括られた値があった場合)、それを数値として解釈してくれる機能があり、numeric_detectionという項目で有効化/無効化が制御できます。デフォルトでは無効化されているので、ここではこの機能を明示的に有効化しています。

"order" : 0 : 1つのindexに、複数のテンプレートがマッチする可能性があるので、その際の順序を指定しています。orderの数値が大きい方の定義が優先され、重複するものは上書きされます。ここではCDPのベースとなる設定をしているので0を指定しています。DATA STREAMごとに個別の設定が必要な場合は、orderの値を1以上にして別途テンプレートを定義することを想定しています。
たとえば、SMF Type88用に個別に定義をしたい場合、以下のようにorderを1に指定し、適宜設定を追加します。

個別定義例
PUT _template/cdp_smf_088
{
  "index_patterns": ["cdp-*smf_088*"],
  "order" : 1,
  "mappings": {
    "properties": {
      "field01": {
        "type": "text"
      },
      "field02": {
        "type": "keyword"
      },
      "field03": {
        "type": "integer"
      },
      "field04": {
        "type": "float"
      }
    }
  }
}

上の例では、いくつかのフィールドについて、明示的にtypeを指定しています。
Elasticsearchは事前に型を明示指定しなくても、データ取りこみ時に最初にIndexが作成されるときに、含まれる値からフィールド毎のtypeを自動的に判断します。そのため、例えば文字列として扱いたいフールドに"数字のみの値"と"アルファベットを含む値"の両方が存在しうるような場合には注意が必要です。
具体的にはHEXの値が含まれるフィールド"fieldHEX"があったとします。最初に投入されたレコードに含まれるfieldHEXの値が"0000"だったとします。これは数値しか含まれていないので、Index作成時にfieldHEXのtypeはlongとして認識されます。その後、"000C"のようなアルファベットを含むHEX値を持つレコードを同じIndexに投入しようとすると、long型のフィールドに文字列を投入することになるので、エラーになってしまいます。
このような場合は、事前にfieldHEXのtypeをテンプレートにて事前定義しておく必要があります。
(逆に数値として扱いたいフィールドに、"N/A"などの文字列が入ってくるケースもあり得ます。その場合は、"N/A"という文字列を0に置き換えるなどの対応が別途必要になります。)
ちなみにテンプレート作成時には、全フィールドを定義する必要がありません。明示指定したいもののみ指定すればよく、テンプレートに含まれないフィールドは動的にtypeが判断されることになります。

Logstash関連

タイムスタンプ取得時のタイムゾーン

CDPz提供のLogstash設定ファイルでは、各レコードのSMFへの書き込み時刻をそのレコードのタイムスタンプとして認識させるために、以下のように個別のフィールド情報から@timestampを設定しています。

filter {
   if [sourceType] == "zOS-SMF_110_2" {
      mutate{ add_field => {
         "[@metadata][timestamp]" => "%{SMFSTDTE} %{SMFSTTME}"
        }}

      date{ match => [
             "[@metadata][timestamp]", "yyyy-MM-dd HH:mm:ss:SS"
        ]}
   }
}

SMFSTDTE + SMFSTTME のフィールドをタイムスタンプとして解釈していますが、内部的にはタイムスタンプ情報はUTCで保持されることになります。例えば、元になる時刻の文字列が、"2020-02-05 10:00:00:00"だったとします。これは日本時間の想定なので、UTCとしては9時間ずれて"2020-02-05 01:00:00:00"で保持されるべきです。
ただ、上の設定ではタイムゾーンが明示指定されていないので、UTCへの変換にはLogstash稼働環境のタイムゾーンが影響します。これが日本以外の環境の場合、実際の時刻とはズレてUTCに変換されてしまいます。
date fileter pluginでは、明示的にタイムゾーンを指定することができるので、認識したい日時情報のタイムゾーンとLogstashの稼働環境のタイムゾーンが異なる場合、以下のように明示的にタイムゾーンを指定するようにして下さい。

 date{ match => [
        "[@metadata][timestamp]", "yyyy-MM-dd HH:mm:ss:SS"
        ]
       timezone => "Asia/Tokyo"
 }

参考: date filter plugin - timezone
参考: Logstashメモ - (2) 設定ファイル関連

indexのタイムゾーン対応

参考: Accessing Event Data and Fields in the Configuration
参考: Ruby filter plugin
参考: Ruby 2.7 - instance method Time#strftime

Logstash経由でElasticsearchにデータを取り込む際、Logstashのoutput pluginで、index名を動的に生成させることができます。この時、日付単位に動的にindex名を分けて管理する、ということがよく行われます。CDPz提供のLogstash設定ファイルでもそのようになっています(以下の %{+yyyyMMdd}という指定の部分)。

Q_CDPz_Elastic.lsh
output {
        elasticsearch {
                hosts => [ "localhost" ]
                action => "index"
                index => "cdp-%{[@metadata][indexname]}-%{+yyyyMMdd}"
        }
}

ところが、日付/時刻情報は内部的にはUTCで保持されているため、上の変数で取得される値もUTCがベースとなっています。そのため、日本のタイムゾーン(JST)の場合、9時間ずれる訳です。このことは以下の記事にもまとめられています。

参考: Elasticsearchのタイムゾーン問題

例えば日本時間で 2020/01/30 05:00:00(JST) のタイムスタンプのデータがあったとすると、それは内部的には2020/01/29 20:00:00(UTC)ということになりますので、%{+yyyMMdd}で取得される値は、"20200129"となる訳です。つまり00:00:00~08:59:59までのデータは前日の日付のindexに格納されるということになります。
これは管理をする上で色々と面倒なので、index名につけられる日付情報は日本のタイムゾーンに合わせる!、 つまり、日本時間で2020/01/30 00:00:00 ~ 2020/01/30 23:59:59 までのデータは、cdp-xxx-20200130 という名前のindexに格納する!、という方が都合がいいことが多いと思います。

そこで、上の参考記事に倣って、output plugin部分を、以下のように変更します。

Q_CDPz_Elastic_cust.lsh
filter {
        ruby {
                code => "event.set('[@metadata][local_time]',event.get('[@timestamp]').time.localtime.strftime('%Y%m%d'))"
        }
}
output {
        elasticsearch {
                hosts => [ "localhost" ]
                action => "index"
                index => "cdp-%{[@metadata][indexname]}-%{[@metadata][local_time]}"
        }
}

設定ファイルの中で"[<フィールド名>]" という書き方をすると、扱っているデータ中の指定したフィールドの値を参照することができます。手前に[@metadata]というのを指定すると、いわゆるメタデータとして扱われ、実データとして出力されません。
上の例では、ruby filter pluginで、rubyのコードを埋め込み、その中で@timestampをローカルタイムに変換し(yyyymmdd形式)、[@metadata][local_time]という変数に格納しています。
最終的に、ここで生成したlocal_timeを使ってindexの名前を生成しています。
これで、index名がローカルのタイムゾーンに準ずる日付でindex名が生成されることになります。

ただし、ここではlocaltimeというメソッドでtimestampの日時をローカルタイムに変換していますので、変換先のタイムゾーンはLogstash稼働環境に依存します。Logstashの稼働環境のタイムゾーンが日本ではない場合、以下のように動的に環境変数を上書きするコードを追加することで、指定したタイムゾーンに変換することができます。

rubyコード部分
filter {
        ruby {
                code => "ENV['TZ'] = 'Asia/Tokyo'
                         event.set('[@metadata][local_time]',event.get('[@timestamp]').time.localtime.strftime('%Y%m%d'))"
        }
}

参考: 任意のタイムゾーンで出力する方法

フィールド操作

フィールド同士の計算をさせたり(何かのエリアの使用率を計算させるなど)、TOD Clockの変換を行うなど、Visualizeをする上で事前に何かしらの計算処理を行わせたい場合があると思います。
処理を入れる箇所としては以下の選択肢が考えられます。

  1. CDPz(Data Stream定義)
    SDEの独自言語でロジックを記述します。簡単な四則演算などはできますが、複雑なロジックを組むのは難しそうです。
    参考: https://www.ibm.com/support/knowledgecenter/ja/SSGE3R_2.1.0/sde_lang_values_operators.html

  2. Logstash
    Ruby filter pluginを利用し、Rubyでロジックを記述できます。
    参考: Logstashメモ - (2) 設定ファイル関連 - ruby filter plugin

  3. Kibana
    scripted fieldという機能を利用し、painlessという言語でロジックを記述できます。可視化の度にスクリプトが動くので、事前に必要なことが分かっているのであれば、1 or 2の方法を取るのがお勧めです。
    参考:
    KibanaのScript FieldでPainlessを使う
    Painless scripting language
    z/OSの新しい管理方法を探る - (2)SYSLOGのElasticsearchへの取り込み - scripted field

  4. Elasticsearch
    一旦Elasticsearchに情報を取り込んだ後に、Elasticsearch提供のAPIを使って独立した別アプリケーションでデータを加工して再度Elasticsearchに戻すということも考えられます。より複雑な加工をするような場合はこのような方法を考慮する必要があるかもしれません。扱うデータが大量になるケースでは、Apache Sparkなどの利用が考えられます。
    参考: ElasticsearchのデータをApache Sparkで加工する

ここでは、Logstashのruby filter pluginを利用するケースを想定します。

四則演算用Rubyコード

2つの任意のフィールドに対して、足算、引算、掛算、割算、パーセンテージの処理行う汎用的なRubyコードを用意しました。
詳細はこちら => 2つのフィールドに対して四則演算を行う
これを利用して、必要な計算処理をLogstash設定ファイルに組み込むことができます。

TOD Clock変換用Rubyコード

SMFのフィールドでは、TOD Clockタイプでデータが格納されているフィールドがあります。

TOD Clockのフォーマットについては、以下のドキュメントに詳細が記載されています。
参考: z/Architecture Principles of Operation

The TOD clock nominally is incremented by adding a one in bit position 51 every microsecond. In models having a higher or lower resolution, a different bit position is incremented at such a frequency that the rate of advancing the clock is the same as if a one were added in bit position 51 every microsecond. The resolution of the TOD clock is such that the incrementing rate is comparable to the instruction-execution rate of the model.
The stepping value of TOD-clock bit position 63, if implemented, is 2^-12 microseconds, or approximately 244 picoseconds. This value is called a clock unit.

2^12=4096なので、4096で割算(右bitシフト)してあげるとmicro秒単位になり、さらに10^6で割算すれば秒単位に変換されます。すなわち、TOD Clockタイプの値は、4096E6で割算してあげると、秒単位に変換することができます。
このTOD Clock => 秒 変換用のRubyコードを用意します。

field_tod.rb
def register(params)
  @source_field = params["source_field"]
  @target_field = params["target_field"]
end

def filter(event)
  if !event.get("[#{@source_field}]").nil? then
    source_value = event.get("[#{@source_field}]").to_f
    target_value = source_value / 4096E6
    event.set("[#{@target_field}]", target_value)
  end
  return [event]
end

上のコードを利用するLogstash設定ファイルの例を以下に示します。

Logstash設定ファイル例
input {
        stdin { }
}

filter {
        csv{
                columns => ["field01","field02","field03"]
        }

        ruby {
                path => "/root/logstash/field_tod.rb"
                script_params => {
                        "source_field" => "field03"
                        "target_field" => "field_sec"
                }
        }
}

output {
        stdout{ }
}
実行例
aaa,111,122.88E3
{
     "field_sec" => 3.0e-05,
          "host" => "test08",
       "field01" => "aaa",
      "@version" => "1",
       "field02" => "111",
       "message" => "aaa,111,122.88E3",
    "@timestamp" => 2020-02-05T11:25:49.309Z,
       "field03" => "122.88E3"
}

秒単位に変換された結果がfield_secに格納されました。

この例に倣って、TOD Clockフォーマットのフィールドを秒単位に変換することができます。
※この例では、新たにfield_secというフィールドに変換結果をセットするようにしていますが、source_fieldと同一フィールド名をtarget_fieldに指定すれば、結果が同フィールドに上書きされます。

無駄なフィールドの削除

取り込まれるデータのサイズが大きくなると、当然パフォーマンスにも影響しますので、不要なフィールドはできるだけ排除してElasticsearchに取り込むのがよいでしょう。特に元のcsv形式のデータが保持されているmessageフィールドは、個々のフィールド情報を識別した後は冗長なデータとなってしまうので、削除しておいた方がよいでしょう。デバッグ目的では、messageフィールドのCSVの各値がそれぞれ適切なフィールドとしてマッピングされているかを確認するために残すというのはもちろんアリだと思います。
例えば、Mutate filter pluginのremove_filedなどが利用できます。
参考: Mutate filter plugin - remove_field

Out of Memory対応

CDPzで大量のSMFデータをバッチで転送する場合、LogstashがOut of Memoryでダンプ吐いて落ちることがあります。これは、Logstashのパイプライン処理で使わるキューが、デフォルトでは全てMemory上でハンドリングされているためです。(ちなみにこのメモリー上のキューのサイズは固定で変更できないらしい...)
Persistent Queueという機能を使って、このキューをディスクに書き出すような設定を行うことで、Out of Memoryを回避できます。
参考: Persistent Queues
具体的には、Logstashの設定ファイル(logstash.ymlもしくはpipelines.yml)で、以下のような設定を行います。

  queue.type: persisted
  path.queue: "/etc/logstash/queue"
  queue.max_bytes: 8gb

こうすることで、指定したディレクトリ下に上限max_bytesのキューのファイルが作成されます。

同一キーでのデータの結合/更新

データを取り込む際に、特定のフィールドをキーにして、同一キーのフィールドを1つのレコードにマージするような操作をすることができます(joinに近い)。
詳細は以下をご参照ください。
参考: Logstashメモ - (2) 設定ファイル関連 - 同一キーでのデータの結合/更新
ただし、この方法はindexをまだがる複数レコードでは利用できませんのでご注意ください。

また、fluentdでも同様のことが可能です。
参考: fluentdメモ - (3) 設定ファイル調査 Input/Fileter/Output編 - 同一キーでのデータの結合/更新

Kibana関連

タイムゾーンの設定

上に挙げたように、ELKスタックではタイムスタンプは内部的にUTCで保持されます。Kibanaで可視化する際に、タイムスタンプをどのTimezoneとして表示するかは、Kibana上の設定に依存します。Timezoneの設定は以下のメニューから確認/変更できます。

Kibanaの左側メニューの一番下Management - Advanced Settingを選択します。
image.png

Timezoneの設定を確認/変更します。
image.png

デフォルトでは、Browserとなっており、ブラウザの設定に依存します。プルダウンメニューから明示的にタイムゾーン(Asia/Tokyoなど)を指定することができますので、適宜環境に合わせて設定してください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?