Edited at

SparkでJOINするときの注意点

More than 1 year has passed since last update.


動機

SPARKのワーカーノードを多数建ててJOINするとき、一部のノードだけ忙しくて他のノードが使われてなかったり、速度が出ないときがある。JOINが原因であることが時々ある。


参考文献

High Performance Spark p.75 に詳しく書いてある

このスライドもいい。

https://www.slideshare.net/databricks/optimizing-apache-spark-sql-joins


次の例を考えてみる

tableA.createOrReplaceTmpView("tableA")

tableB.createOrReplaceTmpView("tableB")
joinedtable = spark.sql("""
SELECT
a.*,
b.*
FROM tableA as a
JOIN tableB as b
ON
a.key = b.key""")


suffle join

suffle joinでは、A.key=B.keyの比較をするためには、AとBの同一キーのレコードが同じノード上に存在し無ければならない。例えば、A.key=2のデータとB.key=2のデータがバラバラのノードに格納されていてはJOINができない。A.key=2のデータとB.key=2のデータが全部ノード1に集めてからJOINを始める。

この場合、たとえば、A.key=2だけ全体の8割を占めるといった、A.keyの出現頻度に偏りが発生していると、1つのノードにテーブルAの8割が存在することになってしまう。これではいくらワーカーノードを用意しても速くならない。


broadcast join

これはbroadcast joinにすれば解決することがある。この方法ではテーブルBをすべてのノードに配布してしまう。

全てのノードにテーブルBのすべてのデータがあれば、先ほどのようにA.key=2のデータをノード1にすべて集めてくる作業は必要なくなる。

次の書き方だとbroadcastjoin を強制できる。

まずspark-submitするときにspark.sql.autoBroadcastJoinThresholdの値を設定する。なんでもいいので大きい値。この値より小さいサイズのテーブル(DataFrame)はbroadcastできる。(設定してもしなくても関係ない気がする。要検証)


spark-submit.sh

spark-submit \

--master yarn \
--conf spark.sql.autoBroadcastJoinThreshold=1099511627776 \
main.py

さらにpythonファイルの中でbroadcastしたいDataFrameはbroadcast(table)しておく。


main.py

#tableA と tableBは事前に作ってあるものとする

from pyspark.sql.functions import *
tableA.createOrReplaceTmpView("tableA")
tableB_b = broadcast(tableB)
tableB_b.createOrReplaceTmpView("tableB_b")
joinedtable = spark.sql("""
SELECT
a.*,
b.*
FROM
tableA as a
JOIN
tableB_b as b
ON
a.key = b.key
"""
)

ホントにbroadcast join になっているか見るには.explain()で確認する。

spark.sql("""

SELECT
a.*,
b.*
FROM
tableA as a
JOIN
tableB_b as b
ON
a.key = b.key
""").explain() # で実行計画を表示

broadcast hash joinと表示されれば成功


まとめ


  • suffle join が有効なとき


    • keyの種類がたくさんあり、かつ

    • keyの分布が均等である



  • broadcast joinが有効なとき、


    • テーブルBが小さい



おわり