この記事はElasticsearch Advent Calendar 2015 18日目のエントリーです。
Elasticsearchの性能を最大限に引き出すために、適切なチューニングを行う必要があります。チューニングの正しいやり方については、「Elasticsearchのインデキシングに関するパフォーマンス検討」と「Indexing Performance Tips」とが参考になります。
ポイントは1台サーバで1か所の設定を変えて30分以上テストする。この繰り返しです。気の遠くなるような作業ですね。テストしなくても設定値でパフォーマンスを推測できればいいなと、漠然とですが考えていました。
#パラメータの選択
パフォーマンスに影響する要素は多数あるため、問題を単純化しないと、手のつけようがありません。勝手に下記の3つのパラメータを選びました。
- Bulkサイズ:Bulk APIを使って複数行のデータ(複数のドキュメント)を一括でインポートすることができます。このデータの固まりのサイズはBulkサイズと呼びます。「Indexing Performance Tips」で書いているように、行数(ドキュメント数)はいい単位ではありません。1行(1ドキュメント)のサイズがかわると、同じ行数(ドキュメント数)でもBulkサイズ全然違うから。
- シャード数:1つのElasticsearchのIndexを複数のLucene Indexに分けて保存できます。それぞれのLucene IndexはElasticsearchのShard(シャード)と呼びます。
- クライアントの並行度:同時にBulk APIを呼ぶ出すHTTPクライアントの数です。Javaで実装するとスレッドになりますが、Goで実装したから、Goroutine(軽量なスレッド)となり、だから並行度(Concurrency)というパラメータ名にしました。
#テストのやり方
2MBのBulkサイズ、1つのシャード、1つのGoroutineから始めて、徐々に設定値を増やしていく。最後は、28MBのBulkサイズ、10つのシャード、5つのGoroutineになる。合計700の組み合わせでテストしました。下記のように、シャード数を増やすと、複数のLuceneエンジンでインデックス化の作業を並列して実行できます。Bulkサイズを増やすと、1回でElasticsearchに送るデータ量が増えます。並行度を増やすと、Gopher[1][2]の数が増えます。
Goクライアントはサンプルデータの入ったファイルからデータを読み込んで、指定されたサイズのBufferを作成します。後は、下記のようにElasticsearchにPOSTします。指定された時間が過ぎたら、テストを終了します。ソースコードが長いので、Gistに貼りつけました。
req, err := http.NewRequest("POST", url, reader)
if err != nil {
return
}
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
...
下記のスクリプトでパラメータを徐々に増やしてテストを実行しました。
#!/bin/bash
rm upload.log
echo "Delete index."
curl -XDELETE http://localhost:9200/testtest
for c in {1..5}
do
for i in {1..14}
do
for s in {1..10}
do
size="$((i*2))"
echo "Bulk size ${size}MB, shard is $s, concurrency is $c"
echo "Create index index."
echo "{\"settings\":{\"number_of_shards\":$s}}"
curl -XPUT http://localhost:9200/testtest -d "{\"settings\":{\"number_of_shards\":$s}}"
echo "Import data."
./essizing input.txt ${size}m 5m http://localhost:9200/testtest/log/_bulk $c
echo "Get status."
curl http://localhost:9200/testtest/_status?pretty > ${size}m_${s}shard_c${c}.json
echo "Delete index."
curl -XDELETE http://localhost:9200/testtest
echo "Sleep."
sleep 30s
done
done
done
#テスト結果の加工
次のようなテスト結果を取得しましたが、シャード数、並行度等情報が入っていないので、pythonのスクリプトでシャード数、並行度やElasticsearchの出力情報等を追加します。
time,post,posted,line,byte,err,bulksize,bulkline
1449828385,5,4,217888,8388688,0,2097172,54472
1449828395,4,4,217888,8388688,0,2097172,54472
1449828405,4,4,217888,8388688,0,2097172,54472
1449828415,4,4,217888,8388688,0,2097172,54472
1449828425,4,4,217888,8388688,0,2097172,54472
...
加工したデータは下記のようになります。Goクライアントは10秒間隔でログを出力しています。
- post:10秒間で送信を始めた数。
- posted:10秒間で送信を完了した数。
- line:10秒間で送った行数。
- byte:10秒間で送ったバイト数。
- err: エラーの数。0でなければテスト結果を信頼できません。
- bulksize:Bulkのバイト数。
- bulkline:Bulkの行数。
- shard:シャード数。
- concurrency:並行度。
- essize:1回のテストを実行し完了したら取得したテストIndexのサイズ(バイト数)
- esdoc:1回のテストを実行し完了したら取得したテストIndexのサイズ(行数、またはドキュメント数)
- round:テストID(何回目のテスト)。
time,post,posted,line,byte,err,bulksize,bulkline,shard,concurrency,essize,esdoc,round
1449828385,5,4,217888,8388688,0,2097172,54472,1,1,108649967,6531966,1
1449828395,4,4,217888,8388688,0,2097172,54472,1,1,108649967,6531966,1
1449828405,4,4,217888,8388688,0,2097172,54472,1,1,108649967,6531966,1
1449828415,4,4,217888,8388688,0,2097172,54472,1,1,108649967,6531966,1
1449828425,4,4,217888,8388688,0,2097172,54472,1,1,108649967,6531966,1
...
#テスト結果の時系列分析
次は、このデータをRで分析しモデルを作ります。やっと本番って感じですね。とりあえず時間軸で描画してみます。下記のように、全データが多いので傾向よく見えません。
最初10回のテストだけの図であれば、シャード数を増やすと転送スペードが上がったことがわかります。しかし、シャード7からちょっと落ちました。テストの間に30秒間の間隔を入れたので、テストをはっきりと区別できます。
input<-"20151211_new2_finished\\upload_new2_finished.log"
upload<-read.table(input, header=T, sep=",")
par(mfrow=c(3,1))
plot(upload[, 5]/1024/1024, type="l", ylab="MB/10s", xlab="10 seconds interval", main="concurrency=1~5, bulk size=2~28MB, shard=1~10")
plot(upload[upload$round<=10, 5]/1024/1024, type="l", ylab="MB/10s", xlab="10 seconds interval", main="concurrency=1, bulk size=2MB, shard=1~10")
text(15,5, "shard 1")
text(45,5, "shard 2")
text(80,5, "shard 3")
text(110,5, "shard 4")
text(140,5, "shard 5")
text(170,5, "shard 6")
text(200,5, "shard 7")
text(235,5, "shard 8")
text(265,5, "shard 9")
text(295,5, "shard 10")
#Rで平均速度を描画してみる
10秒間隔のインポード速度はばらつきがあるので、テストごとの平均速度を計算します。なお、他のテスト結果と比較するために、今回行数単位で計算します。下記のように、シャード数を増やすと、速度が上がるような傾向が見えます。並行度は色で区別してみると、同じ傾向が見えます。
input<-"20151211_new2_finished\\upload_new2_finished.log"
upload<-read.table(input, header=T, sep=",")
grp<-aggregate(time ~ bulksize+bulkline+shard+concurrency+essize+esdoc+round, data=upload, FUN=min)
time2<-aggregate(time ~ round, data=upload, FUN=max)
grp$time2<-time2$time
grp$durationSec<-grp$time2 - grp$time + 10 # post began 10 seconds before (after buffer created)
grp$bytePerSec<-grp$essize / grp$durationSec
grp$linePerSec<-grp$esdoc / grp$durationSec
plot(grp$shard, grp$linePerSec, xlab="shard", ylab="line/s",
col=ifelse(grp$concurrency==1, "firebrick",
ifelse(grp$concurrency==2, "gold",
ifelse(grp$concurrency==3, "chartreuse",
ifelse(grp$concurrency==4, "deepskyblue", "darkviolet")))))
legend("topleft", legend=c("c1","c2","c3","c4","c5"),
col=c("firebrick","gold","chartreuse","deepskyblue","darkviolet"),
pch=rep(16,5))
#単回帰分析
次は学習してみます。速度をシャード数の上に回帰します。回帰分析のためlm関数を使います。linePerSec~shardは、回帰式「linePerSec = a + b*shard」の意味です。
model1<-lm(linePerSec~shard, data = grp)
print(summary(model1))
abline(model1)
結果のモデルを描画してみたら、ぜんぜん実際のデータと合っていない気がします。
summary関数の出力を確認したら、やはり決定係数(R-squared)は28%しかありません。決定係数は、独立変数(shard)が従属変数(linePerSec)のどれくらいを説明できるかを表す値です。つまり、このモデルだと、シャード数で速度の変動をあまり説明できません。
Call:
lm(formula = linePerSec ~ shard, data = grp)
Residuals:
Min 1Q Median 3Q Max
-36386 -4609 3134 8493 16365
Coefficients:
Estimate Std. Error t value Pr(>|t|)
(Intercept) 53824.4 930.4 57.85 <2e-16 ***
shard 2474.4 150.0 16.50 <2e-16 ***
---
Signif. codes: 0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1
Residual standard error: 11400 on 698 degrees of freedom
Multiple R-squared: 0.2806, Adjusted R-squared: 0.2796
F-statistic: 272.3 on 1 and 698 DF, p-value: < 2.2e-16
#シャードの2次、3次、平方根、対数
2次関数、3次関数、平方根、対数等いろいろ試してみたら、3次関数と対数は52%説明できます。
grp$shard2<-grp$shard^2
grp$shard3<-grp$shard^3
grp$shardR<-grp$shard^0.5
grp$shardL<-log(grp$shard)
lm.shard2<-lm(linePerSec~shard+shard2, data=grp)
lm.shard3<-lm(linePerSec~shard+shard2+shard3, data=grp)
lm.shardR<-lm(linePerSec~shard+shardR, data=grp)
lm.shardL<-lm(linePerSec~shard+shardL, data=grp)
print(summary(lm.shard2))
print(summary(lm.shard3))
print(summary(lm.shardR))
print(summary(lm.shardL))
shards<-seq(1,10)
pred.shard2<-predict(lm.shard2, list(shard=shards, shard2=shards^2))
pred.shard3<-predict(lm.shard3, list(shard=shards, shard2=shards^2, shard3=shards^3))
pred.shardR<-predict(lm.shardR, list(shard=shards, shardR=shards^0.5))
pred.shardL<-predict(lm.shardL, list(shard=shards, shardL=log(shards)))
lines(shards, pred.shard3, col="green", lwd=2)
lines(shards, pred.shardL, col="darkgreen", lwd=2)
lines(shards, pred.shardR, col="red", lwd=2)
lines(shards, pred.shard2, col="blue", lwd=2)
legend("bottomright", lty=rep(1,4), lwd=rep(2,4),
legend=c("Cube (52%)","Logarithm (52%)","Square Root (51%)","Quadratic (47%)"),
col=c("green","darkgreen","red","blue"))
#並行度とシャード
半分しか説明できないので、やはり並行度も考える必要があります。
grp$concurrency2<-grp$concurrency^2
grp$concurrency3<-grp$concurrency^3
grp$concurrencyR<-grp$concurrency^0.5
grp$concurrencyL<-log(grp$concurrency)
lm.bs2c2<-lm(linePerSec~bulkline+shard+shard2+concurrency+concurrency2, data=grp)
lm.bs3c3<-lm(linePerSec~bulkline+shard+shard2+shard3+concurrency+concurrency2+concurrency3, data=grp)
lm.bsRcR<-lm(linePerSec~bulkline+shard+shardR+concurrency+concurrencyR, data=grp)
lm.bsLcL<-lm(linePerSec~bulkline+shard+shardL+concurrency+concurrencyL, data=grp)
print(summary(lm.bs2c2))
print(summary(lm.bs3c3))
print(summary(lm.bsRcR))
print(summary(lm.bsLcL))
上記コードを実行すると、下記のような結果が出力されます。3次関数と対数は90%以上説明できます。
lm(formula = linePerSec ~ bulkline + shard + shard2 + concurrency +
concurrency2, data = grp)
Multiple R-squared: 0.8402, Adjusted R-squared: 0.8391
-----------------------------------------------------------------------
lm(formula = linePerSec ~ bulkline + shard + shard2 + shard3 +
concurrency + concurrency2 + concurrency3, data = grp)
Multiple R-squared: 0.9134, Adjusted R-squared: 0.9125
-----------------------------------------------------------------------
lm(formula = linePerSec ~ bulkline + shard + shardR + concurrency +
concurrencyR, data = grp)
Multiple R-squared: 0.8979, Adjusted R-squared: 0.8972
-----------------------------------------------------------------------
lm(formula = linePerSec ~ bulkline + shard + shardL + concurrency +
concurrencyL, data = grp)
...
Multiple R-squared: 0.907, Adjusted R-squared: 0.9063
#最大速度はどこ?
下記のように、3次関数と対数のモデルで、最大速度のパラメータがわかります。
bestAll<-grp[, c("bulkline", "shard", "concurrency")]
bestAll$pbs3c3<-apply(best, 1, function(row) predict_bs3c3(row["shard"], row["concurrency"], row["bulkline"]))
bestAll$pbsLcL<-apply(best, 1, function(row) predict_bsLcL(row["shard"], row["concurrency"], row["bulkline"]))
print(best[which.max(best$pbs3c3),])
print(best[which.max(best$pbsLcL),])
下記の出力によると、3次関数モデルの場合は、5つのシャードのインデックスに、2MBのBulkサイズで5つのクライアントが同時にインポートすると最高速度が実現できます。対数モデルの場合は、7つのシャードのインデックスに、2MBのBulkサイズで4つのクライアントが同時にインポートすると最高速度が実現できます。
bulkline shard concurrency pbs3c3 pbsLcL
565 54472 5 5 80922.23 78892.3
bulkline shard concurrency pbs3c3 pbsLcL
427 54472 7 4 78685.83 81365.69
2つのモデルは、一番小さい2MBのBulkサイズで最高速度が出たので、Bulkサイズを2MBに固定して、モデルを描画してみました。下記は3次関数の図です。赤い点は最大速度です。5つのシャードと並行度5のときに実現できます。
対数もよい結果出たので、下記の図をご参照ください。最大速度は7つのシャードと並行度4のときに実現できます。
コード一覧
main.go
enrich.py
speed.r
#結論
今回のモデルによると、インデックスのシャード数を5~7に設定し、4,5のスレッドで同時にBulkインポートを行えば、最大の速度が出ます。しかし、今回のテストでは、Bulkサイズが小さいほうがよい、という結果がでました。原因を調べる必要があります。
また、速度に影響を与えるパラメータは、サーバスペックとデータの特徴等あるので、大量のテストデータを取得して、より精度の高いモデルを作る必要があります。