Spark

SparkでJOINするときの注意点

動機

SPARKのワーカーノードを多数建ててJOINするとき、一部のノードだけ忙しくて他のノードが使われてなかったり、速度が出ないときがある。JOINが原因であることが時々ある。

参考文献

High Performance Spark p.75 に詳しく書いてある

このスライドもいい。
https://www.slideshare.net/databricks/optimizing-apache-spark-sql-joins

次の例を考えてみる

tableA.createOrReplaceTmpView("tableA")
tableB.createOrReplaceTmpView("tableB")
joinedtable = spark.sql("""
SELECT 
a.*,
b.*
FROM tableA as a
JOIN tableB as b
ON 
a.key = b.key""")

suffle join

suffle joinでは、A.key=B.keyの比較をするためには、AとBの同一キーのレコードが同じノード上に存在し無ければならない。例えば、A.key=2のデータとB.key=2のデータがバラバラのノードに格納されていてはJOINができない。A.key=2のデータとB.key=2のデータが全部ノード1に集めてからJOINを始める。

この場合、たとえば、A.key=2だけ全体の8割を占めるといった、A.keyの出現頻度に偏りが発生していると、1つのノードにテーブルAの8割が存在することになってしまう。これではいくらワーカーノードを用意しても速くならない。

broadcast join

これはbroadcast joinにすれば解決することがある。この方法ではテーブルBをすべてのノードに配布してしまう。
全てのノードにテーブルBのすべてのデータがあれば、先ほどのようにA.key=2のデータをノード1にすべて集めてくる作業は必要なくなる。

次の書き方だとbroadcastjoin を強制できる。

まずspark-submitするときにspark.sql.autoBroadcastJoinThresholdの値を設定する。なんでもいいので大きい値。この値より小さいサイズのテーブル(DataFrame)はbroadcastできる。(設定してもしなくても関係ない気がする。要検証)

spark-submit.sh
spark-submit \
--master yarn \
--conf spark.sql.autoBroadcastJoinThreshold=1099511627776 \
main.py

さらにpythonファイルの中でbroadcastしたいDataFrameはbroadcast(table)しておく。

main.py
#tableA と tableBは事前に作ってあるものとする
from pyspark.sql.functions import *
tableA.createOrReplaceTmpView("tableA")
tableB_b = broadcast(tableB)
tableB_b.createOrReplaceTmpView("tableB_b")
joinedtable = spark.sql("""
SELECT
a.*,
b.*
FROM
tableA as a
JOIN 
tableB_b as b
ON
a.key = b.key
""")

ホントにbroadcast join になっているか見るには.explain()で確認する。

spark.sql("""
SELECT
a.*,
b.*
FROM
tableA as a
JOIN 
tableB_b as b
ON
a.key = b.key
""").explain() # で実行計画を表示

broadcast hash joinと表示されれば成功

まとめ

  • suffle join が有効なとき
    • keyの種類がたくさんあり、かつ
    • keyの分布が均等である
  • broadcast joinが有効なとき、
    • テーブルBが小さい

おわり