結論
dropDuplicatesを使うと結果に対して再現性がなくなるので注意が必要。
詳細
Dropduplicatesは重複を無くす列をした際に、どの行を残すかはランダムになります。
また、pandasのdrop_duplicatesと異なり、「先頭を残す」等のオプションはありません。
(理由の記述はなかったのですが、各nodeに散った各情報を集計するのは spark上は大変なのでしょうか?)
サンプルコード
from pyspark.sql import functions as F
data = [
(1, "value_1a"),
(1, "value_1b"),
(1, "value_1c"),
(2, "value_2a"),
(2, "value_2b"),
(2, "value_2c"),
(3, "value_3a"),
(3, "value_3b"),
(3, "value_3c")
]
df = spark.createDataFrame(data, ["key", "value"])
print("==== Original DataFrame ====")
display(df)
for i in range(1, 6):
df_shuffled = df.withColumn("rnd", F.rand())\
.orderBy("rnd")\
.drop("rnd")
deduped = df_shuffled.dropDuplicates(["key"])
print(f"\n==== Run {i} ====")
display(deduped)
上記を実行すると以下のようになります。
==== Run 1 ====
key value
0 1 value_1b
1 2 value_2b
2 3 value_3a
==== Run 2 ====
key value
0 1 value_1c
1 2 value_2c
2 3 value_3c
==== Run 3 ====
key value
0 1 value_1a
1 2 value_2b
2 3 value_3c
==== Run 4 ====
key value
0 1 value_1a
1 2 value_2c
2 3 value_3c
==== Run 5 ====
key value
0 1 value_1a
1 2 value_2c
2 3 value_3c
対策
求めたい結果にもよりますが、sortして先頭行だけを持ってくるなど、丁寧な処理が必要です。
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
w = Window.partitionBy("key").orderBy("value")
df_deterministic = (
df
.withColumn("rn", row_number().over(w))
.filter("rn = 1")
.drop("rn")
)
df_deterministic.show()
上記のコードはsortした上で row_numberで連番をはり、1行目だけ抜き取るコードです。
結論
dropDuplicatesは再現性がなくなるので巨大なETL処理等行う時は注意しましょう。