Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
0
Help us understand the problem. What is going on with this article?
@nijinagome

PowerShellでElasticsearchのBulkAPIを利用する

Elasticsearchに大量のデータを投入したい場合、1件ずつ送るよりBulkAPIを利用してまとめて投げ込んだほうが早い。ちょっと早い程度ではなくすごく早いので、使えるならBulkAPIを使いたい。
でもデータをBulkAPIで使える形に整形しなければならないので、ちょっと面倒。
PowerShellで書く場合どんな様子になるのかという話。

BulkAPI(Elasticsearch公式)

環境

Elasticsearchバージョン:7.8.1
リモートサーバ:WindowsServer2012R2
クライアント:Windows10

サンプル

Elasticsearch公式にサンプルデータセットがある。これをBulkAPIで投入する。
「顧客の銀行口座情報に関する架空のJSONドキュメント例」、1000件。
最初からBulkAPIが指定する形になっているので、このまま投げ込めばよしなに取り扱ってくれる。
公式ドキュメント データの調査
サンプルデータセット(直リンク)

sample_bulk.json
{"index":{"_id":"1"}}
{"account_number":1,"balance":39225,"firstname":"Amber","lastname":"Duke","age":32,"gender":"M","address":"880 Holmes Lane","employer":"Pyrami","email":"amberduke@pyrami.com","city":"Brogan","state":"IL"}
{"index":{"_id":"6"}}
{"account_number":6,"balance":5686,"firstname":"Hattie","lastname":"Bond","age":36,"gender":"M","address":"671 Bristol Street","employer":"Netagy","email":"hattiebond@netagy.com","city":"Dante","state":"TN"}
{"index":{"_id":"13"}}
{"account_number":13,"balance":32838,"firstname":"Nanette","lastname":"Bates","age":28,"gender":"F","address":"789 Madison Street","employer":"Quility","email":"nanettebates@quility.com","city":"Nogal","state":"VA"}

PowerShellでBulkAPIを利用する

利用するといっても適切なアドレスに送るだけである。

bulkapi.ps1
# BulkAPI利用
$Param = @{
  Uri = "http://remoteserver:9200/sample_bulk.2020-08/_bulk"
  InFile = "sample_bulk.json"
  Method = "Post"
  ContentType = "application/x-ndjson"
}
Invoke-RestMethod @Param

# リフレッシュ
Invoke-RestMethod -Uri "http://remoteserver:9200/sample_bulk.2020-08/_refresh" -Method Post
took errors items                                          
---- ------ -----                                          
  66  False {@{index=}, @{index=}, @{index=}, @{index=}...}

remoteserverのElasticsearchにあるインデックスsample_bulk.2020-08へデータ投入する。
InFileパラメータでJSONファイル(サンプルそのまま)を指定。
ContentTypeはapplication/x-ndjsonを指定する。(普通のJSON送信ではapplication/json)

実行するとすぐに完了する。構造浅くて要素少なめで項目1000件程度のファイルでは1秒かからない。「本当に入ったのか?」てビビるがちゃんと入っているので安心して欲しい。

戻り表示のerrorsFalseなのにハテナが浮かぶかもしれないが、これは見たまま「エラーはありません」の意味。
itemsプロパティの中には投入したデータがひとつひとつ入っている。
これら戻り値を使いたければInvoke-RestMethodの結果を変数に入れよう。

最後リフレッシュしているのは、投入後にすぐ cat API 呼んでも投入した結果が反映されていない場合があるため。普通に1件ずつ投入する場合とはElasticsearchが自分でリフレッシュするタイミングが異なるようだ。(調べてない)

通常の投入方法と比較する

サンプルのJSONをCSVに変換したものを用いて、通常の1件ずつ送る方法で書いてみる。
下のサンプルは一行目がヘッダー行になっている。

sample_csv.csv
"account_number","balance","firstname","lastname","age","gender","address","employer","email","city","state"
"1","39225","Amber","Duke","32","M","880 Holmes Lane","Pyrami","amberduke@pyrami.com","Brogan","IL"
"6","5686","Hattie","Bond","36","M","671 Bristol Street","Netagy","hattiebond@netagy.com","Dante","TN"
"13","32838","Nanette","Bates","28","F","789 Madison Street","Quility","nanettebates@quility.com","Nogal","VA"

indexapi.ps1
Import-Csv -LiteralPath "sample_csv.csv" -Encoding Default |
ForEach-Object{
  $Input_JSON = $_ | ConvertTo-Json
  $Param = @{
    Uri = "http://remoteserver:9200/sample_csv.2020-08/"
    Method = "Post"
    ContentType = "application/json"
    Body = [system.text.encoding]::UTF8.GetBytes($Input_JSON)
  }
  Invoke-RestMethod @Param > $NULL
}
Invoke-RestMethod -Uri "http://remoteserver:9200/sample_csv.2020-08/_refresh" -Method Post

CSVインポートして行ごとにJSONに変換し1件ずつInvoke-RestMethodで投入。
localhostならこれでも秒で終わるのだが、リモートサーバへ1件ずつHTTP通信するのでは「そりゃ遅いだろ」て思う。
実際に比較すると下のようになる。

Measure-Commandで比較
Measure-Command{.\bulkapi.ps1}

Days              : 0
Hours             : 0
Minutes           : 0
Seconds           : 0
Milliseconds      : 422
Ticks             : 4221031
TotalDays         : 4.8854525462963E-06
TotalHours        : 0.000117250861111111
TotalMinutes      : 0.00703505166666667
TotalSeconds      : 0.4221031
TotalMilliseconds : 422.1031


Measure-Command{.\indexapi.ps1}

Days              : 0
Hours             : 0
Minutes           : 1
Seconds           : 6
Milliseconds      : 734
Ticks             : 667340938
TotalDays         : 0.000772385344907407
TotalHours        : 0.0185372482777778
TotalMinutes      : 1.11223489666667
TotalSeconds      : 66.7340938
TotalMilliseconds : 66734.0938

0.42秒 vs 66.73秒。勝負になってない…。
データが10万件あったら40秒 vs 1時間50分である。
これじゃBulkAPI以外使う気になれないね、と言いたいところだが、BulkAPIは「データをAPIが指定するJSON形式に加工しないと」使えないので、そこを一山超えなければならない。

CSV読み込んで整形してBulkAPIへ送信

Import-Csvで読んだ行をJSON化してつぎはぎして整えて投げればいいかな、とこんなのを書いた。が、エラーになる。

convertto-bulk.ps1(失敗)
$BulkList = New-Object System.Collections.ArrayList
$Bulk_Index = '{"index":{}}'

# CSV読み込んで指定のJSON形式に整形
Import-Csv -LiteralPath "sample_csv.csv" -Encoding Default |
ForEach-Object{
  $BulkList.Add($Bulk_Index) > $null
  $Bulk_Data = $_ | ConvertTo-Json -Compress
  $BulkList.Add($Bulk_Data) > $null
}
# 最後は改行で終わっていないとエラーになる
$BulkList.Add("`r`n")

# BulkAPIでデータ投入
$Param = @{
  Uri = "http://remoteserver:9200/sample_bulk.2020-08/_bulk"
  Body = $BulkList
  Method = "Post"
  ContentType = "application/x-ndjson"
}
Invoke-RestMethod @Param

# リフレッシュ
Invoke-RestMethod -Uri "http://remoteserver:9200/sample_bulk.2020-08/_refresh" -Method Post -ContentType "application/json"
Invoke-RestMethod : {"error":{"root_cause":[{"type":"action_request_validation_exception","reason":"Validation Failed: 1: no requests added;"}],"type":"action_request_validation_exception","reason":"Vali
dation Failed: 1: no requests added;"},"status":400}
発生場所 行:17 文字:1
+ Invoke-RestMethod @Param
+ ~~~~~~~~~~~~~~~~~~~~~~~~
    + CategoryInfo          : InvalidOperation: (System.Net.HttpWebRequest:HttpWebRequest) [Invoke-RestMethod]、WebException
    + FullyQualifiedErrorId : WebCmdletWebResponseException,Microsoft.PowerShell.Commands.InvokeRestMethodCommand

action_request_validation_exceptionValidation Failed: 1: no requests added。来たデータ解釈できないぞ…みたいな?

部分ごとにあれこれ確認して、つぎはぎしたJSONは間違っていないことは確認できた。とすると間違っているのはInvoke-RestMethodの送り方だ。Bodyパラメータで送るとダメなのだろうか?

公式ドキュメントを読んでみたところ、「BodyパラメータはType:Object、InFileパラメータはType:String」との記述を発見した。これだ。
Invoke-Restmethod(MS公式)

Body指定でStringで送るようなパラメータ・オプションは見つけられなかったので、やむを得ず中間ファイルを生成しInFileパラメータで送ることにした。

convertto-bulk.ps1(成功)
$BulkList = New-Object System.Collections.ArrayList
$Bulk_Index = '{"index":{}}'

# CSV読み込んで指定のJSON形式に整形
Import-Csv -LiteralPath "sample_csv.csv" -Encoding Default |
ForEach-Object{
  $BulkList.Add($Bulk_Index) > $null
  $Bulk_Data = $_ | ConvertTo-Json -Compress
  $BulkList.Add($Bulk_Data) > $null
}
Out-File -InputObject $BulkList -LiteralPath "sample_temp.json" -Encoding default

# BulkAPIでデータ投入
$Param = @{
  Uri = "http://remoteserver:9200/sample_bulk.2020-08/_bulk"
  InFile = "sample_temp.json"
  Method = "Post"
  ContentType = "application/x-ndjson"
}
Invoke-RestMethod @Param

Invoke-RestMethod -Uri "http://remoteserver:9200/sample_bulk.2020-08/_refresh" -Method Post -ContentType "application/json"

これにて投入成功。
中間ファイルは終わったら削除してもいいかも。
ファイルを生成し書き込む分少し時間がかかるが、それ以上にデータ投入にかかる時間が短縮される。

調べてないこと

汚いデータはどうしよう

複数の区分のデータがまとめて書き込まれてるようなデータ。
中間データ生成の際に区分ごとにファイルを分けてしまえばいいだろうか?
全部Elasticsearchに投げて任せるのは…都合が良すぎる考えか。

リフレッシュのタイミング

通常のデータ投入だとデータ増加がリアルタイム確認できるのだけど、BulkAPIでまとめ投げした場合はそうでもないようだ。リフレッシュは手動でしたほうがいいのか、システム側に任せた方がいいのか?

0
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
nijinagome
PowerShellであれこれやることを学んでいます。

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
0
Help us understand the problem. What is going on with this article?