How to Speed up SQL Queries with Adaptive Query Executionの翻訳です。
この記事は、Databricks Apache Sparkエンジニアリングチーム、Wenchen Fan、Herman van Hovell、MaryAnn Xueと、Intelのエンジニアリングチーム、Ke Jia、Haifeng Chen、Carson Wangの共著です。
2020年の記事です。
以下でカバーされているソリューションのデモ用AQEノートブック
数年をかけて、高品質なクエリー実行計画を生成するために、Spark SQLのクエリーオプティマイザとプランナーを改善するための弛まない努力がはらわれてきました。その中でも最大のものが、Sparkが適切な計画を選択できるように、様々な統計データ(行数、固有な値の数、NULL値、最大値、最小値など)を収集、活用するコストベースの最適化フレームワークです。これらのコストベースの最適化技術の例としては、適切なjoinタイプの選択、hash joinにおける適切な小規模表(build side)の選択、multi-way joinにおけるjoin順序の調整などが含まれます。しかし、古い統計情報や不完全なカーディナリティの推定値は、不適切なクエリー計画を生み出すことになります。Databricksランタイム 7.0で利用できる最新のApache Spark™️ 3.0リリースでは、新たにAdaptive Query Executionが導入され、クエリー実行時に収集される実行時統計情報に基づき、クエリー計画の最適化、調整を行うことで、これらの問題に対応しています。
Adaptive Query Execution(AQE)フレームワーク
Adaptive Query Executionに対する最も重要な質問は、いつ再度の最適化を行うのかということです。Sparkのオペレーターは多くの場合、並列のプロセスでパイプライン化され実行されます。しかし、シャッフルやブロードキャストによるやりとりは、この並列プロセスを壊してしまいます。我々はこれらをマテリアライゼーションポイントと呼び、クエリーにおいてこれらのマテリアライゼーションポイントで分割されるサブセクションを示す「クエリーステージ」という単語を用います。それぞれのクエリーステージは、中間結果をマテリアライズし、マテリアライゼーションを実行する全ての並列プロセスが完了して初めて、以降のステージに進むことができます。この時点で、全てのパーティションにおける統計情報が利用でき、以降のオペレーションがまだ開始していない状態となるので、ここが再最適化(reoptimization)の自然なタイミングとなります。
クエリーがスタートすると、Adaptive Query Executionフレームワークは、まず初めに全てのリーフステージ、他のステージに依存しないステージをキックします。これらのステージの1つ以上のステージがマテリアライゼーションを完了すると、フレームワークはこれらを物理クエリープラン上で完了とマークし、完了したステージから実行時統計情報を収集し、対応する論理クエリープランもアップデートします。これら新たな統計情報に基づき、フレームワークは(選択された論理最適化ルールを伴い)オプティマイザ、物理プランナー、そして、通常の物理ルール、パーティションの結合、skew joinの取り扱いのようなadaptive-execution固有のルールを含む物理最適化ルールを実行します。この時点で、いくつか完了したステージと、新たに最適化されたクエリープランを手に入れたことになり、adapdive executionフレームワークは、子供のステージがマテリアライズされた新規クエリーステージを検索・実行し、クエリー全体が完了するまで上述の実行・再最適化・実行プロセスを繰り返します。
Spark 3.0におけるAQEフレームワークには以下の3つの機能が含まれています。
- シャッフルパーティションの動的結合
- join戦略の動的切り替え
- skew joinの動的最適化
以下のセクションでは、これら3つの機能の詳細を説明します。
シャッフルパーティションの動的結合
非常に大きいデータを取り扱うためにSparkでクエリーを実行する際、通常シャッフルは他のものと比較してクエリー性能に大きなインパクトをもたらします。シャッフルは後段のオペレーターに必要とされるような形でデータを再分配するために、ネットワーク間でのデータ移動を必要とするため、高コストなオペレーターとなります。
シャッフルにおいてキーとなる属性はパーティションの数です。パーティションの最適な数はデータに依存しますが、データサイズはステージごと、クエリーごとに大きく異なるため、数をチューニングすることが困難となります。
- パーティションの数が少ないと、それぞれのパーティションのサイズは非常に大きくなり、これらの大規模データを処理するタスクは(ソートや集計が必要となった際に)データをディスクに溢れさせる(spill)必要が出るかもしれません。そして、結果としてクエリーが遅いものになります。
- パーティションの数が多いと、それぞれのパーティションのデータサイズは非常に小さいものになり、シャッフルブロックを読み取るために小規模のネットワークフェッチが発生し、非効率なI/Oパターンとなり、これもまたクエリーを遅いものにします。大量のタスクを有することは、Sparkのタスクスケジューラーに多大なる負荷をもたらすことにもなります。
この問題を解決するために、我々は最初に比較的大きな数のシャッフルパーティション数を設定し、シャッフルファイルの統計情報を見ながら、実行時に隣接する小さなパーティションより大きなパーティションに結合します。
例えば、クエリーSELECT max(i) FROM tbl GROUP BY j
を実行しているとします。入力データであるtbl
は比較的小さいものであり、グルーピングを行う前には2つのパーティションしかありません。初期状態のシャッフルパーティション数は5に設定され、ローカルのグルーピングによって、部分的にグルーピングされたデータは、5つのパーティションにシャッフルされます。AQEがない場合、Sparkは最終的な集計を行うために、5つのタスクを起動します。しかし、この場合、非常に小さいパーティションが3つ存在しており、それぞれに対して別々のタスクを起動することは無駄になるでしょう。
代わりに、AQEはこれらの小さいパーティションを1つに結合し、結果として、最終的な集計処理においては、5つではなく3つのタスクを実行するだけで済みます。
join戦略の動的切り替え
Sparkは様々なjoin戦略をサポートしており、joinの一方がメモリーに収まる場合には、通常hash joinがもっとも性能が高いものとなります。このことから、Sparkはjoinのリレーションの推定サイズがbroadcastサイズの閾値以下の場合には、broadcast hash joinを計画します。しかし、様々な事柄がこのサイズ推定を間違ったものにしてしまいます。例えば、非常に選択的なフィルター、単なるスキャン以上の複雑なオペレーターの組み合わせによるjoinのリレーションなどです。
この問題を解決するために、AQEは最も精度の高いjoinリレーションのサイズに基づいて、実行時にjoin戦略を再度プランニングします。以下の例に示すように、joinの右側が推定時よりも小さく、broadcastするのに十分であることがわかり、AQEによる再度の最適化後には、もともとsort merge joinと計画されていたものが、broadcash hash joinに変換されます。
実行時にbroadcast hash joinに変換されることで、さらに一般的なシャッフルを局所的なシャッフル(reducerベースではなくmapperごとに読み込みを行うシャッフル)に最適化し、ネットワークトラフィックを削減することができます。
skew joinの動的最適化
クラスターにおいてパーティションが均等に分配されていない場合にデータスキュー(偏り)が発生します。深刻なskewはクエリー性能、特にjoin性能を劇的に悪化させます。AQEのskew join最適化は、シャッフルファイルの統計情報から自動的にこのような偏りを検知します。このような偏りのあるパーティションを小規模なサブパーティションに分割し、もう一方で対応するパーティションとそれぞれがjoinされます。
テーブルAとテーブルBの例を見ていきましょう。ここで、テーブルAは他のパーティションよりもはるかに大きいパーティションA0を有しています。
skew join最適化は、パーティションA0を2つのサブパーティションに分割し、それぞれを対応するテーブルBのパーティションB0とjoinします。
この最適化なしには、1つのタスクに非常に時間を要するsort merge joinを実行する4つのタスクが存在することになります。この最適化によって、joinを実行する5つのタスクが存在することになりますが、それぞれのタスクは概ね同じ時間で処理を完了し、全体的に優れたパフォーマンスを示すことになります。
AQEによるTCP-DSパフォーマンスの改善
TCP-DSデータとクエリーを用いた我々の実験においては、クエリー性能においてAdaptive Query Executionは最大8倍の高速化を達成し、32クエリーにおいては、1.1倍以上のスピードアップを確認できました。以下には最もAQEによる性能改善が認められた10個のTCP-DSクエリーを示しています。
TCP-DSはランタイムに生成されたデータであり、データの偏りがないため、これらの改善の多くは、動的なパーティション結合と動的join戦略切り替えによるものです。しかし、実運用環境においては、AQEの3つ全ての機能を活用することで性能改善が認められています。
AQEの有効化
SQL設定config spark.sql.adaptive.enabled
をtrueに設定(Spark 3.0ではデフォルトはfalseです)することで、AQEを有効化することができます。以下の条件を満たす場合には有効化してください。
- ストリーミングクエリーではない。
- データ交換、サブクエリーが少なくとも1つ存在(通常join、集計、windowオペレーターが存在するケース)する。
クエリーの最適化が静的な統計情報に依存しないようにすることで、AQEはSparkのコストベース最適化における最も困難な課題、統計情報収集のオーバーヘッドと推定精度のバランスを解決しました。最適な推定精度、最適な計画結果を達成するためには、通常は詳細かつ最新の統計情報を維持する必要があり、選択性、カーディナリティを改善したり、偏りを検知するために持ちいられるカラムのヒストグラムのような収集にコストを要するものも存在します。AQEは、このような統計情報の必要性や手動によるチューニングの手間を大幅に削減しました。さらに、AQEは任意のUDF(ユーザー定義関数)の存在や、データサイズの急激な増加、減少、あるいは、高頻度、ランダムなデータの偏りのような予期できないデータセット変更などがあったとしても、SQLクエリーの最適化をより回復力のあるものにしました。もはや、事前にデータを「知る」必要はありません。AQEはデータを理解し、クエリーの実行時にクエリー計画を改善し、より迅速な分析、システムパフォーマンスを実現するためにクエリーのパフォーマンスを改善します。
Spark 3.0について詳細を知りたいのでしたら、ウェビナーに参加ください。Spark 3.0でAQEを試してみてください。Databricksランタイム 7.0上で無料で利用できます。