8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

シングルマシンで1兆レコードを秒で処理する:どうしてNested Loop Joinがこれほど高速になったのか

Last updated at Posted at 2021-10-22

Processing a Trillion Rows Per Second on a Single Machine: How Can Nested Loop Joins be this Fast? - The Databricks Blogの翻訳です。

2017年の記事です。

クエリーが「速すぎる」ことによって起きる失敗テストケースをデバッグする

この記事では、cross joinクエリーが「速すぎる」ことで引き起こされた失敗テストケースのデバッグの経験を説明します。この失敗テストケースの根本原因は、Apache SparkからJVMのJITコンパイラまで多層に跨っていたため、この記事で我々の分析結果をシェアさせてください。

コンパイラとしてのSpark

ビッグデータSQL、MPPエンジンの大部分は、分析ワークロードにおいては非効率的なVolcanoイテレーターアーキテクチャを採用しています。Spark 2.0のリリース以来、Apache Sparkにおける新たなTungsten実行エンジンは、クエリー全体を単一の関数に分割するために、モダンなコンパイラからインスパイアされたテクニックであるwhole-stage generationを実装しています。このJITコンパイラアプローチは、行ごとの処理や、他のエンジンで採用されているコード生成モデルよりも遥かに優れたアーキテクチャであり、Sparkをマーケットで最も効率的なものにしています。過去の記事では、Spark 2.0がbroadcast hash joinを用いることで、ラップトップで10億レコード/秒を生成できることをデモンストレーションしました。

Spark 2.0は、scan、filter、aggregate、hash joinのような基本的なSQLオペレーターの大部分でwhole-stage code generationを実装しています。お客様からのフィードバックに基づき、Databricksにおけるbroadcast nested loop joinでもwhole-stage code generationを実装し、2倍から10倍の改善を達成しました。

失敗テストケースのミステリー

この改善に対して我々はとてもハッピーでしたが、Databricksにおけるテストケースの一つが失敗し始めたことに気づきました。ハングしているクエリーをシミュレーションするために、テストケースでは1兆レコードを生成するcross joinを実行していました。

spark.range(1000 * 1000).crossJoin(spark.range(1000 * 1000)).count()

シングルノードにおいて、このクエリーは永遠に実行し続けるか、「ハングする」ことを期待していました。驚くべきことに、Jenkinsインフラストラクチャ上で時々このテストケースは、我々がこのクエリーに設定したタイムリミットである1秒以内に完了して、テストケースが非決定論的に失敗することを見始めることになりました。

失敗したケースの一つにおいては、以下に示すように40コアを使ったbroadcast nested loop joinを実行していることに気づきました。すなわち、それぞれのコアが1秒当たり250億レコードを処理していることになります。我々がパフォーマンス改善を喜んでいるときに、何かがおかしかったのです。CPUは4GHz以下で稼働しています。だとしたら、どうやったらコアは、joinにおいてサイクルごとに6行以上を処理できるのでしょうか?

Sparkクエリーの一生

原因を明らかにする前に、どのようにSparkのクエリーの実行が動作するのかをウォークスルーしましょう。

Sparkは内部的に、クエリーやデータフレームを論理的プランとして表現します。Catalystオプティマイザは、論理的プランに対してルールベースののぞき穴的最適化とコストベースの最適化の両方を適用します。論理的クエリーの最適化の後で、Catalystは論理的プランを、どのようにクエリーを実行すべきかに関するより多くの情報を含む物理的プランに変換します。例として、「join」は論理的なプランのノードであり、物理的にjoinをどのように実行すべきかは記述していません。一方、「hash join」や「nested loop join」は、どのようにクエリーを実行すべきかを指定している物理的プランのノードとなります。

whole-stage code generationの前に、それぞれの物理的プランは、実行を定義するコードを含むクラスとなっています。whole-stage code generationによって、プランツリーにある全ての物理プランのノードは、実行に対応する単一関数におけるJavaコードを生成するために一緒に動作します。このJavaコードは高速なJavaコンパイラであるJaninoを用いてJVMバイトコードに変換されます。そして、さらにバイトコードを最適化するために、JVMのJITがキックされ、最終的にはマシン語にコンパイルされます。

このケースでは、生成されたJavaコードは以下のようになります(説明のためにシンプルにしています)。

long agg_value1 = 0L;
while (range_number != range_batchEnd) {
  range_number += 1L;

  for(int bnlj_index = 0; bnlj_index < broadcast.length; ++bnlj_index) {
    InternalRow row = broadcast[bnlj_index];
    agg_value1 += 1L;
  }
}

我々の最初の推理は、JVMのJITが賢く、JITがバイトコードを分析した結果、inner loop(内側のループ)はいくつかのカウンターをインクリメントするだけで副作用がないことを発見し、inner loopを削除したのではないかというものでした。その場合、JITはコードを以下のように書き直すはずです。

long agg_value1 = 0L;
while (range_number != range_batchEnd) {
  range_number += 1L;
  agg_value1 += broadcast.length;
}

これは、**O(outer * inner)のオペレーションをO(outer)**に変換したことになります。これを検証するために、フラグ-XX:PrintAssemblyを指定してJITによるアセンブラコードをダンプし、アセンブラコードを調査しました。生成されたアセンブラは以下のようになります(注釈を追加しています。完全なバージョンはこちらから参照できます)。

0x00007f4d0510fb6f: jne    0x00007f4d0510fc85
<strong>0x00007f4d0510fb75: mov    r10d,DWORD PTR [rbx+0xc]  ;r10d ← bnlj_broadcast.length
0x00007f4d0510fb79: mov    r9d,r10d
<strong>0x00007f4d0510fb7c: add    r9d,0xfffffff1 ; r9d ← bnlj_broadcast.length - 15
0x00007f4d0510fb80: xor    r8d,r8d
...
0x00007f4d0510fba6: je     0x00007f4d0510fc4e
0x00007f4d0510fbac: add    r13,0x1    ; outer loop increment
0x00007f4d0510fbb0: xor    ebp,ebp
0x00007f4d0510fbb2: cmp    ebp,r10d   ; inner loop condition
0x00007f4d0510fbb5: jge    0x00007f4d0510fb9b 
...
0x00007f4d0510fbcd: mov    r11d,ebp
0x00007f4d0510fbd0: inc    r11d               ; inner loop increment by 1
0x00007f4d0510fbd3: add    r14,0x1            ; agg_value1 increment by 1

0x00007f4d0510fbd7: cmp    r11d,r10d        ; inner loop condition
0x00007f4d0510fbda: jge    0x00007f4d0510fb9b 
...
0x00007f4d0510fc14: mov    QWORD PTR [rcx+0x68],rdi
0x00007f4d0510fc18: add    r14,0x10           ; agg_value1 += 16
0x00007f4d0510fc1c: add    r11d,0x10          ; inner loop increment by 16
0x00007f4d0510fc20: cmp    r11d,r9d   ; inner loop condition
; (bnlj_index < bnlj_broadcast.length - 15
0x00007f4d0510fc23: jl     0x00007f4d0510fc10 
0x00007f4d0510fc25: cmp    r11d,r10d  ; inner loop condition
; (bnlj_index < bnlj_broadcast.length
0x00007f4d0510fc28: jge    0x00007f4d0510fb9b
0x00007f4d0510fc2e: xchg   ax,ax

アセンブラは少々ごちゃごちゃしていますが、agg_value1 += 1の指示は0x01add 0x10のアセンブラ命令の両方を用いて実装されていることには容易に気づくことができます。このことは、inner loopが16の因子で展開されているので、さらなる最適化が可能であることを示唆しています。bnlj_broadcast.lengthは16の倍数ではない場合があるので、add 0x01指令はループを完了するために依然として必要となります。

そして、本当に起きていたのは、以下のようにnested loopが書き直されたということです。

long agg_value1 = 0L;
while (range_number != range_batchEnd) {
  range_number += 1L;

  for(int bnlj_index = 0; bnlj_index < broadcast.length; bnlj_index += 16) {
    agg_value1 += 16L;
  }
  ...
}

得られた教訓

謎は解けました。**実際のところ、この最適化は実際のところ問題になるのでしょうか?**cross joinのカウントを計算するクエリーを実行していないのであれば、おそらくそうはならないでしょう。

しかし、我々はこの経験と原因は魅惑的なものであり、探究心からこのことを共有したいと考えました。inner loopを解く固有の最適化ルールを実装することなしに、別のレイヤーの抽象化、すなわちJVM JITに存在する最適化技術を活用することで、この最適化を実現しました。別の興味深い教訓は、複数のレイヤーの最適化によって、パフォーマンスエンジニアリングはチャレンジングなものとなり、意図した最適化の計測するためのベンチマークをデザインする際には、別レイヤーの最適化が予期しない高速化をもたらし、間違った結論に至る可能性があるため、特段の注意が必要となるというものです。

しかしながら、broadcast nested loop joinの改善は広く利用できるものであり、すべてのDatabricksユーザーは自動でこの最適化の恩恵を受けることができます。

Databricks 無料トライアル

Databricks 無料トライアル

8
8
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?