Performance Showdown: withColumn vs withColumns in... - Databricks Community - 129142の翻訳です。
本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
イントロダクション
Apache Sparkでスケーラブルなデータパイプラインを構築する際、データフレームにカラムを追加したりカラムを変換する方法はパフォーマンスに対して劇的なインパクトを与えます。この記事では、withColumnとwithColumnsの技術的な違いにディープダイブし、Sparkのオプティマイザがそれぞれをどのように取り扱うのかを探索し、効率的でプロダクションレベルのコードを記述するためのベストプラクティスを共有します。
基礎: withColumnとwithColumns
- withColumn: データフレームに単一の列を追加、あるいは置き換えます。それぞれの呼び出しは、更新されたスキーマを持つ新規のデータフレームを返却します。
- withColumns: Spark 3.3で導入され、カラム名とエクスプレッションのディクショナリーを渡すことで、単一の呼び出しで複数のカラムの追加、置き換えが可能となります。
パフォーマンスの罠: ループでのwithColumnの使用
内部で何が起きているのか?
withColumnの呼び出しを行うたびに、Sparkは追加のProjectノードを持つ新たな論理プランを作成します。多数のカラムを追加、変更するために、ループでwithColumnを使うと,
SparkのCatalystオプティマイザは、すべての新規カラムごとに全体的なプランを再解析、再度の最適化を行う必要があります。これは以下を引き起こします:
- 論理プランのサイズの指数関数的増加
- ジョブのプランニングと実行時間の増加
- 数百のカラムでStackOverflowExceptionやメモリーエラーが起こる可能性
例: 100カラムの追加(非効率的)
from pyspark.sql.functions import lit
df1 = spark.range(100000)
for i in range(100):
df1 = df1.withColumn(f"col_{i+1}", lit(i+1))
df1.explain(extended=True)
== Parsed Logical Plan ==
+- Project [id#78033L, col_1#78036, col_2#78042, col_3#78050, col_4#78060, col_5#78072, col_6#78086, col_7#78102, col_8#78120, col_9#78140, 10 AS col_10#78162]
+- Project [id#78033L, col_1#78036, col_2#78042, col_3#78050, col_4#78060, col_5#78072, col_6#78086, col_7#78102, col_8#78120, 9 AS col_9#78140]
+- Project [id#78033L, col_1#78036, col_2#78042, col_3#78050, col_4#78060, col_5#78072, col_6#78086, col_7#78102, 8 AS col_8#78120]
+- Project [id#78033L, col_1#78036, col_2#78042, col_3#78050, col_4#78060, col_5#78072, col_6#78086, 7 AS col_7#78102]
+- Project [id#78033L, col_1#78036, col_2#78042, col_3#78050, col_4#78060, col_5#78072, 6 AS col_6#78086]
+- Project [id#78033L, col_1#78036, col_2#78042, col_3#78050, col_4#78060, 5 AS col_5#78072]
+- Project [id#78033L, col_1#78036, col_2#78042, col_3#78050, 4 AS col_4#78060]
+- Project [id#78033L, col_1#78036, col_2#78042, 3 AS col_3#78050]
+- Project [id#78033L, col_1#78036, 2 AS col_2#78042]
+- Project [id#78033L, 1 AS col_1#78036]
- それぞれの繰り返しごとに、新規のデータフレームと論理プランが作成されます。
- オプティマイザは非常に大きく複雑なDirected Acyclic Graph (DAG)を処理しなくてはなりません。
- ジョブは時には遅くなり、メモリやスタックの上限に達してしまうかもしれません。
なぜwithColumns(やselect)がより効率的なのか
単一の表現、単一のプラン
- withColumnsによって、単一のトランスフォーメーションで複数のカラムを追加、変更することができます。
- また、selectは、一度にすべてのカラムとそれらのトランスフォーメーションを指定することで、同じ効果を達成するために活用することができます。
メリット
- カラム数に関係なく、単一のProjectノードのみが論理プランに追加されます。
- オプティマイザは、解析、最適化のために非常にシンプルなプランを手に入れることになります。
- プランニングと実行時間において劇的な削減が認められ、ベンチマークでは多数のカラムにおいて3-4xの高速化が示されています。
例: 効率的に100カラムを追加
from pyspark.sql.functions import lit
df2 = spark.range(100000)
cols = {f"col_{i+1}": lit(i+1) for i in range(100)}
df2 = df2.withColumns(cols)
df2.explain(extended=True)
== Parsed Logical Plan ==
Project [id#88638L, 1 AS col_1#88740, 2 AS col_2#88741, 3 AS col_3#88742, 4 AS col_4#88743, 5 AS col_5#88744, 6 AS col_6#88745, 7 AS col_7#88746, 8 AS col_8#88747, 9 AS col_9#88748, 10 AS col_10#88749, 11 AS col_11#88750, 12 AS col_12#88751, 13 AS col_13#88752, 14 AS col_14#88753, 15 AS col_15#88754, 16 AS col_16#88755, 17 AS col_17#88756, 18 AS col_18#88757, 19 AS col_19#88758, 20 AS col_20#88759, 21 AS col_21#88760, 22 AS col_22#88761, 23 AS col_23#88762, 24 AS col_24#88763, ... 76 more fields]
あるいは、selectを使います:
df = df.select(
*df.columns,
*(expr("...").alias(col) for col in columns_to_add))
Sparkオプティマイザはこれらのパターンにどのように対応するのか
複数のwithColumnの呼び出し
1. 論理プランの増加とその影響
-
プランの拡張: それぞれの
withColumn
の呼び出しは、新たな論理プランのノードを生成し、ネスティングによって効果的なオペレーションの複雑性を増加させることのある、プランの成長につながります。 -
プランの深さと幅: 繰り返しのトランスフォーメーションは新規の
withColumn
それぞれがプロジェクションのチェーンを形成するので、論理的に深く複雑なツリーを生み出します。
2. Catalystの最適化のオーバーヘッド
- 繰り返しのパーシングと解析: Catalystはすべての新たな論理プランの全体に対してチェック(型の解決、ルールの適用など)を行います。これは、ノードが数百、数千になると、遅くなりメモリーを大量に消費することになります。
- ルール適用の複雑性: Catalystは数百のルールを順次的かつ反復的なパスで適用します。より多く、深いプランにおいては、比較、再書き込み、ツリー移動のステップが増加し、超線形なコストにつながります。
- ガベージコレクションとJVMのオーバーヘッド: 非常に大きな論理プランは、非常に大量のメモリー割り当てとガベージコレクションのプレッシャーを引き起こし、多くの場合において、ジョブ計画中の一時停止を引き起こします。
3. 物理プランの作成とコード生成
- 物理プランの非効率性: 極端に深い論理ツリーは、多くの場合深くネストされた、あるいは冗長な物理オペレーターに変換され、Spark実行の効率性にインパクトを与えます。
- コード生成(Whole-Stage Codegen): 複雑なプランにおいては、Sparkは大きく、深くネストされたJavaコードを生成し、これはJVMの制限に直面する場合があり、コンパイルエラーやJIT最適化の劣化を引き起こします。
4. 極端なケースの実行時間の影響
- 計画フェーズの遅延: Catalystが計画を解析、再書き込みすることで、ドライバー側がボトルネックとなり、実行時間の遅延や無限のハングに陥ります。ユーザーは、データ処理がスタートする前に「ハングした」Sparkジョブを目撃するかもしれません。
-
Out of Memory / StackOverflow: 数百、数千のカラムを追加する繰り返しの
withColumn
の呼び出しは再帰的なツリーの処理によって、ドライバーにおけるヒープ空間の枯渇やスタックオーバーフローを引き起こします。 - 処理実行時のペナルティ: プランが生成されたとしても、非効率なカラムの刈り込みや不十分なプッシュダウンの最適化、I/OやCPUコストの増大、非効率的なコード生成のパイプラインによって、多くの場合は最適なものとなりません。ワーカーノードは、不必要に複雑な実行計画を解釈するためにより多くの時間を費やすことになるかもしれません。
単一のwithColumnsやselectの呼び出し
- すべてのトランスフォーメーションは、単一の論理プランノードで適用されます。
- オプティマイザはより効率的にプランを解析、最適化することができます。
- 物理プランはよりシンプル、より高性能になります。
ベストプラクティスと推奨事項
- 多数のカラムを追加、変換するのにループでwithColumnを使うのは避けましょう。
- 単一のトランスフォーメーションですべての変更を適用するために、withColumns(Spark 3.3+)あるいはselectを使いましょう。
- (それぞれの新規のカラムが以前のカラムに依存するなど)複雑な依存関係においてはwithColumnのチェーンが必要かもしれませんが、このようなパターンを最小限にするようにしましょう。
あなたのトランスフォーメーションのインパクトを理解するために、df.explain()
を用いて論理、物理プランを常にチェックするようにしましょう。
パフォーマンス比較テーブル(多数のカラム)
手法 | 論理プランのサイズ | オプティマイザのオーバーヘッド | メモリー使用量 | パフォーマンス |
---|---|---|---|---|
ループでのwithColumn | 大 | 高 | 高 | 低 |
withColumns | 小 | 低 | 低 | 良 |
select | 小 | 低 | 低 | 良 |
ベンチマーク
100,000行のデータフレームに対するシンプルな実験では、withColumn()を知いたループでの100カラムの追加には、4.16秒を要しました。withColumn()をwithColumns()で置き換えることでパフォーマンスは97%まで改善し、0.13秒にまで短縮しました。このパフォーマンスのギャップは、カラム数が増えるほど顕著になります。
まとめ
- 単一あるいは少数のカラム: withColumnで大丈夫です。
- 大量のカラム: パフォーマンスボトルネックやメモリー問題を回避するために常にwithColumnsやselectを使うようにしましょう。
- あなたの論理プランを理解しましょう: あなたのコードがSparkのオプティマイザにどのような影響を与えるのかを理解するためにdf.explain()を使いましょう。