How to Improve Apache Spark Performance: 10 Mistakes to Avoidの翻訳です。
本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
著者:
- Irina Placinta, Resident Solutions Architect @ Databricks
- Canan Girgin, Senior Solutions Consultant @ Databricks
Apache Sparkはビッグデータ処理におけるパワフルなエンジンですが、小さなコーディングの間違いが大きなパフォーマンス劣化につながることがあります。多くのSparkユーザーは知らずして、実行コスト、時間を増加させ、ジョブの失敗すらも引き起こすアンチパターンを知らずのうちに導入してしまいます。
このブログ記事では、Sparkアプリケーションで見かける最も一般的な間違いのいくつかをハイライトし、より効率的でスケーラブルなコードを書く助けとなるようにベストプラクティスを提供します。
1. ビジネスロジックやログでの df.count() の誤用
特に書き込む前にデータが空ではないことをチェックするためだけに、アプリケーションロジックでdf.count()
を呼び出すことは高コストとなります。これは、Sparkが遅延評価を行い、count()
によってデータフレームをマテリアライズを開始し、完全なジョブを実行するためです。これは、実行時間を不必要に増加させます。
推奨事項:
- 多くの場合、書き込むかどうかを特定することは冗長です - 空のデータフレームへの追記は問題ありません。スキーマが欠落していることで失敗するのであれば、
df.write()
の周辺でtry/except
を使いましょう。
try:
df.write.mode("append").save(my_path)
except:
log.error(f"Failed to write data to {my_path}.")
.....
- 追記/マージするテーブル操作の結果を記録するには、データフレームのカウントを実行するのではなく、データを書き込んだ後でテーブル履歴からカウントを取得しましょう。この方がはるかに効率的です。
history = spark.sql("DESCRIBE HISTORY my_catalog.my_schema.my_table")
non_empty_metrics = history.filter(history.operationMetrics.isNotNull())
latest = non_empty_metrics.orderBy("timestamp", ascending=False).first()
row_count = latest["operationMetrics"]["numOutputRows"]
- データを上書きするパス/テーブル操作の結果を記録するには、ターゲットパス/テーブルから直接読み込むことでレコード数を取得することができます。parquet/deltaフォーマットの場合、parquetのメタデータでカウントを取得できるので、このカウントは非常に効率的になります。
df.write.format("delta") \
.mode("overwrite") \
.saveAsTable("my_catalog.my_schema.my_table")
output_df = spark.read.table("my_catalog.my_schema.my_table")
row_count = output_df.count()
2. 極端に幅広(100列以上)のテーブルの作成
Whole-stage code generation (WholeStageCodegen)は、処理のパフォーマンスを劇的に改善するApache Sparkのエンジンにおけるクエリー最適化のテクニックです(Spark 2.0以降はデフォルト)。しかし、あまり知られていないことですが、これは100カラムよりも少ないデータフレーム/テーブルにのみ適用されます[1、2]。このため、非常に幅広なテーブルを取り扱う場合には、Sparkはよりジェネリックで効率の悪い実行モデルにフォールバックします。
あなたのステージでWholeStageCodegenが使われているかどうかを特定するには、Spark UI(SQL/DataFrameタブ / Text Execution summary)にアクセスします。
あるいは:
- データを保持するために複数のテーブルを作成することでカラム数を削減します。 必要なデータを特定するために1,000ものカラムをスクロールするアナリストはいません。
- カラム数を削減するために、MapType、ArrayType、Databricksのバリアントタイプを使います。
- (サーバレスを含む)Photonが有効化されたDatabricksクラスターを使います。PhotonはWholeStageCodegenを使わないので、この制限には該当しません。
実験として、同じ設定のクラスターで1000カラムの非常に幅広のテーブルに対して同じグルーピング操作を行い、Photonの有無で確認しました。Photonクラスターでは、Photon無しのクラスターよりも 45% 短い時間でオペレーションを完了しました。
3. テーブルの上書きではなく直接ファイル/テーブルを削除する
テーブル/パスを上書きする際の一般的なアンチパターンは、事前にデータを削除するという者です。同様に、テーブルのドロップ、再作成もバッドプラクティスと言えます。
このアンチパターンの背後にある意図は、ストレージ容量の節約や、データが実際に削除されたことを確実にする、スキーマの変更など様々です。
dbutils.fs.rm(table_path, recursive = True) # spark.sql(f"DROP TABLE {my_table}")
df.write.format("delta").mode("overwrite").saveAsTable(my_table)
このアプローチはいくつかの問題を引き起こします:
- データの損失: Deltaテーブルの場合、人間やデータのエラーの場合に以前のバージョンに戻す選択肢を失うことになります。
- データを読み込んでいるすべてのプロセスは失敗し、新規データ化が着込まれるまで新たな読み込み処理を行うことができません。
- テーブル履歴とテーブルリネージの損失。
- 希望しないスキーマ変更を検知できなくなります。
- ストレージアカウントからのデータの直接削除に対応するためには、テーブルレベルのみで権限を管理するのではなく、さらなるUnity Catalogの権限が必要となり、セキュリティリスクを引き起こします。
- これは原子的なオペレーションではないので、プロセスは最初に削除を行い、書き込みに失敗したとすると、テーブル/パスは空の状態となり、後段で問題になります。
代替案として、ストレージの懸念に対応するために、テーブルデータとスキーマを上書きして、定期的にvacuumを行いましょう(または、UCマネージドテーブルの予測最適化をオンにしましょう) :
(df.write.format("delta")
.mode("overwrite").option("overwriteSchema", "true").saveAsTable(my_table))
# Vacuum often based on your business needs,reduce default retention period if needed
spark.sql("VACUUM my_table")
4. ループで withColumn() を呼び出す
(通常はメタデータドリブンのフレームワークの一部として)多くの列を追加するためにループでwithColumn()
を過度に呼び出すと、毎回新たなプロジェクションを行い、膨大な実行計画やパフォーマンスの劣化、場合によってはStackOverflowException
を引き起こします。
for i in range(no_columns)
base_df = base_df.withColumn(f"id_{i}", lit(i * 10))
代替案として、複数カラムによるselect()、selectExp()、withColumns()やプログラムでSQL文を構築しましょう。
for i in range(no_columns)
base_df = base_df.select("*", lit(i * 10).alias(f"id_{i}"))
100,000秒のデータフレームで実験を行いました。withColumn()を用いたループで70カラムを追加した場合には3分を要しました。上述の通りにwithColumn() をselect() で置き換えると、41秒で処理が完了し、パフォーマンスは179%改善しました。 2つのアプローチ間でぱどーマンスの改善を確認できる、ループで追加するカラム数のスイートスポットはデータフレームの行数、既存のカラム数、クラスターのサイズに依存します。我々の実験では、40+カラムから70+カラムで変動しています。
5. repartition(1)やcoalesce(1)を使う
このパターンは、開発者が出力するファイルの数を制御しようとする際によく見かけます。ファイル数の削減はビジネス要件(後段のシステムとの互換性など)かもしれませんが、スケーラビリティやパフォーマンスを犠牲にするには十分に強い理由になることはほとんどありません。
データサイズを検討せずに小さな数によるrepartition(n) は処理を非効率的なものにし、ジョブの実行を非常に遅くし、out-of-memory(OOM)エラーを引き起こすことがあります。また、クラスターがフル活用されない可能性を高めます。
*repartition(1)*を使うと、クラスターにおけるデータのフルシャッフルが行われます。フルシャッフルを行うことで、このオペレーションはデータを再分配するためにエグゼキューター間での通信を必要とするので、大規模データでは高コストになる場合があります。
単一ファイルに強制するために coalesce(1) を呼び出すと、単一のCPUコアにデータを結合することで深刻なパフォーマンス問題を引き起こす可能性があり、並列処理を活用できなくなります。Spark UIのStagesでタイムラインとメトリクスで単一のタスクを確認することで、これを容易に特定することができます。
代替案として:
- データボリュームとクラスター設定に基づいて、Sparkに最適なパーティショニングを決定させましょう。 AQEが動的にシャッフルパーティションのcoalesceを行います。
- ファイルサイズの最適化を行うには、Optimizeコマンド、(通常は単一の出力ファイルを生成することなしにデフォルトの~128MBのファイルをターゲットとしたdeltaファイルの圧縮と書き込み最適化を通じてファイルサイズを改善する)自動Optimize、UCマネージドテーブルにおける予測最適化の利用を検討しましょう。
- 単一出力ファイルが本当に重要な要件でない限り、repartition(1)を使うことは避けましょう。そうであったとしても、その要件に疑問を投げかけましょう。
- 本当に単一の出力ファイルが必要な場合でもcoalesce(1)を使うのは避けましょう。repartition(1)の方がパフォーマンスが優れています。
6. すべての列に対して dropDuplicates() や distinct を適用する
Sparkワークフローにおいては、重複排除は一般的なオペレーションですが、ユニーク性に本当に必要なカラムが何かを検討せずにすべてのカラムにdropDuplicates()やdistinctを適用すると、大きなパフォーマンス問題を引き起こし、メモリーのプレッシャーや実行時間を増加させます:
- dropDuplicates()とdistinctのいずれも高コストなデータセットのフルシャッフルを行います。
- すべてのカラム、特に幅広なテーブルに適用した際には、シャッフルサイズが劇的に増加します。
代替案としては、ユニーク性を定義するキーとなるカラムのみを用いて、選択的な重複排除を適用しましょう:
# Only deduplicate based on business keys
deduped_df = df.dropDuplicates(["user_id", "event_time"])
どのカラムで重複排除したらいいのかがわからない場合には:
- ビジネスロジックに関してデータオーナーと会話しましょう。
- 重複キーを理解するために、重複排除前後でのキーごとの重複数を記録することでデータを探索しましょう。
7. ウォーターマークなしのストリーミングアプリケーションで dropDuplicates() を使う
構造化ストリーミングにおいて、ウォーターマークなしでdropDuplicatesを呼び出すと、際限のない状態の増加を引き起こし、最終的にはOOMにつながります。これは、ストリームの文脈において到着するデータの重複を比較するために中間的な状態として、dropDuplicatesがトリガーにおけるすべてのデータを維持するためです。
推奨事項:
- 重複データがどれだけ遅れて到着するのかを制限し、状態ストアをスマートにするために、イベント時間のウォーターマークを使いましょう。 こちらに詳細なガイドがあります。
(streaming_df
.withWatermark("event_time", "1 HOUR")
.dropDuplicates("id", "event_time"))
# OR .dropDuplicatesWithinWatermark("id", "event_time")
- テーブルでデータの重複がない状態を維持することが目的であるならば、「foreachBatch」内のMERGE INTOオペレーションを用いることで、目的テーブルに対する重複排除を行うことで、これを実現することもできます。ソースに重複がある場合には、foreach関数における*dropDuplicates()*の実行にはウォーターマークは必要ありません。
def merge_function(df_batch: DataFrame, batch_id):
df_batch = df_batch.dropDuplicates("id")
df_batch.sparkSession.sql("""
MERGE INTO target_table
USING df_batch
.........
""")
streaming_df
.writeStream
.foreachBatch(merge_function)
....
8. cache()を使う
Sparkの cache() は、エグゼキューターのメモリーやディスクにデータフレームを格納することで、変換処理の再計算を回避する助けとなりますが、利用可能なリソース以上のデータがある場合にはメモリー不足エラーのような問題を引き起こしたり、現在のプロセスの外で背後のデータが変更された際には古い結果となる場合があります。
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
さらに、データセットが不要になってもユーザーは多くの場合unpersist()
を呼び出すことを無視してしまい、クラスターリソースをさらに圧迫することがあります。
代替案として:
- Databricksのディスクキャッシュを使いましょう - これは、読み込んだデータのローカルコピーを削減し、以降の読み込みを高速にします。これはdelta/parquetファイルでのみ動作することに注意してください。
- 非常に時間を要する変換処理の複雑なリネージにおいては、お使いのストレージアカウントにDeltaフォーマットで中間結果を保存しましょう。
- すべてのリネージ全体をSparkに再計算させましょう。
cache()を削除し、上述のいずれかの選択肢を採用するオーバーヘッドによって、パフォーマンスが減少するかもしれませんが、一貫性のある結果を生成し、データがメモリーやディスクにフィットしないことによるOOMによるジョブの失敗を回避することができます。
9. 本番運用で display()、collect()、print(df.count()) を使う
display()、collect()、print(df.count())のような関数は、多くの場合開発やデバッグで用いられるものであり、それらが確かに存在すべき場所でもあります。これらを本番運用のパイプラインに含めてしまうことは、よくあるアンチパターンです。
これらの関数は本質的には悪いものではありません - これらはシンプルに安全にスケールしません:
-
collect()
はすべてのデータをエグゼキューターからドライバーノードに持ってくることになり、大規模データセットにおいてはドライバーのメモリーオーバーフロー(OOMエラー)を引き起こします。 -
display()
はノートブックやユーザインタフェースのために設計されており、特に複雑なデータフレームに対して実行される場合には、スケーラブルなバッチやストリーミングジョブのためのものではありません。 -
print(df.count())
は繰り返し実行される可能性のある完全なアクションを起動し、パイプラインに不必要なオーバーヘッドを追加します。
本番運用のジョブはクリーンかつ対面を想定せず、ログやメトリクスを通じて観測可能なものであるべきであり、開発者を対象とした出力を行うべきではありません。代わりに、パイプラインの実行を追跡するためのロギングフレームワークや監視ツールを活用しましょう。
10. XML/JSONファイルを平坦にするために過度に explode() を使う
explode()
の適用は、行数を大幅に増加させ、パーティションのサイズを劇的に増加させます。1000アイテムの配列においては、行数が1000倍に増えることになります。これは、データの溢れを引き起こし、アプリケーションの実行を非常に遅くさせ、OOMで失敗することになります。この問題は、多くの場合XMLやJSONファイルの平坦化で目撃されます。
推奨事項:
- exlodeの前にデータを理解するために、Databricksのバリアントタイプを使いましょう。多くの場合、後段で必要がないのにDFの配列がexplodeされています。
- 誰かがすべてのカラムを参照、利用する場合には、メモリーの問題に遭遇しないように、explodeの後にexplodeした列をすぐに削除しましょう。
- パーティションサイズを削減するために、explode()オペレーションの直前により大きい数にデータフレームをrepartitionしましょう。
df = df.repartition(higher_number_of_partitions)
df = df.select(col("id"), explode(col("array_col"))
- (データの構造によって)explode()がデータのデカルト積を生成することと等価の場合には、データを中間データフレームに分割(explodeされる必要があるカラムとjoinカラムを持つデータフレームと、残りのカラムを持つデータフレーム)し、Deltaテーブルに結果を書き込んでjoinしましょう。これによって、パーティションサイズとパフォーマンスを改善します。
こちらではなく:
df = df.select(col("id"), col("other_col"), explode(col("array_col"))
こちらを使いましょう:
df.select("id", explode("array_col")).write.saveAsTable("table1")
df.select("id", "other_col").write.saveAsTable("table2")
table1 = spark.read.table("table1")
table2 = spark.read.table("table2")
flattened_result = table1.join(table2, on="id")
まとめ: パフォーマンスを改善するためによりスマートなSparkコードを書きましょう
これらのよくある間違いを避けることで、Sparkアプリケーションの効率性と安定性を劇的に改善することができます。これらのベストプラクティスを適用することで、チームは処理時間を削減し、コストを低減し、よりスケーラブルなデータパイプラインを構築することができます。
あなたのSparkワークフローを最適化したいですか?すぐにコードをこれらのアンチパターンの観点でレビューし、ベストプラクティスを実装しましょう!