Job fails with Spark Shuffle FetchFailedException error - Databricksの翻訳です。
本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
FetchFailedExceptionエラーのワークアラウンドとして、デフォルトのSparkシャッフルサービスを無効化しましょう。
問題
あなたのアプリケーションに、何かしらの集計処理やjoinステージが含まれているう場合、その実行にはSparkのシャッフルステージが必要となります。使用されている特定の設定によりますが、インタラクティブクラスターで複数のストリーミングクエリーを実行している際、シャッフルのFetchFailedException
エラーに遭遇するかもしれません。
ShuffleMapStage has failed the maximum allowable number of times
DAGScheduler: ShuffleMapStage 499453 (start at command-39573728:13) failed in 468.820 s due to
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 228703
org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:747)
Caused by: java.io.IOException: Connection reset by peer
原因
シャッフルフェッチの失敗は通常、クラスターのダウンスケーリングのイベント、エグゼキューターの喪失、ワーカーの停止のようなシナリオで発生します。特定のケースでは、エグゼキューターからのシャッフルファイルが失われます。後段のタスクはシャッフルファイルをフェッチしようとしますが失敗します。
Databricksではシャッフルサービスがデフォルトで有効化されています。このサービスは、エグゼキューターを安全に削除できるように、エグゼキューターによって書き込まれたファイルを保持する外部のシャッフルサービスを有効化します。
現状のシャッフルサービスの値を取得するために、PythonやScalaのノートブックセルでspark.conf.get("spark.shuffle.service.enabled")
を実行します。
spark.conf.get("spark.shuffle.service.enabled")
ソリューション
デフォルトのSparkシャッフルサービスを無効化します。
シャッフルサービスを無効化しても、シャッフルをとめるわけではありません。実行の方法を変更するだけです。サービスが無効化されると、シャッフルはエグゼキューターで実行されます。
クラスターのSpark config(AWS | Azure | GCP)にspark.shuffle.service.enabled false
を追加することで、シャッフルサービスを無効化する事ができます。
Spark configを更新した後でクラスターを再起動します。
注意
シャッフルサービスを無効にすると、パフォーマンスに若干のインパクトがあります。