0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

pyspark における効率的な join 操作 - salted_join

Last updated at Posted at 2025-05-09

pyspark における効率的なjoin操作

  • 問題:pysparkのjoinが非常に遅かったりメモリ不足で処理ストップしたり分析が滞ってしまった
  • 原因:data skew や アンバランスなパーティション
  • 解決方法:broadcast, salting, バランスよくパーティションする

下部のreferenceに記載されている記事を参考に、pyspark における効率的な join (left, innerのみ対応) を実装しました。

github: NatsukiNateYamashita/efficient_join - 効率的な PySpark join

概要

このモジュールは、Apache Sparkでデータ処理を行う際によく発生するdata skew問題を軽減するためのユーティリティ関数を提供します。特に大規模SparkDataFrameと小規模SparkDataFrameのjoin処理でパフォーマンスを向上させます。

機能

  • salted join: 大規模SparkDataFrameと小規模SparkDataFrameをjoinする際のdata skewを軽減します
  • cross join: broadcastを活用した効率的なcross joinを実行します

salted_join() のパフォーマンス改善効果

joinタイプ SparkDataFrameサイズ built-in join() salted_join() 改善率
inner join Df_big: 約4億件
Df_small: 3.5万件
4分20秒 1分53秒 56.5%
inner join Df_big: 約4億件
Df_small: 1万件
3分2秒 1分31秒 50.0%
left join Df_big: 約4億件
Df_small: 3.5万件
19.14秒 15.31秒 15.6%
left join Df_big: 約4億件
Df_small: 1万件
7.98秒 6.39秒 20.0%
  • inner join: 約50~56%の性能改善
  • left join: 約15~20%の性能改善

[実験環境]

  • Azure Databricks
    • Policy: Shared Compute
    • Access mode: Standard
    • Databricks Runtime Version: 15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12)
    • Worker type: Standard_F4s 1-4 workers
    • Driver type: Standard_F4s

効率的な join function の 説明

salted_join(df_big, big_key_column, df_small, small_key_column, join_type)

大規模SparkDataFrameのキーに「salt」を追加し、小規模SparkDataFrameを複製することでdata skewを軽減します。

引数:

  • df_big: 大規模SparkSparkDataFrame
  • big_key_column: 大規模SparkDataFrameのjoinキー列名
  • df_small: 小規模SparkSparkDataFrame
  • small_key_column: 小規模SparkDataFrameのjoinキー列名
  • join_type: joinタイプ(inner、left、leftouter、left_outer、leftanti、left_antiのいずれか)

戻り値:

  • joinされたSparkSparkDataFrame

利点:

  • data skew問題の解決: パーティション間のデータ分布の偏りによる処理のボトルネックを防止します
  • 分散コンピューティング効率の向上: ワーカーノード間でデータを均等に分散させることで、並列処理を最適化します
  • エグゼキューターの障害防止: skewしたキーを処理する特定のエグゼキューターでのメモリ不足エラーのリスクを軽減します
  • 処理時間の短縮: バランスの取れたワークロードにより、全体的なジョブ完了が高速化します

制限事項:

  • メモリエラーの可能性があるため、使用可能なjoinタイプは限定されています

crossjoin(df_big, df_small)

大規模SparkDataFrameと小規模SparkDataFrameのcross joinを、小規模SparkDataFrameをbroadcastして効率的に実行します。

引数:

  • df_big: 大規模SparkSparkDataFrame
  • df_small: 小規模SparkSparkDataFrame(broadcastされます)

戻り値:

  • cross joinされたSparkSparkDataFrame

salted joinの仕組み

このユーティリティは以下の手法でdata skew問題に対処します:

  1. キーのsalt化: 大規模SparkDataFrameのキーにランダムな接尾辞(0-2)を追加し、各キーに対して複数の仮想パーティションを効果的に作成します
  2. 小規模SparkDataFrameの展開: 小規模SparkDataFrameの各行を対応する接尾辞で3回複製し、salt化されたキーとマッチングできるようにします
  3. バランスの取れた分散: 高頻度キーのすべてのレコードを1つのパーティションで処理する代わりに、作業を複数のパーティションに分散します

変換プロセス:

  • 元のjoin条件: df_big.key = df_small.key
  • 変換後のjoin条件: df_big.key_salted = concat(df_small.key, "_", exploded_value)

このアプローチにより、Sparkクラスター内のエグゼキューターノード間でワークロードが均等に分散され、並列処理効率が大幅に向上します。

使用例

from spark_utils import salted_join, crossjoin

# salted joinの使用例
result_df = salted_join(
    large_customer_df, 
    "customer_id", 
    small_product_df, 
    "customer_id", 
    "left"
)

# cross joinの使用例
result_cross_df = crossjoin(transaction_df, calendar_df)

成果と課題

salted joinアプローチには以下の成果と課題があります:

  • 成果:
    • skewしたキー分布を持つSparkDataFrameで大幅なパフォーマンス向上
    • クラスター全体でのリソース使用率の向上
  • 今後の課題
    • 前処理(saltの追加と小さいSparkDataFrameの展開)による若干のオーバーヘッド
    • saltの数(現在は3に設定)はskewの深刻度に基づいて調整が必要な場合があります

参考文献

このユーティリティの開発には以下の文献を参考にしました:

注記

本記事、および、github: efficient_join/README.mdは、コードと参考文献に基づいてClaude 3.7 Sonnetによって生成されました

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?