logstashにはElasticsearchのinput/filter pluginがあります。
挙動の違いを確認したので、備忘的に書きます。
input plugin
input {
# Read all documents from Elasticsearch matching the given query
elasticsearch {
hosts => "localhost"
query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
}
}
サンプルとしてこういうのが公式には書かれています。
これは、localhostのelasticsearchからqueryによって、200番台のdocsを取得してソースにするということです。
普通にやると1回だけですが、Scheduling
を設定すると定期的にポーリングしてくれます。
ただし、どこまで読んだかという情報は記録しません。
ファイルの読み込みであればsincedbという名称で永続化しますが、ES inputの場合はしないようです。
そのため、クエリで絞り込みをしないと重複した内容をなんども読み込むことになります。
絞り込みをしてバッチ的に利用する、あるいはデータを一度だけ読み込むような場合に使われるかもしれません。
filter plugin
公式にはこのようなサンプルが書かれています。
if [type] == "end" {
elasticsearch {
hosts => ["es-server"] # 検索対象のESサーバ
query_template => "template.json" # 検索クエリ
fields => { "@timestamp" => "started" } # 検索結果をfieldに付加する
}
date {
match => ["[started]", "ISO8601"]
target => "[started]"
}
ruby {
code => "event.set('duration_hrs', (event.get('@timestamp') - event.get('started')) / 3600)"
}
}
{
"size": 1,
"sort" : [ { "@timestamp" : "desc" } ],
"query": {
"query_string": {
"query": "type:start AND operation:%{[opid]}"
}
},
"_source": ["@timestamp"]
}
logstash.confには色々書いていますが、elasticsearchのpluginだけ見ましょう。
検索対象のESサーバ、クエリの書かれているjsonファイル、検索結果から付加するFieldsを設定しています。
template.jsonを見ると"type:start AND operation:%{[opid]}"
で絞っています。これは、logstashのinputから来たopidかつ、type:start(起動時など)の条件を設定しています。
デフォルトのresult_sizeは1です。Filterは、Inputからの条件を元にESのIndexから属性をもって来て付加する、という役割なように思います。
おそらく、サンプルコードの結果はこんな感じになるでしょう。
(他の利用例でのfilter結果から手動で作りました。正確でない可能性があります。)
{
"opid" => "a1"
"started" => "2019-01-01T15:05:00.562Z"
}
ESから帰ってくるresult_sizeを大きくした場合かつ、結果が一意に定まらない場合はこうなると思われます。
{
"opid" => "a1"
"started" => [
[0] "2019-01-01T15:05:00.562Z",
[1] "2018-12-31T15:05:00.562Z",
[2] "2018-12-30T15:05:00.562Z"
]
}
複数つくので、やや加工が大変なのと、result_sizeを無制限にするオプションは見当たらなかったので、基本的には一意に定められるような条件で設定することが必要な気がします。