3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

PySparkのleft anti joinとexceptの振る舞いの違いを調べた

Posted at

はじめに

PySparkにおいて、他方のDataFrameにないレコードを取得したい場合、letfanti joinする方法とexcept演算を行う方法とで振る舞いの違いが気になったので比較した。

 検証コード

検証コード
import numpy as np
import pandas as pd
import random

n = 100; m = 10000
a = spark.createDataFrame(pd.DataFrame({'i': range(n)}))
b = spark.createDataFrame(pd.DataFrame({'j': [''.join(random.choice('qwertyuiopasdfghjklzxcvbnm') for _ in range(10)) for _ in range(m)]}))

x = a.crossJoin(b).persist()
y = x.sample(fraction=0.001).persist()

x.count(); y.count()

data1 = x.join(y, ['i', 'j'], 'left_anti')  # leftanti
data2 = x.exceptAll(y)  # except

leftanti join

explainメソッドでPhysicalPlanを確認すると大まかに以下の手順の処理になる。

  1. Joinする
    • 上記の例だとBroadcastHashJoin
    • Planからは分からないが、結合後にright tableの結合key is Nullで判定?

except all

同様に、explainメソッドでPhysicalPlanを確認すると大まかに以下の手順の処理になる。

※PySparkのDataFrameで提供されているのは、except allのみでexceptはない認識

  1. 一方のDataFrameに1、もう一方のDataFrameに-1の列Vを追加する
  2. Unionする
  3. 結合keyでHashAggregateにより、Vのsumを集計する
  4. sum(V)>0なレコードを取得
  5. sum(V)>1なレコードを複製する
    • except all処理なので、重複行を再現する

実行時間

上記の例での実行時間を比較してみた。
扱うデータや環境によるだろうが、Joinするだけで済むので、BroadcashHashJoinできる条件下では、leftanti joinした方が早そう?

時間
leftanti join 36.2 s ± 885 ms
except 40.2 s ± 374 ms
3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?