3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

elasticsearchのlogstashで同一IDのデータを更新/マージする方法(updateとdoc_as_upsert)

Last updated at Posted at 2019-11-21

やりたいこと

logstashを使ってCSV形式でデータをelasticsearchに取り込んでいる。
以下のような取り込みパターンが必要になったので設定した。

  • 一度取り込んだデータに対して、同一IDで更新データを取り込む。データのカラム構成は最初の取り込みも更新時も同じ。
  • 一度取り込んだデータに対して、同一IDで追加データを取り込む。追加データは、最初のデータとは別のカラムで、そのIDのデータに対してカラム追加になる

上記を見ると、取り込んだときの挙動は以下がある

action
Value type is string
Default value is "index"
Protocol agnostic (i.e. non-http, non-java specific) configs go here Protocol agnostic methods The Elasticsearch action to perform. Valid actions are:

  • index: indexes a document (an event from Logstash).
  • delete: deletes a document by id (An id is required for this action)
  • create: indexes a document, fails if a document by that id already exists in the index.
  • update: updates a document by id. Update has a special case where you can upsert — update a document if not already present. See the upsert option. NOTE: This does not work and is not supported in Elasticsearch 1.x. Please upgrade to ES 2.x or greater to use this feature with Logstash!
  • A sprintf style string to change the action based on the content of the event. The value %{[foo]} would use the foo field for the action

基本形

CSVを取り込む基本形

  • pathでファイル指定。*で日にち毎のファイルとかでも対応
  • start_positionは、logstashの起動タイミングでのすでにあるファイルに対して、最初から読み込むかどうか
  • tagsは、ifとか使うときの条件に使う。
  • fileterのCSVでseparator指定して、カラム名を書くだけなのでとても簡単
sample.conf
input {
  file {
      path => "/hogepath/hogedata*.csv"
      start_position => "beginning"
      tags => ["hogetype"]
  }
}

filter {
    csv {
        columns => ["hogeid","hogecolumn11","hogecolumn2"]
        separator => ","
    }
}

output {
    elasticsearch {
        hosts => ["hogeserver:9200"]
        index => "hogeindex"
    }
}

すでにあるカラムのデータ更新(更新パターン)

同一IDになるように取り込めばよい。

Default value is "index"

  • index: indexes a document (an event from Logstash).

参考
https://discuss.elastic.co/t/topic/48313
https://www.google.com/search?q=logstash+%E5%90%8C%E4%B8%80ID&oq=logstash+%E5%90%8C%E4%B8%80ID&aqs=chrome..69i57.9999j0j7&sourceid=chrome&ie=UTF-8

一意になるIDがデータに含まれているなら、
document_idにそのデータ内のIDを入れる下記だけでよい

output {
    elasticsearch {
        hosts => ["testserver:9200"]
        index => "testindex"
        document_id => "%{hogeid}"
    }
}

別のカラムのデータを追加する(追加パターン)

outputに下記の追加設定が必要
doc_as_upsert => true
action => "update"
参考
https://discuss.elastic.co/t/elasticsearch/175429/2

output {
    elasticsearch {
        hosts => ["testserver:9200"]
        index => "testindex"
        document_id => "%{hogeid}"
        doc_as_upsert => true
        action => "update"
    }
}

#苦労
自分の環境では複数のlogstashに分割して、
上記の別カラムの追加パターンをやろうとしたところ、
doc_as_upsert => true action => "update"を入れてなかったので、
追加カラムのデータが反映されないと悩んだ。

doc_as_upsert => true action => "update"に気づいた後も
片方のlogstashに設定を入れ忘れたりして、追加カラムのデータが消えたりした。

単純にデータが取り込まれていないように見えていたので、
logstashの取り込みで同じID同士が上書きして消しているのに気づくのに時間がかかった。

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?