Python
Scala
Spark

SparkでDataFrameのjoinする時に、結合キーにnullが入るとレコードが落ちるのを解決する話

テーブルを結合する際、普通、結合キーにnull値が含まれるとそのレコードは結合されないが、
Apache Sparkで無理やりnullレコードをくっつけるやり方

null値が落ちる例
val data1 =
sc.parallelize(
    Array(
        ("a",  1),
        ("b",  2),
        ("c",  3),
        (null, 4)
    )
).toDF("k", "v1")

val data2 =
sc.parallelize(
    Array(
        ("a",  "x"),
        ("b",  "v"),
        ("c",  "z"),
        (null, "w")
    )
).toDF("k", "v2")

data1.join(data2, Seq("k"), joinType="inner").show()

これを実行すると、k列がnullのレコードは落ちてしまう。
それを避けたい場合、Sparkでは以下のようにすればよい。

scala解決法
data1
.join(
    data2,
    data1("k") <=> data2("k"),
    joinType="inner"
).select(data1("k"), $"v1", $"v2").show()
python解決法
data1 \
.join(
    data2,
    (data1["k"] == data2["k"]) | (data1["k"].isNull() & data2["k"].isNull()),
    "inner"
).select(data1["k"], "v1", "v2").show()

pythonはもっと楽で効率の良いやり方がありそう。
上記のやり方だとちょっと遅く感じました。 

[追記]
withColumnで、欠損埋めした列を作った方が多分良さそう。
特に結合キーが複数ある場合は、cross joinするらしく、エラーが出る(scalaで確認)。

withColumn例
data1
.withColumn("k", when($"k".isNull, lit("")).otherwise($"k"))
.join(
    data2.withColumn("k", when($"k".isNull, lit("")).otherwise($"k")),
    Seq("k"),
    joinType="inner"
).select("k", "v1", "v2").show()