はじめに
LogstashのDead Letter Queueを監視出来るようKibanaで可視化してみたので
ナレッジ共有として記事の投稿します^^
利用環境
product | version |
---|---|
logstash | 6.5.4 |
Elasticsearch | 6.5.4 |
kibana | 6.5.4 |
Java | 1.8.0 |
OS(EC2) | Amazon Linux2 (t3.small) |
AMI ID | ami-06b382aba6c5a4f2c |
Region | us-east-1 |
Dead Letter Queueとは
ログ分析のためにLogstashでログを加工して、Elasticsearchに格納する時にデータ型が合わず
うまくIndexingが出来ないこと(Mapping Error)があります。
Mapping Error(404)が発生するとイベントは破棄されてしまい、データ損失につながります。
その要因は様々です。
例えば、
・アプリケーションの改修や機器のバージョンアップによって突然フォーマットが変わってしまう
・テストフェーズでは想定していなかったイベントIDのログが急に出始める
などなど
データがうまくElasticsearchに格納出来ていないことを監視することは非常に難しいことです。
取り込みが出来ていないことに気がつかないことが多く、監視方法を考えることにも頭を使います。
単純にログが出ていないのか、それとも正しく取り込めていないのかを判断すること難しい
というのがその主な要因です。
Logstashにはoutput elasticsearchで失敗したイベントを蓄積記録する**Dead Letter Queue(以降、DLQ)**
というキューイングの仕組みがあります。
/etc/logstash/logstash.yml
のDead-Letter Queue Settings
句に設定が書かれています。
※以下、今回検証した時の設定になります。
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
dead_letter_queue.enable: true
# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb
# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
※設定項目は以下の通りです。
設定項目 | デフォルト値 | 説明 |
---|---|---|
dead_letter_queue.enable | false (任意) | DLQの利用有無を指定します。 |
dead_letter_queue.max_bytes | 1024mb (任意) | DLQの最大サイズを指定します。 |
path.dead_letter_queue | path.data/dead_letter_queue (任意) | DLQの保存先パスを指定します。 |
Dead_letter_queue input plugin
LogstashでDLQ行きになったイベントを読み取るためのプラグインになります。
※Dead_letter_queue input固有の設定項目は以下の通りです。
設定項目 | デフォルト値 | 説明 |
---|---|---|
commit_offsets | true (任意) | sincedbを利用したオフセット値の有効化有無を指定します。 |
path | - (必須) | path.dead_letter_queueで設定したDLQのパスを指定します。※1 |
pipeline_id | "main" (任意) | 処理に失敗してDLQ行きになったpipeline idを指定します。 |
sincedb_path | - (任意) | 読みだしたイベント位置を記憶するsincedbの保存先パスを指定します。※2 |
start_timestamp | - (任意) | タイムスタンプ(ISO8601形式)で指定した位置から読み取ります。 |
※1 デフォルト値は、/var/lib/logstash/dead_letter_queue
|
||
※2 デフォルト値は、/var/lib/logstash/plugins/inputs/dead_letter_queue
|
【参考】
・ Dead_letter_queue input plugin
実施内容
- Index Templeteを設定する
- VPCFlowLogsの取り込みを開始する
- DLQを見てみる
- DLQ取り込み用のlogstash.confを設定する
- Kibanaで見てみる
1. Index Templeteを設定する
サンプルとして利用したログは、AWS VPCFlowLogsになります。
※VPCFlowLogsに関する情報はこちらにまとめてます。
わざとMapping ErrorとなるようにVPCFlowLogsのIndex Templeteをいじります。
action
のデータ型は本来はkeyword
のところ、あえてfloat
に変更しています。
PUT _template/vpcflowlogs
{
"index_patterns": ["vpcflowlogs-*"],
"settings": {
"number_of_shards": 1,
"number_of_replicas" : 1
},
"mappings": {
"doc" : {
"properties": {
"@timestamp": {
"type": "date"
},
"@version" : {
"type" : "keyword"
},
"account-id" : {
"type" : "keyword"
},
"action" : {
"type" : "float" ← 本来は`keyword`が正しい設定
},
# 以下、省略
2. VPCFlowLogsの取り込みを開始する
logstashを起動し、VPCFlowLogsの取り込みを実施します。
[root@ip-172-31-8-221 ec2-user]# systemctl start logstash
3. DLQを見てみる
Mapping Errorとなり、DLQにイベントが書き込まれているはずです。
[root@ip-172-31-8-221 conf.d]# cd /var/lib/logstash/dead_letter_queue/
[root@ip-172-31-8-221 dead_letter_queue]# ll
total 0
drwxr-xr-x 2 logstash logstash 33 May 26 22:28 vpcflowlogs
[root@ip-172-31-8-221 dead_letter_queue]# ll vpcflowlogs/
total 848
-rw-r--r-- 1 logstash logstash 868323 May 26 22:57 1.log
catコマンドなどで直接DLQを開いてみると、、
[root@ip-172-31-8-221 vpcflowlogs]# cat 1.log
1c
Ud,2019-05-26T13:32:50.661Zqjava.util.HashMapdDATA2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.689Zqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.691Zȟqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.696Zqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.696Zqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.697Zqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.698Zqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000ZfactionO
jlocal_time2019-05-26T13:32:50.699Zȟqjava.util.HashMapdDATA2019-05-26T13:26:53.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:26:53.000Zhend_time2019-05-26T13:27:05.000Zfaction2019-05-26T13:32:50.704Zqjava.util.HashMapdDATA2019-05-26T13:27:06.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:06.000Zhend_time2019-05-26T13:27:12.000Zfactionjlocal_time2019-05-26T13:32:50.705Zqjava.util.HashMapdDATA2019-05-26T13:27:45.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:45.000Zhend_time2019-05-26T13:27:45.000Zfaction2019-05-26T13:32:50.706Zqjava.util.HashMapdDATA2019-05-26T13:27:45.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:45.000Zhend_time2019-05-26T13:27:45.000Zfaction2019-05-26T13:32:50.706Zqjava.util.HashMapdDATA2019-05-26T13:27:54.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:54.000Zhend_time2019-05-26T13:27:57.000Zfaction2019-05-26T13:32:50.707Zqjava.util.HashMapdDATA2019-05-26T13:27:54.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:54.000Zhend_time2019-05-26T13:27:57.000Zfaction2019-05-26T13:32:50.710Zqjava.util.HashMapdDATA2019-05-26T13:27:54.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:54.000Zhend_time2019-05-26T13:27:57.000Zfaction2019-05-26T13:32:50.711Zqjava.util.HashMapdDATA2019-05-26T13:27:54.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:27:54.000Zhend_time2019-05-26T13:27:57.000Zfaction2019-05-26T13:32:50.715Zqjava.util.HashMapdDATA2019-05-26T13:28:05.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:05.000Zhend_time2019-05-26T13:28:12.000Zfactionjlocal_time2019-05-26T13:32:50.716Zqjava.util.HashMapdDATA2019-05-26T13:28:05.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:05.000Zhend_time2019-05-26T13:28:12.000Zfaction2019-05-26T13:32:50.717Zʟqjava.util.HashMapdDATA2019-05-26T13:28:05.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:05.000Zhend_time2019-05-26T13:28:12.000ZfactionSf5jlocal_time2019-05-26T13:32:50.726Zqjava.util.HashMapdDATA2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.727ZƟqjava.util.HashMapdDATAj@timestamp2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.727Zʟqjava.util.HashMapdDATA2019-05-26T13:28:05.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:05.000Zhend_time2019-05-26T13:28:12.000Zfaction2019-05-26T13:32:50.728Zqjava.util.HashMapdDATA2019-05-26T13:28:16.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:16.000Zhend_time2019-05-26T13:28:20.000Zfaction2019-05-26T13:32:50.729Zqjava.util.HashMapdDATA2019-05-26T13:28:16.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:16.000Zhend_time2019-05-26T13:28:20.000Zfaction2019-05-26T13:32:50.729Zqjava.util.HashMapdDATA2019-05-26T13:28:16.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:16.000Zhend_time2019-05-26T13:28:20.000Zfaction2019-05-26T13:32:50.730Zqjava.util.HashMapdDATA33333hlatitude33333ncontinent_code2019-05-26T13:28:27.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versionaelasticsearchCould not index event to Elasticsearch. status: 400, action: ["index", {:_id=>"2169627472", :_index=>"vpcflowlogs-2019-05-26", :_type=>"doc", :routing=>nil}, #<LogStash::Event:0x508cee93>], response: {"index"=>{"_index"=>"vpcflowlogs-2019-05-26", "_type"=>"doc", "_id"=>"2169627472", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [action] of type [float]", "caused_by"=>{"type"=>"number_format_exception", "reason"=>"For input string: \"REJECT\""}}}}c
YK
t2019-05-26T13:32:50.731Zqjava.util.HashMapdDATA2019-05-26T13:28:27.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:28:27.000Zhend_time2019-05-26T13:28:46.000Zfaction2019-05-26T13:32:50.739Zǟqjava.util.HashMapdDATA2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.740Z02019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.741Zqjava.util.HashMapdDATA2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListsrc_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.741Z\clat@2\j~#hlatitude@2\j~#ncontinent_codegsrcaddr2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.742Zqjava.util.HashMapdDATA2019-05-26T13:29:57.000Zjaccount-idorg.logstash.ConvertedListdst_geoip_lookup_failureh@versiona1gdstaddr2019-05-26T13:29:57.000Zhend_time2019-05-26T13:30:04.000Zfaction2019-05-26T13:32:50.743Z
これをそのまま利用するのは難しそうですね。。。
4. DLQ取り込み用のlogstash.confを設定する
VPCFlowLogsの取り込みとは別のPipelineとしてDLQ取り込み用のLogstashの設定を作ります。
input {
dead_letter_queue {
commit_offsets => false
path => "/var/lib/logstash/dead_letter_queue"
pipeline_id => "vpcflowlogs"
}
}
filter {
ruby {
code => "event.set('timestamp', event.get('[@metadata][dead_letter_queue][entry_time]').toString())"
}
date {
match => [ "timestamp", "ISO8601" ]
}
ruby {
code => "event.set('[@metadata][local_time]',event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%d'))"
}
mutate {
add_field => {
"reason" => "%{[@metadata][dead_letter_queue][reason]}"
"plugin_id" => "%{[@metadata][dead_letter_queue][plugin_id]}"
"plugin_type" => "%{[@metadata][dead_letter_queue][plugin_type]}"
}
}
}
output {
elasticsearch {
hosts => ["<ElasticsearchのURL>"]
index => "dead_letter_queue-%{[@metadata][local_time]}"
}
}
【設定内容】
・テストで何度も読み込む想定でcommit_offsets
はfalse
にしています。本番運用時はデフォのtrue
で良いと思います。
・DLQで取り込むデータはVPCFlowLogsのみなので、pipeline_id
をvpcflowlogs
に指定します。
・[@metadata][dead_letter_queue][entry_time]
を@timestamp
とするためにruby filterとdate filterを駆使しています。
・mutate add_field
で[@metadata][dead_letter_queue]
に記録されるデータを指定しています。
Indexの失敗理由が@metadate
配下のreason
に書かれるんですが、@metadata
なので、そのままでは
Elasticsearchに格納出来ないため、Field化しています。ここ超大事です!!
・output先はelasticsearchのdead_letter_queueという名前のIndexとしています。
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
# - pipeline.id: main
# path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: vpcflowlogs
path.config: "/etc/logstash/conf.d/vpcflowlogs_logstash.conf"
- pipeline.id: dead_letter_queue
path.config: "/etc/logstash/conf.d/dead_letter_queue_logstash.conf"
念のため、dead_letter_queue_logstash.conf
の構文チェックを実施します。
(Configuration OK
となれば大丈夫です。)
[root@ip-172-31-8-221 ec2-user]# /usr/share/logstash/bin/logstash -tf /etc/logstash/conf.d/dead_letter_queue_logstash.conf
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[WARN ] 2019-05-27 02:07:10.808 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
Configuration OK
[INFO ] 2019-05-27 02:07:13.102 [LogStash::Runner] runner - Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
Logstashを再起動します。
[root@ip-172-31-8-221 ec2-user]# systemctl restart logstash
5. Kibanaで見てみる
Elasticsearchに取り込んだDLQのイベントをKibanaで見ています。
DevTools
でGET _cat/indices/dead*
を実行して、Elasticsearchに取り込めているか確認します。
次にManagement
でCreate index pattern
からDLQ用のIndex Patternを作成します。
Time Filter field name
に@timestamp
を指定して作成します。
DevTools
でGET dead_letter_queue-2019-05-26
のIndex Mappingを参照すると
action
はテキスト型とKeyword型で定義されていました。
デフォルトのLogstash用のIndex Templeteで取り込まれています。
Discover
でdead_letter_queue-*
のIndex Patternのイベントを参照してみました。
Indexに失敗したmessage
とreason
が確認することが出来ました。
まとめ
いかがでしょうか。
これでlogstash.confやIndex Templeteで修正が必要なポイントがKibanaを見ることで可視化されました。
Amazon Elasticsearch ServiceやElasticCloudのAlerting機能を使ってDLQのCount数を監視すれば
イベントの取り込み失敗を監視できますね!!
【参考】
・Amazon Elasticsearch ServiceのAlertingを使ってAWS ConsoleLoginを監視してみた