0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Sparkでexpire_snapshotsが失敗するときの対処方法

Posted at

SparkでIcebergテーブルをメンテナンスするためのプロシージャの1つに、expire_snapshotsがある。
https://iceberg.apache.org/docs/1.7.0/spark-procedures/#expire_snapshots

spark.sql(f'''
    CALL system.expire_snapshots(
        table => '{database_name}.{table_name}'
    )
''').show(truncate=False)

テーブルサイズが大きくなってきたときに、上記のコマンドで古いスナップショット全て削除しようとしたら以下のエラーに遭遇した。既に30分実行した後にこれはつらかった。

Py4JJavaError: An error occurred while calling o162.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 457 tasks (1026.5 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    ...

原因はエラーメッセージの通り、処理途中でドライバーに割り当てたmaxResultSize (spark.driver.maxResultSize) を超えてしまっていること。内部実装は調べきれていないが、expire_snapshotsはドライバーにデータを集約する処理がある模様。

対処方法としてはドライバー用のインスタンスのスペックを上げる、spark.driver.maxResultSizeの設定値を上げるなどが考えられるが、必要なメモリサイズを事前に予測することは難しい。トライ&エラーするにも1回の実行時間が長いので、無駄なコストを払うことにもなりかねない。

改めてdocを読むと、stream_resultsというパラメータを発見した。

When true, deletion files will be sent to Spark driver by RDD partition (by default, all the files will be sent to Spark driver). This option is recommended to set to true to prevent Spark driver OOM from large file size

stream_resultstrue にすることでdeletion filesをdriverに送らなくなるため、driverのOOMを防げるらしい。

spark.sql(f'''
    CALL system.expire_snapshots(
        table => '{database_name}.{table_name}',
        stream_results => true
    )
''').show(truncate=False)

早速使ってみるも、再び同じエラー。deletion files以外はdriverに送られている、ということなのだろうか。

Py4JJavaError: An error occurred while calling o162.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 452 tasks (1025.3 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)

再びGitHubのissueに戻ると、
spark.sql.autoBroadcastJoinThreshold=-1を設定することで回避できるとのコメントがあった。
https://github.com/apache/iceberg/issues/3703#issuecomment-2391600893

spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}") \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .getOrCreate()

再び実行すると今度は成功した。

spark.sql(f'''
    CALL system.expire_snapshots(
        table => '{database_name}.{table_name}',
        stream_results => true
    )
''').show(truncate=False)

+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+
|deleted_data_files_count|deleted_position_delete_files_count|deleted_equality_delete_files_count|deleted_manifest_files_count|deleted_manifest_lists_count|
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+
|0                       |0                                  |0                                  |0                           |82                          |
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+

spark.sql.autoBroadcastJoinThresholdを-1に設定した場合、自動的なブロードキャストジョインが無効になる(Sparkのデフォルトは10MB)。
https://spark.apache.org/docs/3.5.3/sql-performance-tuning.html

-1を設定すると基本的に結合操作はSortMergeJoinで実行されるが、SortMergeJoinは高コストなため一般には推奨されない。今回のようにメモリに制約がある環境での実行時やブロードキャストによるメモリ不足を防ぎたい場合に-1を設定するが、パフォーマンスとのトレードオフが発生する点には注意が必要となる。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?