2017年の記事です。
クエリーが「速すぎる」ことによって起きる失敗テストケースをデバッグする
この記事では、cross joinクエリーが「速すぎる」ことで引き起こされた失敗テストケースのデバッグの経験を説明します。この失敗テストケースの根本原因は、Apache SparkからJVMのJITコンパイラまで多層に跨っていたため、この記事で我々の分析結果をシェアさせてください。
コンパイラとしてのSpark
ビッグデータSQL、MPPエンジンの大部分は、分析ワークロードにおいては非効率的なVolcanoイテレーターアーキテクチャを採用しています。Spark 2.0のリリース以来、Apache Sparkにおける新たなTungsten実行エンジンは、クエリー全体を単一の関数に分割するために、モダンなコンパイラからインスパイアされたテクニックであるwhole-stage generationを実装しています。このJITコンパイラアプローチは、行ごとの処理や、他のエンジンで採用されているコード生成モデルよりも遥かに優れたアーキテクチャであり、Sparkをマーケットで最も効率的なものにしています。過去の記事では、Spark 2.0がbroadcast hash joinを用いることで、ラップトップで10億レコード/秒を生成できることをデモンストレーションしました。
Spark 2.0は、scan、filter、aggregate、hash joinのような基本的なSQLオペレーターの大部分でwhole-stage code generationを実装しています。お客様からのフィードバックに基づき、Databricksにおけるbroadcast nested loop joinでもwhole-stage code generationを実装し、2倍から10倍の改善を達成しました。
失敗テストケースのミステリー
この改善に対して我々はとてもハッピーでしたが、Databricksにおけるテストケースの一つが失敗し始めたことに気づきました。ハングしているクエリーをシミュレーションするために、テストケースでは1兆レコードを生成するcross joinを実行していました。
spark.range(1000 * 1000).crossJoin(spark.range(1000 * 1000)).count()
シングルノードにおいて、このクエリーは永遠に実行し続けるか、「ハングする」ことを期待していました。驚くべきことに、Jenkinsインフラストラクチャ上で時々このテストケースは、我々がこのクエリーに設定したタイムリミットである1秒以内に完了して、テストケースが非決定論的に失敗することを見始めることになりました。
失敗したケースの一つにおいては、以下に示すように40コアを使ったbroadcast nested loop joinを実行していることに気づきました。すなわち、それぞれのコアが1秒当たり250億レコードを処理していることになります。我々がパフォーマンス改善を喜んでいるときに、何かがおかしかったのです。CPUは4GHz以下で稼働しています。だとしたら、どうやったらコアは、joinにおいてサイクルごとに6行以上を処理できるのでしょうか?
Sparkクエリーの一生
原因を明らかにする前に、どのようにSparkのクエリーの実行が動作するのかをウォークスルーしましょう。
Sparkは内部的に、クエリーやデータフレームを論理的プランとして表現します。Catalystオプティマイザは、論理的プランに対してルールベースののぞき穴的最適化とコストベースの最適化の両方を適用します。論理的クエリーの最適化の後で、Catalystは論理的プランを、どのようにクエリーを実行すべきかに関するより多くの情報を含む物理的プランに変換します。例として、「join」は論理的なプランのノードであり、物理的にjoinをどのように実行すべきかは記述していません。一方、「hash join」や「nested loop join」は、どのようにクエリーを実行すべきかを指定している物理的プランのノードとなります。
whole-stage code generationの前に、それぞれの物理的プランは、実行を定義するコードを含むクラスとなっています。whole-stage code generationによって、プランツリーにある全ての物理プランのノードは、実行に対応する単一関数におけるJavaコードを生成するために一緒に動作します。このJavaコードは高速なJavaコンパイラであるJaninoを用いてJVMバイトコードに変換されます。そして、さらにバイトコードを最適化するために、JVMのJITがキックされ、最終的にはマシン語にコンパイルされます。
このケースでは、生成されたJavaコードは以下のようになります(説明のためにシンプルにしています)。
long agg_value1 = 0L;
while (range_number != range_batchEnd) {
range_number += 1L;
for(int bnlj_index = 0; bnlj_index < broadcast.length; ++bnlj_index) {
InternalRow row = broadcast[bnlj_index];
agg_value1 += 1L;
}
}
我々の最初の推理は、JVMのJITが賢く、JITがバイトコードを分析した結果、inner loop(内側のループ)はいくつかのカウンターをインクリメントするだけで副作用がないことを発見し、inner loopを削除したのではないかというものでした。その場合、JITはコードを以下のように書き直すはずです。
long agg_value1 = 0L;
while (range_number != range_batchEnd) {
range_number += 1L;
agg_value1 += broadcast.length;
}
これは、**O(outer * inner)のオペレーションをO(outer)**に変換したことになります。これを検証するために、フラグ-XX:PrintAssembly
を指定してJITによるアセンブラコードをダンプし、アセンブラコードを調査しました。生成されたアセンブラは以下のようになります(注釈を追加しています。完全なバージョンはこちらから参照できます)。
0x00007f4d0510fb6f: jne 0x00007f4d0510fc85
<strong>0x00007f4d0510fb75: mov r10d,DWORD PTR [rbx+0xc] ;r10d ← bnlj_broadcast.length
0x00007f4d0510fb79: mov r9d,r10d
<strong>0x00007f4d0510fb7c: add r9d,0xfffffff1 ; r9d ← bnlj_broadcast.length - 15
0x00007f4d0510fb80: xor r8d,r8d
...
0x00007f4d0510fba6: je 0x00007f4d0510fc4e
0x00007f4d0510fbac: add r13,0x1 ; outer loop increment
0x00007f4d0510fbb0: xor ebp,ebp
0x00007f4d0510fbb2: cmp ebp,r10d ; inner loop condition
0x00007f4d0510fbb5: jge 0x00007f4d0510fb9b
...
0x00007f4d0510fbcd: mov r11d,ebp
0x00007f4d0510fbd0: inc r11d ; inner loop increment by 1
0x00007f4d0510fbd3: add r14,0x1 ; agg_value1 increment by 1
0x00007f4d0510fbd7: cmp r11d,r10d ; inner loop condition
0x00007f4d0510fbda: jge 0x00007f4d0510fb9b
...
0x00007f4d0510fc14: mov QWORD PTR [rcx+0x68],rdi
0x00007f4d0510fc18: add r14,0x10 ; agg_value1 += 16
0x00007f4d0510fc1c: add r11d,0x10 ; inner loop increment by 16
0x00007f4d0510fc20: cmp r11d,r9d ; inner loop condition
; (bnlj_index < bnlj_broadcast.length - 15
0x00007f4d0510fc23: jl 0x00007f4d0510fc10
0x00007f4d0510fc25: cmp r11d,r10d ; inner loop condition
; (bnlj_index < bnlj_broadcast.length
0x00007f4d0510fc28: jge 0x00007f4d0510fb9b
0x00007f4d0510fc2e: xchg ax,ax
アセンブラは少々ごちゃごちゃしていますが、agg_value1 += 1
の指示は0x01
とadd 0x10
のアセンブラ命令の両方を用いて実装されていることには容易に気づくことができます。このことは、inner loopが16の因子で展開されているので、さらなる最適化が可能であることを示唆しています。bnlj_broadcast.length
は16の倍数ではない場合があるので、add 0x01
指令はループを完了するために依然として必要となります。
そして、本当に起きていたのは、以下のようにnested loopが書き直されたということです。
long agg_value1 = 0L;
while (range_number != range_batchEnd) {
range_number += 1L;
for(int bnlj_index = 0; bnlj_index < broadcast.length; bnlj_index += 16) {
agg_value1 += 16L;
}
...
}
得られた教訓
謎は解けました。**実際のところ、この最適化は実際のところ問題になるのでしょうか?**cross joinのカウントを計算するクエリーを実行していないのであれば、おそらくそうはならないでしょう。
しかし、我々はこの経験と原因は魅惑的なものであり、探究心からこのことを共有したいと考えました。inner loopを解く固有の最適化ルールを実装することなしに、別のレイヤーの抽象化、すなわちJVM JITに存在する最適化技術を活用することで、この最適化を実現しました。別の興味深い教訓は、複数のレイヤーの最適化によって、パフォーマンスエンジニアリングはチャレンジングなものとなり、意図した最適化の計測するためのベンチマークをデザインする際には、別レイヤーの最適化が予期しない高速化をもたらし、間違った結論に至る可能性があるため、特段の注意が必要となるというものです。
しかしながら、broadcast nested loop joinの改善は広く利用できるものであり、すべてのDatabricksユーザーは自動でこの最適化の恩恵を受けることができます。