Comprehensive Guide to Optimize Data Workloads | DatabricksのセクションData Skipping and Pruningの翻訳です。
- 本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
- 2023年時点の内容です。一部情報が古いものがあります。
処理するデータの量は、クエリーの性能と直接関係します。このため、必要なデータのみを読み込み、不必要な全てのデータをスキップすることがとても重要です。SparkやDeltaで適用できるいくつかのデータスキッピングとプルーニング(刈り込み)のテクニックを示します。
1. Deltaのデータスキッピング
Deltaのデータスキッピングは、Deltaテーブルにデータを書き込む際に、最初の32列の背後にあるParquetファイルの統計情報(min、maxなど)を自動で収集します。Databricksでは、クエリーをスピードアップするために、クエリー時に不必要なファイルをスキップするようにこの情報(最小値と最大値)を活用します。
最初の32列以上の統計情報を収集するには、以下のDeltaのプロパティを設定することができます:
-- table property
delta.dataSkippingNumIndexedCols = <value>
長い文字列に対する統計情報の収集は高コストなオペレーションです。長い文字列に対する統計情報の収集を回避するには、長い文字列を含む列を避けるようにdelta.dataSkippingNumIndexedCols
テーブルプロパティを設定するか、以下のようにALTER TABLEを用いて、長い文字列を含む列をdelta.dataSkippingNumIndexedCols
に含まれない場所に移動します。
ALTER TABLE table_name ALTER [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
2. カラムのプルーニング
テーブルを読み込む際、我々は通常全てのカラムを選択しますが、これは非効率的です。無関係のデータのスキャンを回避するために、ワークロードでの利用に本当に含まれるカラム、後段のクエリーで必要とされるカラムが何であるのかを常に尋ねましょう。それらのカラムのみをソーステーブルから選択すべきです。これには、クエリー性能に大きなインパクトを与える可能性があります。
-- SQL
SELECT col1, col2, .. coln FROM table
# PySpark
dataframe = spark.table("table").select("col1", "col2", ... "coln")
3. 述語のプッシュダウン
これは、フィルタリングを「ベアメタル」、つまりデータソースエンジンにプッシュダウンすることを狙いとしています。フィルタリングは、Sparkメモリーにデータがロードされた後に全体のデータセットを取り扱うのではなく、より低レベルで実行されるのでクエリーのパフォーマンスを増加させます。
述語のプッシュダウンを活用するために必要なことは、ソーステーブルからデータを読み込む際にフィルターを追加することだけです。述語のプッシュダウンはデータソースエンジン依存です。Parquet、Delta、Cassandra、JDBCのようなデータソースで動作しますが、text、JSON、XMLなどでは動作しません。
-- SQL
SELECT col1, col2 .. coln FROM table WHERE col1 = <value>
# PySpark
dataframe = spark.table("table").select("col1", "col2", ... "coln").filter(col("col1") = <value>)
joinオペレーションを実行するケースでは、joinの前にフィルターを適用します。経験則として、テーブルの読み込みステートメントの直後でフィルターを適用します。
4. パーティションのプルーニング
パーティションを除外するテクニックによって、特定のパーティションにある必要なファイルのみが読み込まれるように、対応するファイルシステムからフォルダーを読み込む際にパフォーマンスを最適化することができます。これは、ディスクのI/Oの削減を狙いとして、不必要なデータがメモリーに含まれないようにし、可能な限りデータのフィルタリングをソースに近い方にシフトさせます。
パーティションプルーニングを活用するために必要なのは、テーブルパーティションで使用されるカラムにフィルターを提供することです。
-- SQL
SELECT * FROM table WHERE partition_col = <value>
# PySpark
dataframe = spark.table("table").filter(col("partition_col") = <value>)
joinオペレーションを実行している場合には、joinの前にパーティションフィルターを適用します。経験則として、テーブルの読み込みステートメントの直後でフィルターを適用します。
5. 動的パーティションプルーニング(DPP)
Apache Spark 3.0+では、Dynamic Partition Pruning (DPP)と呼ばれる新たな最適化処理が実装されています。DPPは、オプティマイザーがパース時点で排除すべきパーティションを特定できない場合に動作します。特に、任意の数のディメンションテーブルを参照する1つ以上のファクトテーブルから構成されるスタースキーマを考えてみます。このようなjoinオペレーションでは、ディメンションテーブルのフィルタリングから得られるこれらのパーティションを特定することで、ファクトテーブルからjoinが読み込むパーティションをプルーンすることができます。この機能を活用するには設定は不要です。Spark 3.0+ではデフォルトで有効化されています。
6. 動的ファイルプルーニング(DFP)
Dynamic File Pruning (DFP)は、Databricksランタイムで利用可能であり、最近のランタイムではデフォルトで有効化されています。名前が示すようにDPPと同じように動作しますが、クエリーを高速化するためにパーティションレベルではなくファイルレベルで動的なプルーニングを実行します。この機能を活用するには設定は不要です。Databricksランタイム6.1以降では自動で有効化されます。