Comprehensive Guide to Optimize Data Workloads | DatabricksのセクションData Explosion — Identification, Consequences and Solutionsの翻訳です。
- 本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
- 2023年時点の内容です。一部情報が古いものがあります。
特定の変換ステップ後のSparkジョブの実行において、データ爆発と考えられるデータボリュームの不自然な増加を目撃するかもしれません。これによって、クエリーの実行時間は急激にスローダウンします。以下では、データ爆発を引き起こすことにある最もよくある変換処理のいくつかを示します:
1. Explode関数
JSON、Parquet、Delta、XMLのようなフォーマットを持つ構造化ファイルを操作している際、array、list、mapのようなコレクションでデータを取得することがよくあります。このような場合、Sparkで効率的に処理するためにコレクションの列を行に変換するのに、explode()
関数は有用です。このexplodeオペレーションは、データボリュームを劇的に増加させることがあります。explodeオペレーションは上の画像のようにSpark UIではGenerateとして表現されます。
2. Joinオペレーション
遅いクエリーの一般的な原因は、期待したよりもはるかに多い行を生成するjoinです。これは、多くの場合「行の爆発」と呼ばれるものです。潜在的な行の爆発を特定するには、Spark UIのSortMergeJoinやShuffleHashJoinノードのrows outputメトリックを参照します。
インパクト
Parquet、Deltaなどのようなソースから入力データを読み込む際、Sparkはコアあたりのタスクごとに約128MBを読み込み、これはそれぞれのCPUコアで利用できるメモリーに収まるちょうど良いパーティションサイズとなっています。しかし、データ爆発によって、128MBのデータは単一のCPUコアでは爆発したパーティションにフィットするには十分を持たないため問題となるような、非常に大きなボリューム(数GBなど)に変換されることでしょう。この結果、以降のワイドな変換処理では、大量のデータがディスクに溢れ、クエリーパフォーマンスに大きなインパクトを与えることになります。
ソリューション
-
入力パーティションのサイズを削減します。
explode()
関数の効果を打ち消すために、より小規模なパーティションを作成するために、spark.sql.files.maxPartitionBytes
(デフォルトは128MB)を減らします。例えば、16MBや32MBのようにはるかに小さいパーティションサイズを選択することができます。set spark.sql.files.maxPartitionBytes = <size in bytes>
-
パーティションの総数を増加させるために、読み込みステートメントのすぐ後でrepartition()関数を明示的に呼び出すことができます。これによって、それぞれのパーティションのサイズを削減することができます。
-
joinオペレーションによって爆発が起きている場合には、シンプルなソリューションはパーティションのサイズを128MBよりもはるかに少なくするように、シャッフルパーティションの数を増加させることです。詳細に関しては、手動でのシャッフルパーティションのチューニングのセクションをご覧ください。