LoginSignup
0
0

More than 1 year has passed since last update.

ElasticSearchで、documentをupsertする

Last updated at Posted at 2021-10-18

TL; DR

ElasticSearchは、以下のような特徴があります。

  • indexとdocument idの組み合わせてデータを管理し、不要なデータはindex単位で削除していく
    • 公式で使用されている通り、時系列データを1日単位でindexを作る
    • 古い不要なindexを1日単位で削除できるようにする
    • index単位の処理は高速
  • document idもElasticSearchに任せ、こちらで指定しないほうが性能的に良い

なので、つまり

  • document idをユーザ側で指定することは非推奨
    • どこかに書いてあったはずですが失念しました。。。
  • その指定したdocument idのdocumentをupdateすることは非推奨
  • 当然、upsertも非推奨

ということになります。
でも、必要に迫らせてやらざるを得ない場合もあります。
これをどうやるか?というお話です。

確認環境

docker-composeで環境を準備します。
kibanaのDev toolsで確認することにします。

version: '3.9'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.1
    environment:
      - discovery.type=single-node
      - cluster.name=docker-cluster
      - node.master=true
      - node.data=true
      - xpack.security.enabled=false
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
    volumes:
      - ./elasticsearch:/usr/share/elasticsearch/data
    ulimits:
      memlock:
        soft: -1
        hard: -1
    networks:
      - esnet

  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.1
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch
    environment:
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    networks:
      - esnet

volumes:
  esdata:
    driver: local

networks:
  esnet:

Dev toolsは、ここから使えます。
devtools.jpg

upsertとは(やりたいこと)

  • documentが存在しない場合は、insertする
  • documentが存在する場合は、updateする
    • ただし全置換ではない
    • 更新対象のkey:valueだけをPOSTしたら、そのkeyに対応するvalueだけがupdateされる
    • 存在しないkeyの場合は追加される

insert(指定したidのdocumentが存在しない場合)

documentは、bulk APIでrequestするものとします。

公式のここを見ると、bulk APIでPOSTする際、request bodyに、 '_op_type': 'index' を指定するとよいようです。

実際のbulk APIでの例を示します。

POST _bulk
{"index":{"_index":"index1","_id":"1"}}
{"field":"value"}

responseは以下となりました。

{
  "took" : 2434,
  "errors" : false,
  "items" : [
    {
      "index" : {
        "_index" : "index1",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    }
  ]
}

こうすることで、indexが"index1"、idが"1"のdocumentがindexされます。
responseを確認してみましょう。
ちゃんとindexされていました。

# GET index1/_doc/1
{
  "_index" : "index1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "field" : "value"
  }
}

ところが、公式を参照すると、こうも書かれています。

If the document exists, replaces the document and increments the version. The following line must contain the source data to be indexed.

要するに、後勝ちで、documentの全fieldがreplaceされてしまいます。
updateといえばそうかもしれませんが、必要なところだけ更新したいと思った時には、このままではうまく使えません。

事前に対象documentをgetしておいて、必要なところだけ差し替えたdocumentを作り、その後にPOSTしなければいけません。
アプリ側に余計な処理の作りこみが発生してしまいます。。。。
DB側でできる処理は、できるだけDB側に寄せたいです。

update(指定したidのdocumentが存在する場合)

公式のここを見ると、bulk APIでPOSTする際、request bodyに、 '_op_type': 'update' を指定するとよいようです。

実際のbulk APIでの例を示します。

# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1"} }
{"doc" : {"field" : "value2"} }

responseは、以下となりました。

{
  "took" : 19,
  "errors" : false,
  "items" : [
    {
      "update" : {
        "_index" : "index1",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 2,
        "result" : "updated",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 1,
        "_primary_term" : 1,
        "status" : 200
      }
    }
  ]
}

indexされたはずの内容を確認してみましょう。
responseは以下の通りで、key field の値がちゃんと value2 となっていました。

# GET index1/_doc/1
{
  "_index" : "index1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "field" : "value2"
  }
}

しかし、これでもやりたかったupsertはできません。
なぜなら、updateの場合は、対象documentがないとエラーになるようです。
つまり、updateするdocument idが存在しない場合、errorとなってしまいます。

やりたかったupsert(本命)

指定したidのdocumentが存在し、あと勝ちで部分的に更新したい場合です。
こちらが本命です。

結論からすると、こちらにやり方が書かれていました
updateのオプションに、doc_as_upsert : true と指定しろと書かれています。
書き方のサンプルが、公式のここに書かれていました

まず、上記の操作で作成されている既存のindexを削除してしまいます。

# DELETE index1
{
  "acknowledged" : true
}

upsertの挙動を確認するために、連続して以下3つのqueryをPOSTします。

# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field" : "value"}, "doc_as_upsert" : true }

# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field2" : "value2", "field5" : "value5"}, "doc_as_upsert" : true }

# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field2" : "value2-1"}, "doc_as_upsert" : true }

上記3つのPOSTの結果を確認してみます。

# GET index1/_doc/1
{
  "_index" : "index1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 5,
  "_seq_no" : 4,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "field" : "value",     # <- 1. insert: 最初に追加
    "field2" : "value2-1", # <- 3. update: value2  value2-1 へ更新
    "field5" : "value5"    # <- 2. upsert: keyとvalueを追加
  }
}

意図通り、upsertが出来ています。
更新したいkey : valueだけをupdateすることも出来ました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0