本エントリの内容
Goのelasticsearchクライアントパッケージ elastic を使ってElasticsearch(以下ES)にBulk APIリクエストを送ります。
elasticパッケージのNewBulkUpdateRequestメソッドを使用しますが、使い方がわかりづらく時間がかかってしまったのでまとめておきます。
elastic: https://github.com/olivere/elastic
NewBulkUpdateRequestのGoDoc: https://godoc.org/gopkg.in/olivere/elastic.v5#BulkUpdateRequest
検証環境
elasticパッケージ: v3
ES: v2.4.0
Go: v1.6.2
やること
ESのこういう形式のドキュメント群に
{
"_index": "shelf",
"_type": "book",
"_id": "12345",
"_source": {
"title": "hogehoge",
}
}
- string型フィールドを追加します
- array型フィールドを追加します
string型フィールドを追加する
基本的なコードは以下のとおりです。
b := client.Bulk()
for _, book := range books {
bookID := strconv.Itoa(int(key))
req := elastic.NewBulkUpdateRequest().
Index("shelf").
Type("book").
Id(book.ID).
Doc(map[string]string{
"author_name": book.AuthorName,
}).
DocAsUpsert(true)
s.Add(req)
}
bulkResponse, err := b.Do()
1行目*elastic.Client.Bulk()で *BulkService を作ります。
2行目からのループでアップデートするデータを作成していきます。
IDでESのドキュメントidを指定し、Docで更新するデータを指定します。
上記コードではDocにmap[string]string型を指定しているのでESのドキュメントは次のように更新されます。つまりmapのキー文字列がそのままフィールド名となります。
{
"_index": "shelf",
"_type": "book",
"_id": "12345",
"_source": {
"title": "hogehoge",
"author_name": "fugafuga"
}
}
ループ処理でデータを準備し終わったら最後に *BulkService.Do() でアップデートリクエストを送信します。
この戻り値のerrがnilの場合でも正常に更新できているとは限りません。bulkResponse.Errorsがtrueの場合は更新に失敗しています。
errはelasticパッケージのエラーでbulkResponse.ErrorsはESの更新エラーという区別のようです。
array型フィールドを追加する
次にarray型のフィールドbook_tagsを追加してみます。
と言ってもDocの中身を変えるだけです。
Doc(map[string][]bookTag{
"book_tags": bookTags,
}).
bookTagは次の構造のstructとなっています。json表記を指定しておくとESのフィールド名をよしなにしてくれます。
bookTag struct {
TagID string `json:"tag_id"`
TagName string `json:"tag_name"`
}
これでESのドキュメントは以下のようになります。
{
"_index": "shelf",
"_type": "book",
"_id": "12345",
"_source": {
"title": "hogehoge",
"author_name": "fugafuga",
"book_tags": [{
"tag_id": "1",
"tag_name": "JavaScript"
}, {
"tag_id": "2",
"tag_name": "Ruby"
}],
}
}
メモ
- 追加するフィールド名を"tags"にするとESの更新エラーとなります。
- ESのscriptを使うともっと細かいことができるようですが(参考)、ちょっとうまく動かなかったです。自分で需要ができたらもう少し調べます。