Comprehensive Guide to Optimize Data Workloads | DatabricksのセクションData Spilling — Why It Happens and How to Get Rid of Itの翻訳です。
- 本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
- 2023年時点の内容です。一部情報が古いものがあります。
データのシャッフルは、join、集計処理、ウィンドウオペレーションなどのようなワイドな変換処理の結果として発生します。これは、ワーカーノード間でのネットワークを通じたデータの送信が行われるため、高コストなプロセスです。シャッフルを排除、あるいはシャッフルの効率とスピードを改善するためのいくつかの最適化アプローチが存在しています。
1. ブロードキャストハッシュジョイン
データのシャッフルを完全に回避するには、joinされる2つのテーブルやデータフレームの1つ(小さい方)をブロードキャストします。ドライバーによってテーブルがブロードキャストされ、全てのワーカーノードにコピーします。
joinを実行する際、Sparkは自動で10MBより小さいテーブルをブロードキャストします。しかし、以下に示しているように、さらに大きなテーブルをブロードキャストするようにこの挙動を調整したいと思うかもしれません:
set spark.sql.autoBroadcastJoinThreshold = <size in bytes>
あなたのクエリーのいくつかのテーブルが小さいことを知っているならば、推奨のオプションであるヒントを明示的に用いて、Sparkにそれらをブロードキャストするように指示することができます。
SELECT /*+ BROADCAST(t) */ * FROM <table-name> t
Spark 3.0以降では、ソートマージjoinのいずれかのjoinサイドの実行時の統計がデフォルトが30MBであるアダプティブブロードキャストハッシュジョインの閾値よりも小さい場合に、ソートマージjoinをブロードキャストハッシュジョイン(BHJ)に変換することもできるAQE (Adaptive Query Execution)を提供しています。以下の設定を変更することでこの閾値を増やすこともできます:
set spark.databricks.adaptive.autoBroadcastJoinThreshold = <size in bytes>
-
full outer joinではブロードキャストハッシュジョインはサポートされていないことに注意してください。right outer joinでは、左側のテーブルのみをブロードキャストすることができ、他のleft joinでは右側のテーブルのみをブロードキャストすることができます。
-
大量のメモリー(32GB+)を持つドライバーを実行している場合、ブロードキャストの閾値を安全に200MBなどの値に増やすことができます。
set spark.sql.autoBroadcastJoinThreshold = 209715200; set spark.databricks.adaptive.autoBroadcastJoinThreshold = 209715200;
-
ヒントやPySparkのbroadcast関数を用いて、常に明示的に小さい方のテーブルをブロードキャストしましょう。
-
AQEが自動で小さなテーブルをブロードキャストできる場合に、なぜ明示的に小さなテーブルをブロードキャストするのでしょうか?この理由は、AQEはクエリーが実行されている過程で最適化を行うからです。
- Sparkは両サイドのデータをシャッフルする必要があり、AQWはシャッフルステージの統計情報に基づいて物理計画を変更することができ、ブロードキャストジョインに変換できるのみです。
- このため、ヒントを用いて明示的に小さい方のテーブルをブロードキャストすると、シャッフルを完全にスキップし、あなたのジョブは計画を最適化するためのAQEの介入を待つ必要がなくなります。
-
ブロードキャストはドライバー経由で行われるので、結成て1GBより大きなテーブルをブロードキャストしないでください。さもないと、ドライバーでOOMを引き起こしたり、大規模なGCによる停止によってドライバーが反応しなくなります。
-
ディスク上のテーブルのサイズとメモリー上のテーブルのサイズが同じではないことに注意してください。DeltaテーブルはParquetファイルをベースとしており、データによって圧縮のレベルが異なります。そして、Sparkはディスクにおけるテーブルサイズに基づいてブロードキャストする場合があります。しかし、それらは実際には解凍後、列フォーマットから行フォーマットに変換した後のメモリー上では大きなもの(8GB以上)になる場合があります。このため、このような状況ではあなたのジョブは例外とともに失敗することでしょう。この場合のソリューションは、
spark.sql.autoBroadcastJoinThreshold
を-1に設定することでブロードキャストを無効化し、ディスクで本当に小さいテーブルのヒント(やPySparkのbroadcast関数)を用いて明示的なブロードキャストを行うか、spark.sql.autoBroadcastJoinThreshold
の閾値を-1ではなく、100MBや50MBのような小さな値に設定します。 -
ドライバーはいかなる時でもメモリーに最大1GBのデータしか収集することができず、それ以外のケースではドライバーでエラーを発生させ、ジョブが失敗します。しかし、10MBよりも大きなテーブルをブロードキャストしたいと考えるので、この問題に直面するリスクを持つことになります。この問題は、以下のドライバーの設定値を増やすことで解決することができます。
- これはドライバーの設定なので、クラスターが起動したら変更できないことを覚えておいてください。このため、クラスターの高度な設定のSpark設定として設定する必要があります。多くの場合、32GB以上のメモリーを持つドライバーで、このパラメーターを8GBに設定するとうまくいきます。ブロードキャストハッシュジョインが非常に大きなテーブルをブロードキャストするような特定のケースでは、この値を16GBに設定することも合理的と言えます。
- Photonではエグゼキューターサイドでのブロードキャストを使うことができます。このため、Photon有効化のDatabricksランタイムを使っているのであれば、以下のドライバー設定を変更する必要はありません。
spark.driver.maxResultSize 16g
2. ソートマージジョインよりもシャッフルハッシュジョインを
多くの場合、Sparkがテーブルをブロードキャストできない場合、ソートマージジョイン(SMJ)を選択します。ソートマージジョインは最も高コストなものです。シャッフルハッシュジョイン(SHJ)は、SMJのように追加のソートステップを必要としないため、いくつかの状況(全てではありません)では、シャッフルハッシュジョインの方が高速になります。SparkにSMJよりもSHJを選択するようにアドバイスできる設定があり、それによってSparkは可能な限りSMJではなく、SHJを使おうとします。これは、Sparkが常にSMJではなくSHJを選択することを意味するのではないことに注意してください。このオプションでシンプルにあなたの好みを定義するだけです。
set spark.sql.join.preferSortMergeJoin = false
また、DatabricksのPhotonも、クエリーの性能をブーストするために、ソートマージジョインをシャッフルハッシュジョインで置き換えます。
- それぞれのジョブで
preferSortMergeJoin
の設定オプションをfalseに設定する必要はありません。対象のジョブの最初の実行においては、デフォルト(true)のままにしておくことができます。 - 対象のジョブが、大量のデータシャッフルを伴う大量のjoinを実行し、期待するSLAを達成することが困難な場合、このオプションを使うことができ、
preferSortMergeJoin
をfalseに変更することができます。
3. コストベースオプティマイザ(CBO)の活用
Spark SQLでは、クエリープランを改善するためにコストベースオプティマイザ(CBO)を使うことができます。これは、複数のjoinを伴うクエリーでは特に有用です。CBOはデフォルトで有効化されています。以下の設定を変更することでCBOを無効化できます:
set spark.sql.cbo.enabled = false
CBOが動作するためには、テーブルとカラムの統計情報を収集し、最新に保つことが重要となります。この統計情報に基づいて、CBOは最も経済的なjoin戦略を選択します。このため、統計情報を計算するためにテーブルに対して、以下のSQLコマンドを実行する必要があります。この統計情報はHiveメタストアに格納されます。
ANALYZE TABLE table_name COMPUTE STATISTICS FROM COLUMNS col1, col2, ...;
Joinの再オーダー
クエリー実行を高速にするために、テーブルをどの順序でjoinするのが最適なのか(例えば、最初に小さなテーブルをjoinすることで劇的にパフォーマンスを改善できるなど)を特定するためにも、ANALYZE TABLEコマンドによって計算された統計情報を活用することができます。Joinの再オーダーは、INNERとCROSS joinでのみ動作します。この機能を活用するには、以下の設定を行います:
set spark.sql.cbo.enabled = true
set spark.sql.cbo.joinReorder.enabled = true
set spark.sql.statistics.histogram.enabled = true
-- CostBasedJoinReorder requires statistics on the table row count at the very least and its accuracy is improved by having statistics on the columns that are being used as join keys and filters.
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS col1, col2..;
-- The maximum number of tables in a query for which this joinReorder can be used (default is 12)
set spark.sql.cbo.joinReorder.dp.threshold = <number of tables>
- CBOの最適化を適切に活用するには、定期的にANALYZE TABLEコマンドを実行する必要があります(一日一回、あるいは10%以上のデータが変化した際)。
- 日次ベースでDeltaテーブルが再作成されたり、上書きされる際には、同じジョブやパイプラインの一部としてテーブルが上書きされた後即座にANALYZE TABLEコマンドを実行すべきです。これは、あなたのパイプライン全体のSLAに影響を及ぼします。このため、このようなケースでは、後段のパフォーマンスの改善と現在のジョブの実行時間とのトレードオフが存在します。現在のジョブのSLAがCBOの最適化処理による影響を受けたくない場合には、オフにすることができます。
- ジョブの一部で決してANALYZE TABLEコマンドを実行しないでください。別のジョブクラスターの別のジョブとして実行すべきです。例えば、Optimize、Z-order、Vacuumのようなコマンドを実行する夜間のノートブックと一緒に実行することができます。
- 単一のクエリーで大量のinner joinやcross joinが実行される場合にはjoinの再オーダーを活用しましょう。
- クエリーの実行時にクエリープランをより良いものに変更するSparkのAdaptive Query Execution (AQE)もまた、ANALYZE TABLEコマンドによって計算される統計情報を活用します。このため、テーブルの統計情報が更新され続けるように、定期的にANALYZE TABLEコマンドを実行することをお勧めします。