やりたいこと
logstashを使ってCSV形式でデータをelasticsearchに取り込んでいる。
以下のような取り込みパターンが必要になったので設定した。
- 一度取り込んだデータに対して、同一IDで更新データを取り込む。データのカラム構成は最初の取り込みも更新時も同じ。
- 一度取り込んだデータに対して、同一IDで追加データを取り込む。追加データは、最初のデータとは別のカラムで、そのIDのデータに対してカラム追加になる
上記を見ると、取り込んだときの挙動は以下がある
action
Value type is string
Default value is "index"
Protocol agnostic (i.e. non-http, non-java specific) configs go here Protocol agnostic methods The Elasticsearch action to perform. Valid actions are:
- index: indexes a document (an event from Logstash).
- delete: deletes a document by id (An id is required for this action)
- create: indexes a document, fails if a document by that id already exists in the index.
- update: updates a document by id. Update has a special case where you can upsert — update a document if not already present. See the upsert option. NOTE: This does not work and is not supported in Elasticsearch 1.x. Please upgrade to ES 2.x or greater to use this feature with Logstash!
- A sprintf style string to change the action based on the content of the event. The value %{[foo]} would use the foo field for the action
基本形
CSVを取り込む基本形
- pathでファイル指定。*で日にち毎のファイルとかでも対応
- start_positionは、logstashの起動タイミングでのすでにあるファイルに対して、最初から読み込むかどうか
- tagsは、ifとか使うときの条件に使う。
- fileterのCSVでseparator指定して、カラム名を書くだけなのでとても簡単
input {
file {
path => "/hogepath/hogedata*.csv"
start_position => "beginning"
tags => ["hogetype"]
}
}
filter {
csv {
columns => ["hogeid","hogecolumn11","hogecolumn2"]
separator => ","
}
}
output {
elasticsearch {
hosts => ["hogeserver:9200"]
index => "hogeindex"
}
}
すでにあるカラムのデータ更新(更新パターン)
同一IDになるように取り込めばよい。
Default value is "index"
略
- index: indexes a document (an event from Logstash).
参考
https://discuss.elastic.co/t/topic/48313
https://www.google.com/search?q=logstash+%E5%90%8C%E4%B8%80ID&oq=logstash+%E5%90%8C%E4%B8%80ID&aqs=chrome..69i57.9999j0j7&sourceid=chrome&ie=UTF-8
一意になるIDがデータに含まれているなら、
document_idにそのデータ内のIDを入れる下記だけでよい
output {
elasticsearch {
hosts => ["testserver:9200"]
index => "testindex"
document_id => "%{hogeid}"
}
}
別のカラムのデータを追加する(追加パターン)
outputに下記の追加設定が必要
doc_as_upsert => true
action => "update"
参考
https://discuss.elastic.co/t/elasticsearch/175429/2
output {
elasticsearch {
hosts => ["testserver:9200"]
index => "testindex"
document_id => "%{hogeid}"
doc_as_upsert => true
action => "update"
}
}
#苦労
自分の環境では複数のlogstashに分割して、
上記の別カラムの追加パターンをやろうとしたところ、
doc_as_upsert => true
action => "update"
を入れてなかったので、
追加カラムのデータが反映されないと悩んだ。
doc_as_upsert => true
action => "update"
に気づいた後も
片方のlogstashに設定を入れ忘れたりして、追加カラムのデータが消えたりした。
単純にデータが取り込まれていないように見えていたので、
logstashの取り込みで同じID同士が上書きして消しているのに気づくのに時間がかかった。