はじめに
PySparkにおいて、他方のDataFrameにないレコードを取得したい場合、letfanti joinする方法とexcept演算を行う方法とで振る舞いの違いが気になったので比較した。
検証コード
検証コード
import numpy as np
import pandas as pd
import random
n = 100; m = 10000
a = spark.createDataFrame(pd.DataFrame({'i': range(n)}))
b = spark.createDataFrame(pd.DataFrame({'j': [''.join(random.choice('qwertyuiopasdfghjklzxcvbnm') for _ in range(10)) for _ in range(m)]}))
x = a.crossJoin(b).persist()
y = x.sample(fraction=0.001).persist()
x.count(); y.count()
data1 = x.join(y, ['i', 'j'], 'left_anti') # leftanti
data2 = x.exceptAll(y) # except
leftanti join
explainメソッドでPhysicalPlanを確認すると大まかに以下の手順の処理になる。
- Joinする
- 上記の例だとBroadcastHashJoin
- Planからは分からないが、結合後にright tableの結合key is Nullで判定?
except all
同様に、explainメソッドでPhysicalPlanを確認すると大まかに以下の手順の処理になる。
※PySparkのDataFrameで提供されているのは、except allのみでexceptはない認識
- 一方のDataFrameに1、もう一方のDataFrameに-1の列Vを追加する
- Unionする
- 結合keyでHashAggregateにより、Vのsumを集計する
- sum(V)>0なレコードを取得
- sum(V)>1なレコードを複製する
- except all処理なので、重複行を再現する
実行時間
上記の例での実行時間を比較してみた。
扱うデータや環境によるだろうが、Joinするだけで済むので、BroadcashHashJoinできる条件下では、leftanti joinした方が早そう?
時間 | |
---|---|
leftanti join | 36.2 s ± 885 ms |
except | 40.2 s ± 374 ms |