SparkでIcebergテーブルをメンテナンスするためのプロシージャの1つに、expire_snapshots
がある。
https://iceberg.apache.org/docs/1.7.0/spark-procedures/#expire_snapshots
spark.sql(f'''
CALL system.expire_snapshots(
table => '{database_name}.{table_name}'
)
''').show(truncate=False)
テーブルサイズが大きくなってきたときに、上記のコマンドで古いスナップショット全て削除しようとしたら以下のエラーに遭遇した。既に30分実行した後にこれはつらかった。
Py4JJavaError: An error occurred while calling o162.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 457 tasks (1026.5 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
...
原因はエラーメッセージの通り、処理途中でドライバーに割り当てたmaxResultSize (spark.driver.maxResultSize
) を超えてしまっていること。内部実装は調べきれていないが、expire_snapshots
はドライバーにデータを集約する処理がある模様。
対処方法としてはドライバー用のインスタンスのスペックを上げる、spark.driver.maxResultSize
の設定値を上げるなどが考えられるが、必要なメモリサイズを事前に予測することは難しい。トライ&エラーするにも1回の実行時間が長いので、無駄なコストを払うことにもなりかねない。
改めてdocを読むと、stream_results
というパラメータを発見した。
When true, deletion files will be sent to Spark driver by RDD partition (by default, all the files will be sent to Spark driver). This option is recommended to set to true to prevent Spark driver OOM from large file size
stream_results
を true
にすることでdeletion filesをdriverに送らなくなるため、driverのOOMを防げるらしい。
spark.sql(f'''
CALL system.expire_snapshots(
table => '{database_name}.{table_name}',
stream_results => true
)
''').show(truncate=False)
早速使ってみるも、再び同じエラー。deletion files以外はdriverに送られている、ということなのだろうか。
Py4JJavaError: An error occurred while calling o162.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 452 tasks (1025.3 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
再びGitHubのissueに戻ると、
spark.sql.autoBroadcastJoinThreshold=-1
を設定することで回避できるとのコメントがあった。
https://github.com/apache/iceberg/issues/3703#issuecomment-2391600893
spark = SparkSession.builder \
.config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
.config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}") \
.config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.getOrCreate()
再び実行すると今度は成功した。
spark.sql(f'''
CALL system.expire_snapshots(
table => '{database_name}.{table_name}',
stream_results => true
)
''').show(truncate=False)
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+
|deleted_data_files_count|deleted_position_delete_files_count|deleted_equality_delete_files_count|deleted_manifest_files_count|deleted_manifest_lists_count|
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+
|0 |0 |0 |0 |82 |
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+
spark.sql.autoBroadcastJoinThreshold
を-1に設定した場合、自動的なブロードキャストジョインが無効になる(Sparkのデフォルトは10MB)。
https://spark.apache.org/docs/3.5.3/sql-performance-tuning.html
-1を設定すると基本的に結合操作はSortMergeJoinで実行されるが、SortMergeJoinは高コストなため一般には推奨されない。今回のようにメモリに制約がある環境での実行時やブロードキャストによるメモリ不足を防ぎたい場合に-1を設定するが、パフォーマンスとのトレードオフが発生する点には注意が必要となる。