LoginSignup
1
0

More than 1 year has passed since last update.

【トリビアのDelta Lake】#11 Pysparkで増分データ内の重複を削除する

Last updated at Posted at 2022-12-25

データハンドリングにおいて、データの品質を高める意味でも「重複データの削除」は重要だと思います。

Deltaテーブルを更新する際、元データ側から取得する増分データは、理想的には重複のない、真の増分のみであるべき。
しかし、なんらかの事情で、その増分データ内に重複があった場合、まともにこれを既存Deltaテーブルに追加するとテーブル内に重複が生まれ、品質が落ちてしまいます。

過去にPandasでは実装していたのですが、これをPysparkでやるとどうなるのか。
結論、出来ることには出来たのですが、Pysparkの仕様上少し癖があり、結構ハマったので共有します。
このやり方がベストプラクティスなのかは不明なので、念のため、ご確認の上ご使用くださいませ。

Pandasでやるには?

「Python データフレーム 重複削除」とググりますと、やはり情報量の多いPandasに関する方法が登場します。

Pandasでは、重複行を見つける(抽出する)のには「duplicated()」関数、削除するのには「drop_duplicated()」関数を使うみたいです。
では、ここで公式ドキュメントを見てみましょう。

これによると、duplicate関数は

DataFrame.duplicated(subset=None, keep='first')

とあるように、「subset」「keep」という2つの引数があることが分かりました。
subsetはデータ重複を判定する列を指定し、keepはどの重複を残すかを指定します。

なので、上の条件で言えば、subset関数で重複を判断するカラムを選択(複数可)し、
keep引数をfirstに設定することで、重複のあるデータのうち一番上のデータを残すことが出来ます。

Keep引数がPysparkには無い

じゃあ、PySparkでは?
PySparkにもPandasと同じく、dropDuplicates関数があります(drop以降をアンスコで繋いでいないという差はある)。

これを見ると、keep引数がない!
よって、重複があるデータのうちどれを残すべきか?PySparkでは、この関数だけでは指定できないことになります。

なので、対応策として考えるのは以下。

from pyspark.sql.function import col,desc
incremental_sdf = spark.read.format("delta").load("incremental_delta")
drop_dupulicated_incremental_sdf = (
    incremental_sdf
    .orderBy(col(<更新日付カラム>).desc())
    .coalesce(1)
    .dropDuplicates([<重複判断カラム>])
)

やっていることは、Pandasではkeep引数でやってくれていたことを、PySparkではorderBy,coalease関数を組み合わせて代用するというもの。
データの新旧を、レコードが持つ更新日付カラムを降順に並び替えることで判断し、coalesce関数で一番上(つまり、最新データ)だけ残す。
あとはdropDuplicate関数で、最新データ以外を削除する。

あとは、ここで定義した重複のなくなった増分データ=dataframeで、Merge関数を用いて既存Deltaテーブルを更新し、要件を満たすことが出来ると思います。

マスターデータや、日によってレコードのステータスが変化するデータなどの分析において、もしかしたら使える技かもしれません。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0