LoginSignup
11
3

More than 3 years have passed since last update.

LogstashのDead Letter Queueを可視化してみた

Last updated at Posted at 2019-05-26

はじめに

LogstashのDead Letter Queueを監視出来るようKibanaで可視化してみたので
ナレッジ共有として記事の投稿します^^

利用環境

product version
logstash 6.5.4
Elasticsearch 6.5.4
kibana 6.5.4
Java 1.8.0
OS(EC2) Amazon Linux2 (t3.small)
AMI ID ami-06b382aba6c5a4f2c
Region us-east-1

【構成図】
image.png

Dead Letter Queueとは

ログ分析のためにLogstashでログを加工して、Elasticsearchに格納する時にデータ型が合わず
うまくIndexingが出来ないこと(Mapping Error)があります。
Mapping Error(404)が発生するとイベントは破棄されてしまい、データ損失につながります。

その要因は様々です。

例えば、
・アプリケーションの改修や機器のバージョンアップによって突然フォーマットが変わってしまう
・テストフェーズでは想定していなかったイベントIDのログが急に出始める
などなど

データがうまくElasticsearchに格納出来ていないことを監視することは非常に難しいことです。
取り込みが出来ていないことに気がつかないことが多く、監視方法を考えることにも頭を使います。
単純にログが出ていないのか、それとも正しく取り込めていないのかを判断すること難しい
というのがその主な要因です。

Logstashにはoutput elasticsearchで失敗したイベントを蓄積記録するDead Letter Queue(以降、DLQ)
というキューイングの仕組みがあります。

/etc/logstash/logstash.ymlDead-Letter Queue Settings句に設定が書かれています。
※以下、今回検証した時の設定になります。

logstash.ymlの抜粋
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
dead_letter_queue.enable: true

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#

※設定項目は以下の通りです。

設定項目 デフォルト値 説明
dead_letter_queue.enable false (任意) DLQの利用有無を指定します。
dead_letter_queue.max_bytes 1024mb (任意) DLQの最大サイズを指定します。
path.dead_letter_queue path.data/dead_letter_queue (任意) DLQの保存先パスを指定します。

Dead_letter_queue input plugin

LogstashでDLQ行きになったイベントを読み取るためのプラグインになります。

※Dead_letter_queue input固有の設定項目は以下の通りです。

設定項目 デフォルト値 説明
commit_offsets true (任意) sincedbを利用したオフセット値の有効化有無を指定します。
path - (必須) path.dead_letter_queueで設定したDLQのパスを指定します。※1
pipeline_id "main" (任意) 処理に失敗してDLQ行きになったpipeline idを指定します。
sincedb_path - (任意) 読みだしたイベント位置を記憶するsincedbの保存先パスを指定します。※2
start_timestamp - (任意) タイムスタンプ(ISO8601形式)で指定した位置から読み取ります。

※1 デフォルト値は、/var/lib/logstash/dead_letter_queue
※2 デフォルト値は、/var/lib/logstash/plugins/inputs/dead_letter_queue

【参考】
Dead_letter_queue input plugin

実施内容

  1. Index Templeteを設定する
  2. VPCFlowLogsの取り込みを開始する
  3. DLQを見てみる
  4. DLQ取り込み用のlogstash.confを設定する
  5. Kibanaで見てみる

1. Index Templeteを設定する

サンプルとして利用したログは、AWS VPCFlowLogsになります。
※VPCFlowLogsに関する情報はこちらにまとめてます。

わざとMapping ErrorとなるようにVPCFlowLogsのIndex Templeteをいじります。
actionのデータ型は本来はkeywordのところ、あえてfloatに変更しています。

_template/vpcflowlogsの抜粋
PUT _template/vpcflowlogs
{
  "index_patterns": ["vpcflowlogs-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas" : 1
  },
  "mappings": {
    "doc" : {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version" : {
          "type" : "keyword"
        },
        "account-id" : {
          "type" : "keyword"
        },
        "action" : {
          "type" : "float"  ← 本来は`keyword`が正しい設定
          },

# 以下、省略

2. VPCFlowLogsの取り込みを開始する

logstashを起動し、VPCFlowLogsの取り込みを実施します。

[root@ip-172-31-8-221 ec2-user]# systemctl start logstash

3. DLQを見てみる

Mapping Errorとなり、DLQにイベントが書き込まれているはずです。

[root@ip-172-31-8-221 conf.d]# cd /var/lib/logstash/dead_letter_queue/
[root@ip-172-31-8-221 dead_letter_queue]# ll
total 0
drwxr-xr-x 2 logstash logstash 33 May 26 22:28 vpcflowlogs
[root@ip-172-31-8-221 dead_letter_queue]# ll vpcflowlogs/
total 848
-rw-r--r-- 1 logstash logstash 868323 May 26 22:57 1.log

catコマンドなどで直接DLQを開いてみると、、

[root@ip-172-31-8-221 vpcflowlogs]# cat 1.log
1c
Ud,2019-05-26T13:32:50.661Zqjava.util.HashMapdDATA2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.689Zqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.691Zȟqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.696Zqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.696Zqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.697Zqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.698Zqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000ZfactionO
jlocal_time2019-05-26T13:32:50.699Zȟqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.704Zqjava.util.HashMapdDATA2019-05-26T13:27:06.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:06.000Zhend_time2019-05-26T13:27:12.000Zfactionjlocal_time2019-05-26T13:32:50.705Zqjava.util.HashMapdDATA2019-05-26T13:27:45.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:45.000Zhend_time2019-05-26T13:27:45.000Zfaction2019-05-26T13:32:50.706Zqjava.util.HashMapdDATA2019-05-26T13:27:45.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:45.000Zhend_time2019-05-26T13:27:45.000Zfaction2019-05-26T13:32:50.706Zqjava.util.HashMapdDATA2019-05-26T13:27:54.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:54.000Zhend_time2019-05-26T13:27:57.000Zfaction2019-05-26T13:32:50.707Zqjava.util.HashMapdDATA2019-05-26T13:27:54.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:54.000Zhend_time2019-05-26T13:27:57.000Zfaction2019-05-26T13:32:50.710Zqjava.util.HashMapdDATA2019-05-26T13:27:54.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:54.000Zhend_time2019-05-26T13:27:57.000Zfaction2019-05-26T13:32:50.711Zqjava.util.HashMapdDATA2019-05-26T13:27:54.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:54.000Zhend_time2019-05-26T13:27:57.000Zfaction2019-05-26T13:32:50.715Zqjava.util.HashMapdDATA2019-05-26T13:28:05.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:05.000Zhend_time2019-05-26T13:28:12.000Zfactionjlocal_time2019-05-26T13:32:50.716Zqjava.util.HashMapdDATA2019-05-26T13:28:05.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:05.000Zhend_time2019-05-26T13:28:12.000Zfaction2019-05-26T13:32:50.717Zʟqjava.util.HashMapdDATA2019-05-26T13:28:05.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:05.000Zhend_time2019-05-26T13:28:12.000ZfactionSf5jlocal_time2019-05-26T13:32:50.726Zqjava.util.HashMapdDATA2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.727ZƟqjava.util.HashMapdDATAj@timestamp2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.727Zʟqjava.util.HashMapdDATA2019-05-26T13:28:05.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:05.000Zhend_time2019-05-26T13:28:12.000Zfaction2019-05-26T13:32:50.728Zqjava.util.HashMapdDATA2019-05-26T13:28:16.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:16.000Zhend_time2019-05-26T13:28:20.000Zfaction2019-05-26T13:32:50.729Zqjava.util.HashMapdDATA2019-05-26T13:28:16.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:16.000Zhend_time2019-05-26T13:28:20.000Zfaction2019-05-26T13:32:50.729Zqjava.util.HashMapdDATA2019-05-26T13:28:16.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:16.000Zhend_time2019-05-26T13:28:20.000Zfaction2019-05-26T13:32:50.730Zqjava.util.HashMapdDATA33333hlatitude33333ncontinent_code2019-05-26T13:28:27.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versionaelasticsearchCould not index event to Elasticsearch. status: 400, action: ["index", {:_id=>"2169627472", :_index=>"vpcflowlogs-2019-05-26", :_type=>"doc", :routing=>nil}, #<LogStash::Event:0x508cee93>], response: {"index"=>{"_index"=>"vpcflowlogs-2019-05-26", "_type"=>"doc", "_id"=>"2169627472", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [action] of type [float]", "caused_by"=>{"type"=>"number_format_exception", "reason"=>"For input string: \"REJECT\""}}}}c
YK
  t2019-05-26T13:32:50.731Zqjava.util.HashMapdDATA2019-05-26T13:28:27.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:27.000Zhend_time2019-05-26T13:28:46.000Zfaction2019-05-26T13:32:50.739Zǟqjava.util.HashMapdDATA2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.740Z02019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.741Zqjava.util.HashMapdDATA2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.741Z\clat@2\j~#hlatitude@2\j~#ncontinent_codegsrcaddr2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.742Zqjava.util.HashMapdDATA2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.743Z

これをそのまま利用するのは難しそうですね。。。

4. DLQ取り込み用のlogstash.confを設定する

VPCFlowLogsの取り込みとは別のPipelineとしてDLQ取り込み用のLogstashの設定を作ります。

dead_letter_queue_logstash.conf
input {
  dead_letter_queue {
    commit_offsets => false
    path => "/var/lib/logstash/dead_letter_queue"
    pipeline_id => "vpcflowlogs"
  }
}

filter {
  ruby {
    code => "event.set('timestamp', event.get('[@metadata][dead_letter_queue][entry_time]').toString())"
  }
  date {
    match => [ "timestamp", "ISO8601" ]
  }
  ruby {
    code => "event.set('[@metadata][local_time]',event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%d'))"
  }
  mutate {
    add_field => {
     "reason" => "%{[@metadata][dead_letter_queue][reason]}"
     "plugin_id" => "%{[@metadata][dead_letter_queue][plugin_id]}"
     "plugin_type" => "%{[@metadata][dead_letter_queue][plugin_type]}"
    }
  }
}

output { 
  elasticsearch {
    hosts => ["<ElasticsearchのURL>"]
    index => "dead_letter_queue-%{[@metadata][local_time]}"
  }
}

【設定内容】
・テストで何度も読み込む想定でcommit_offsetsfalseにしています。本番運用時はデフォのtrueで良いと思います。
・DLQで取り込むデータはVPCFlowLogsのみなので、pipeline_idvpcflowlogsに指定します。
[@metadata][dead_letter_queue][entry_time]@timestampとするためにruby filterとdate filterを駆使しています。
mutate add_field[@metadata][dead_letter_queue]に記録されるデータを指定しています。
 Indexの失敗理由が@metadate配下のreasonに書かれるんですが、@metadataなので、そのままでは
 Elasticsearchに格納出来ないため、Field化しています。ここ超大事です!!
・output先はelasticsearchのdead_letter_queueという名前のIndexとしています。

pipeline.yml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

#- pipeline.id: main
#  path.config: "/etc/logstash/conf.d/*.conf"

- pipeline.id: vpcflowlogs
  path.config: "/etc/logstash/conf.d/vpcflowlogs_logstash.conf"

- pipeline.id: dead_letter_queue
  path.config: "/etc/logstash/conf.d/dead_letter_queue_logstash.conf"

念のため、dead_letter_queue_logstash.confの構文チェックを実施します。
(Configuration OKとなれば大丈夫です。)

[root@ip-172-31-8-221 ec2-user]# /usr/share/logstash/bin/logstash -tf /etc/logstash/conf.d/dead_letter_queue_logstash.conf 
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[WARN ] 2019-05-27 02:07:10.808 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
Configuration OK
[INFO ] 2019-05-27 02:07:13.102 [LogStash::Runner] runner - Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

Logstashを再起動します。

[root@ip-172-31-8-221 ec2-user]# systemctl restart logstash

5. Kibanaで見てみる

Elasticsearchに取り込んだDLQのイベントをKibanaで見ています。

DevToolsGET _cat/indices/dead*を実行して、Elasticsearchに取り込めているか確認します。
image.png

次にManagementCreate index patternからDLQ用のIndex Patternを作成します。
image.png

Time Filter field name@timestampを指定して作成します。
image.png

DevToolsGET dead_letter_queue-2019-05-26のIndex Mappingを参照すると
actionはテキスト型とKeyword型で定義されていました。
デフォルトのLogstash用のIndex Templeteで取り込まれています。
image.png

Discoverdead_letter_queue-*のIndex Patternのイベントを参照してみました。
Indexに失敗したmessagereasonが確認することが出来ました。
image.png

まとめ

いかがでしょうか。

これでlogstash.confやIndex Templeteで修正が必要なポイントがKibanaを見ることで可視化されました。
Amazon Elasticsearch ServiceやElasticCloudのAlerting機能を使ってDLQのCount数を監視すれば
イベントの取り込み失敗を監視できますね!!

【参考】
Amazon Elasticsearch ServiceのAlertingを使ってAWS ConsoleLoginを監視してみた

11
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
3