テーブルを結合する際、普通、結合キーにnull値が含まれるとそのレコードは結合されないが、
Apache Sparkで無理やりnullレコードをくっつけるやり方
null値が落ちる例
val data1 =
sc.parallelize(
Array(
("a", 1),
("b", 2),
("c", 3),
(null, 4)
)
).toDF("k", "v1")
val data2 =
sc.parallelize(
Array(
("a", "x"),
("b", "v"),
("c", "z"),
(null, "w")
)
).toDF("k", "v2")
data1.join(data2, Seq("k"), joinType="inner").show()
これを実行すると、k列がnullのレコードは落ちてしまう。
それを避けたい場合、Sparkでは以下のようにすればよい。
scala解決法
data1
.join(
data2,
data1("k") <=> data2("k"),
joinType="inner"
).select(data1("k"), $"v1", $"v2").show()
python解決法
data1 \
.join(
data2,
(data1["k"] == data2["k"]) | (data1["k"].isNull() & data2["k"].isNull()),
"inner"
).select(data1["k"], "v1", "v2").show()
pythonはもっと楽で効率の良いやり方がありそう。
上記のやり方だとちょっと遅く感じました。
[追記]
withColumnで、欠損埋めした列を作った方が多分良さそう。
特に結合キーが複数ある場合は、cross joinするらしく、エラーが出る(scalaで確認)。
withColumn例
data1
.withColumn("k", when($"k".isNull, lit("")).otherwise($"k"))
.join(
data2.withColumn("k", when($"k".isNull, lit("")).otherwise($"k")),
Seq("k"),
joinType="inner"
).select("k", "v1", "v2").show()