動機
SPARKのワーカーノードを多数建ててJOINするとき、一部のノードだけ忙しくて他のノードが使われてなかったり、速度が出ないときがある。JOINが原因であることが時々ある。
参考文献
High Performance Spark p.75 に詳しく書いてある
このスライドもいい。
https://www.slideshare.net/databricks/optimizing-apache-spark-sql-joins
例
次の例を考えてみる
tableA.createOrReplaceTmpView("tableA")
tableB.createOrReplaceTmpView("tableB")
joinedtable = spark.sql("""
SELECT
a.*,
b.*
FROM tableA as a
JOIN tableB as b
ON
a.key = b.key""")
suffle join
suffle joinでは、A.key=B.key
の比較をするためには、AとBの同一キーのレコードが同じノード上に存在し無ければならない。例えば、A.key=2のデータとB.key=2のデータがバラバラのノードに格納されていてはJOINができない。A.key=2のデータとB.key=2のデータが全部ノード1に集めてからJOINを始める。
この場合、たとえば、A.key=2
だけ全体の8割を占めるといった、A.key
の出現頻度に偏りが発生していると、1つのノードにテーブルAの8割が存在することになってしまう。これではいくらワーカーノードを用意しても速くならない。
broadcast join
これはbroadcast joinにすれば解決することがある。この方法ではテーブルBをすべてのノードに配布してしまう。
全てのノードにテーブルBのすべてのデータがあれば、先ほどのようにA.key=2のデータをノード1にすべて集めてくる作業は必要なくなる。
次の書き方だとbroadcastjoin を強制できる。
まずspark-submitするときにspark.sql.autoBroadcastJoinThreshold
の値を設定する。なんでもいいので大きい値。この値より小さいサイズのテーブル(DataFrame)はbroadcastできる。(設定してもしなくても関係ない気がする。要検証)
spark-submit \
--master yarn \
--conf spark.sql.autoBroadcastJoinThreshold=1099511627776 \
main.py
さらにpythonファイルの中でbroadcastしたいDataFrameはbroadcast(table)
しておく。
# tableA と tableBは事前に作ってあるものとする
from pyspark.sql.functions import *
tableA.createOrReplaceTmpView("tableA")
tableB_b = broadcast(tableB)
tableB_b.createOrReplaceTmpView("tableB_b")
joinedtable = spark.sql("""
SELECT
a.*,
b.*
FROM
tableA as a
JOIN
tableB_b as b
ON
a.key = b.key
""")
ホントにbroadcast join になっているか見るには.explain()
で確認する。
spark.sql("""
SELECT
a.*,
b.*
FROM
tableA as a
JOIN
tableB_b as b
ON
a.key = b.key
""").explain() # で実行計画を表示
broadcast hash join
と表示されれば成功
まとめ
- suffle join が有効なとき
- keyの種類がたくさんあり、かつ
- keyの分布が均等である
- broadcast joinが有効なとき、
- テーブルBが小さい
おわり