10
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ElasticsearchAdvent Calendar 2015

Day 18

Elasticsearchの処理性能をRでモデル化する試み

Posted at

この記事はElasticsearch Advent Calendar 2015 18日目のエントリーです。

Elasticsearchの性能を最大限に引き出すために、適切なチューニングを行う必要があります。チューニングの正しいやり方については、「Elasticsearchのインデキシングに関するパフォーマンス検討」「Indexing Performance Tips」とが参考になります。

ポイントは1台サーバで1か所の設定を変えて30分以上テストする。この繰り返しです。気の遠くなるような作業ですね。テストしなくても設定値でパフォーマンスを推測できればいいなと、漠然とですが考えていました。

#パラメータの選択
 パフォーマンスに影響する要素は多数あるため、問題を単純化しないと、手のつけようがありません。勝手に下記の3つのパラメータを選びました。

  • Bulkサイズ:Bulk APIを使って複数行のデータ(複数のドキュメント)を一括でインポートすることができます。このデータの固まりのサイズはBulkサイズと呼びます。「Indexing Performance Tips」で書いているように、行数(ドキュメント数)はいい単位ではありません。1行(1ドキュメント)のサイズがかわると、同じ行数(ドキュメント数)でもBulkサイズ全然違うから。
  • シャード数:1つのElasticsearchのIndexを複数のLucene Indexに分けて保存できます。それぞれのLucene IndexはElasticsearchのShard(シャード)と呼びます。
  • クライアントの並行度:同時にBulk APIを呼ぶ出すHTTPクライアントの数です。Javaで実装するとスレッドになりますが、Goで実装したから、Goroutine(軽量なスレッド)となり、だから並行度(Concurrency)というパラメータ名にしました。

#テストのやり方
 2MBのBulkサイズ、1つのシャード、1つのGoroutineから始めて、徐々に設定値を増やしていく。最後は、28MBのBulkサイズ、10つのシャード、5つのGoroutineになる。合計700の組み合わせでテストしました。下記のように、シャード数を増やすと、複数のLuceneエンジンでインデックス化の作業を並列して実行できます。Bulkサイズを増やすと、1回でElasticsearchに送るデータ量が増えます。並行度を増やすと、Gopher[1][2]の数が増えます。
image

Goクライアントはサンプルデータの入ったファイルからデータを読み込んで、指定されたサイズのBufferを作成します。後は、下記のようにElasticsearchにPOSTします。指定された時間が過ぎたら、テストを終了します。ソースコードが長いので、Gistに貼りつけました。

main.go
req, err := http.NewRequest("POST", url, reader)
if err != nil {
	return
}
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")

resp, err := http.DefaultClient.Do(req)
if err != nil {
	return
}
defer resp.Body.Close()
...

 下記のスクリプトでパラメータを徐々に増やしてテストを実行しました。

run.sh
#!/bin/bash

rm upload.log
echo "Delete index."
curl -XDELETE http://localhost:9200/testtest

for c in {1..5}
do
	for i in {1..14}
	do
		for s in {1..10}
		do
			size="$((i*2))"
			echo "Bulk size ${size}MB, shard is $s, concurrency is $c"
			echo "Create index index."
			echo "{\"settings\":{\"number_of_shards\":$s}}"
			curl -XPUT http://localhost:9200/testtest -d "{\"settings\":{\"number_of_shards\":$s}}"

			echo "Import data."
			./essizing input.txt ${size}m 5m http://localhost:9200/testtest/log/_bulk $c
			echo "Get status."
			curl http://localhost:9200/testtest/_status?pretty > ${size}m_${s}shard_c${c}.json

			echo "Delete index."
			curl -XDELETE http://localhost:9200/testtest
			echo "Sleep."
			sleep 30s
		done
	done
done

#テスト結果の加工

 次のようなテスト結果を取得しましたが、シャード数、並行度等情報が入っていないので、pythonのスクリプトでシャード数、並行度やElasticsearchの出力情報等を追加します。

time,post,posted,line,byte,err,bulksize,bulkline
1449828385,5,4,217888,8388688,0,2097172,54472
1449828395,4,4,217888,8388688,0,2097172,54472
1449828405,4,4,217888,8388688,0,2097172,54472
1449828415,4,4,217888,8388688,0,2097172,54472
1449828425,4,4,217888,8388688,0,2097172,54472
...

 加工したデータは下記のようになります。Goクライアントは10秒間隔でログを出力しています。

  • post:10秒間で送信を始めた数。
  • posted:10秒間で送信を完了した数。
  • line:10秒間で送った行数。
  • byte:10秒間で送ったバイト数。
  • err: エラーの数。0でなければテスト結果を信頼できません。
  • bulksize:Bulkのバイト数。
  • bulkline:Bulkの行数。
  • shard:シャード数。
  • concurrency:並行度。
  • essize:1回のテストを実行し完了したら取得したテストIndexのサイズ(バイト数)
  • esdoc:1回のテストを実行し完了したら取得したテストIndexのサイズ(行数、またはドキュメント数)
  • round:テストID(何回目のテスト)。
time,post,posted,line,byte,err,bulksize,bulkline,shard,concurrency,essize,esdoc,round
1449828385,5,4,217888,8388688,0,2097172,54472,1,1,108649967,6531966,1
1449828395,4,4,217888,8388688,0,2097172,54472,1,1,108649967,6531966,1
1449828405,4,4,217888,8388688,0,2097172,54472,1,1,108649967,6531966,1
1449828415,4,4,217888,8388688,0,2097172,54472,1,1,108649967,6531966,1
1449828425,4,4,217888,8388688,0,2097172,54472,1,1,108649967,6531966,1
...

#テスト結果の時系列分析

 次は、このデータをRで分析しモデルを作ります。やっと本番って感じですね。とりあえず時間軸で描画してみます。下記のように、全データが多いので傾向よく見えません。
image
 最初10回のテストだけの図であれば、シャード数を増やすと転送スペードが上がったことがわかります。しかし、シャード7からちょっと落ちました。テストの間に30秒間の間隔を入れたので、テストをはっきりと区別できます。
image

timeseries.r
input<-"20151211_new2_finished\\upload_new2_finished.log"
upload<-read.table(input, header=T, sep=",")

par(mfrow=c(3,1))
plot(upload[, 5]/1024/1024, type="l", ylab="MB/10s", xlab="10 seconds interval", main="concurrency=1~5, bulk size=2~28MB, shard=1~10")
plot(upload[upload$round<=10, 5]/1024/1024, type="l", ylab="MB/10s", xlab="10 seconds interval", main="concurrency=1, bulk size=2MB, shard=1~10")
text(15,5, "shard 1")
text(45,5, "shard 2")
text(80,5, "shard 3")
text(110,5, "shard 4")
text(140,5, "shard 5")
text(170,5, "shard 6")
text(200,5, "shard 7")
text(235,5, "shard 8")
text(265,5, "shard 9")
text(295,5, "shard 10")

#Rで平均速度を描画してみる
 10秒間隔のインポード速度はばらつきがあるので、テストごとの平均速度を計算します。なお、他のテスト結果と比較するために、今回行数単位で計算します。下記のように、シャード数を増やすと、速度が上がるような傾向が見えます。並行度は色で区別してみると、同じ傾向が見えます。
image

speed.r
input<-"20151211_new2_finished\\upload_new2_finished.log"
upload<-read.table(input, header=T, sep=",")

grp<-aggregate(time ~ bulksize+bulkline+shard+concurrency+essize+esdoc+round, data=upload, FUN=min)
time2<-aggregate(time ~ round, data=upload, FUN=max)
grp$time2<-time2$time
grp$durationSec<-grp$time2 - grp$time + 10 # post began 10 seconds before (after buffer created)
grp$bytePerSec<-grp$essize / grp$durationSec
grp$linePerSec<-grp$esdoc / grp$durationSec


plot(grp$shard, grp$linePerSec, xlab="shard", ylab="line/s",
    col=ifelse(grp$concurrency==1, "firebrick", 
    ifelse(grp$concurrency==2, "gold", 
    ifelse(grp$concurrency==3, "chartreuse", 
    ifelse(grp$concurrency==4, "deepskyblue", "darkviolet"))))) 
legend("topleft", legend=c("c1","c2","c3","c4","c5"), 
    col=c("firebrick","gold","chartreuse","deepskyblue","darkviolet"),
    pch=rep(16,5))

#単回帰分析

 次は学習してみます。速度をシャード数の上に回帰します。回帰分析のためlm関数を使います。linePerSec~shardは、回帰式「linePerSec = a + b*shard」の意味です。

lm1.r
model1<-lm(linePerSec~shard, data = grp)
print(summary(model1))
abline(model1)

 結果のモデルを描画してみたら、ぜんぜん実際のデータと合っていない気がします。
image

 summary関数の出力を確認したら、やはり決定係数(R-squared)は28%しかありません。決定係数は、独立変数(shard)が従属変数(linePerSec)のどれくらいを説明できるかを表す値です。つまり、このモデルだと、シャード数で速度の変動をあまり説明できません。

lm1_output.r
Call:
lm(formula = linePerSec ~ shard, data = grp)

Residuals:
   Min     1Q Median     3Q    Max 
-36386  -4609   3134   8493  16365 

Coefficients:
            Estimate Std. Error t value Pr(>|t|)    
(Intercept)  53824.4      930.4   57.85   <2e-16 ***
shard         2474.4      150.0   16.50   <2e-16 ***
---
Signif. codes:  0 *** 0.001 ** 0.01 * 0.05 . 0.1   1

Residual standard error: 11400 on 698 degrees of freedom
Multiple R-squared:  0.2806,    Adjusted R-squared:  0.2796 
F-statistic: 272.3 on 1 and 698 DF,  p-value: < 2.2e-16

#シャードの2次、3次、平方根、対数

 2次関数、3次関数、平方根、対数等いろいろ試してみたら、3次関数と対数は52%説明できます。
image

nonlinear.r
grp$shard2<-grp$shard^2
grp$shard3<-grp$shard^3
grp$shardR<-grp$shard^0.5
grp$shardL<-log(grp$shard)

lm.shard2<-lm(linePerSec~shard+shard2, data=grp)
lm.shard3<-lm(linePerSec~shard+shard2+shard3, data=grp)
lm.shardR<-lm(linePerSec~shard+shardR, data=grp)
lm.shardL<-lm(linePerSec~shard+shardL, data=grp)

print(summary(lm.shard2))
print(summary(lm.shard3))
print(summary(lm.shardR))
print(summary(lm.shardL))

shards<-seq(1,10)
pred.shard2<-predict(lm.shard2, list(shard=shards, shard2=shards^2))
pred.shard3<-predict(lm.shard3, list(shard=shards, shard2=shards^2, shard3=shards^3))
pred.shardR<-predict(lm.shardR, list(shard=shards, shardR=shards^0.5))
pred.shardL<-predict(lm.shardL, list(shard=shards, shardL=log(shards)))

lines(shards, pred.shard3, col="green", lwd=2)
lines(shards, pred.shardL, col="darkgreen", lwd=2)
lines(shards, pred.shardR, col="red", lwd=2)
lines(shards, pred.shard2, col="blue", lwd=2)

legend("bottomright", lty=rep(1,4), lwd=rep(2,4),
    legend=c("Cube (52%)","Logarithm (52%)","Square Root (51%)","Quadratic (47%)"), 
    col=c("green","darkgreen","red","blue"))

#並行度とシャード

 半分しか説明できないので、やはり並行度も考える必要があります。

multi.r
grp$concurrency2<-grp$concurrency^2
grp$concurrency3<-grp$concurrency^3
grp$concurrencyR<-grp$concurrency^0.5
grp$concurrencyL<-log(grp$concurrency)

lm.bs2c2<-lm(linePerSec~bulkline+shard+shard2+concurrency+concurrency2, data=grp)
lm.bs3c3<-lm(linePerSec~bulkline+shard+shard2+shard3+concurrency+concurrency2+concurrency3, data=grp)
lm.bsRcR<-lm(linePerSec~bulkline+shard+shardR+concurrency+concurrencyR, data=grp)
lm.bsLcL<-lm(linePerSec~bulkline+shard+shardL+concurrency+concurrencyL, data=grp)

print(summary(lm.bs2c2))
print(summary(lm.bs3c3))
print(summary(lm.bsRcR))
print(summary(lm.bsLcL))

 上記コードを実行すると、下記のような結果が出力されます。3次関数と対数は90%以上説明できます。

multi_output.r
lm(formula = linePerSec ~ bulkline + shard + shard2 + concurrency + 
    concurrency2, data = grp)

Multiple R-squared:  0.8402,    Adjusted R-squared:  0.8391 
-----------------------------------------------------------------------
lm(formula = linePerSec ~ bulkline + shard + shard2 + shard3 + 
    concurrency + concurrency2 + concurrency3, data = grp)

Multiple R-squared:  0.9134,    Adjusted R-squared:  0.9125 
-----------------------------------------------------------------------
lm(formula = linePerSec ~ bulkline + shard + shardR + concurrency + 
    concurrencyR, data = grp)

Multiple R-squared:  0.8979,    Adjusted R-squared:  0.8972 
-----------------------------------------------------------------------
lm(formula = linePerSec ~ bulkline + shard + shardL + concurrency + 
    concurrencyL, data = grp)
...
Multiple R-squared:  0.907,     Adjusted R-squared:  0.9063 

#最大速度はどこ?

 下記のように、3次関数と対数のモデルで、最大速度のパラメータがわかります。

best.r
bestAll<-grp[, c("bulkline", "shard", "concurrency")]
bestAll$pbs3c3<-apply(best, 1, function(row) predict_bs3c3(row["shard"], row["concurrency"], row["bulkline"]))
bestAll$pbsLcL<-apply(best, 1, function(row) predict_bsLcL(row["shard"], row["concurrency"], row["bulkline"]))
print(best[which.max(best$pbs3c3),])
print(best[which.max(best$pbsLcL),])

 下記の出力によると、3次関数モデルの場合は、5つのシャードのインデックスに、2MBのBulkサイズで5つのクライアントが同時にインポートすると最高速度が実現できます。対数モデルの場合は、7つのシャードのインデックスに、2MBのBulkサイズで4つのクライアントが同時にインポートすると最高速度が実現できます。

best_output.r
     bulkline shard concurrency   pbs3c3  pbsLcL
565    54472     5           5 80922.23 78892.3
    bulkline shard concurrency   pbs3c3   pbsLcL
427    54472     7           4 78685.83 81365.69

 2つのモデルは、一番小さい2MBのBulkサイズで最高速度が出たので、Bulkサイズを2MBに固定して、モデルを描画してみました。下記は3次関数の図です。赤い点は最大速度です。5つのシャードと並行度5のときに実現できます。
image

 違う角度から見た3次関数の図です。
image

 対数もよい結果出たので、下記の図をご参照ください。最大速度は7つのシャードと並行度4のときに実現できます。
image

 違う角度から見た対数の図です。
image

コード一覧
main.go
enrich.py
speed.r

#結論

 今回のモデルによると、インデックスのシャード数を5~7に設定し、4,5のスレッドで同時にBulkインポートを行えば、最大の速度が出ます。しかし、今回のテストでは、Bulkサイズが小さいほうがよい、という結果がでました。原因を調べる必要があります。

 また、速度に影響を与えるパラメータは、サーバスペックとデータの特徴等あるので、大量のテストデータを取得して、より精度の高いモデルを作る必要があります。

10
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?