データハンドリングにおいて、データの品質を高める意味でも「重複データの削除」は重要だと思います。
Deltaテーブルを更新する際、元データ側から取得する増分データは、理想的には重複のない、真の増分のみであるべき。
しかし、なんらかの事情で、その増分データ内に重複があった場合、まともにこれを既存Deltaテーブルに追加するとテーブル内に重複が生まれ、品質が落ちてしまいます。
過去にPandasでは実装していたのですが、これをPysparkでやるとどうなるのか。
結論、出来ることには出来たのですが、Pysparkの仕様上少し癖があり、結構ハマったので共有します。
このやり方がベストプラクティスなのかは不明なので、念のため、ご確認の上ご使用くださいませ。
Pandasでやるには?
「Python データフレーム 重複削除」とググりますと、やはり情報量の多いPandasに関する方法が登場します。
Pandasでは、重複行を見つける(抽出する)のには「duplicated()」関数、削除するのには「drop_duplicated()」関数を使うみたいです。
では、ここで公式ドキュメントを見てみましょう。
これによると、duplicate関数は
DataFrame.duplicated(subset=None, keep='first')
とあるように、「subset」「keep」という2つの引数があることが分かりました。
subsetはデータ重複を判定する列を指定し、keepはどの重複を残すかを指定します。
なので、上の条件で言えば、subset関数で重複を判断するカラムを選択(複数可)し、
keep引数をfirstに設定することで、重複のあるデータのうち一番上のデータを残すことが出来ます。
Keep引数がPysparkには無い
じゃあ、PySparkでは?
PySparkにもPandasと同じく、dropDuplicates関数があります(drop以降をアンスコで繋いでいないという差はある)。
これを見ると、keep引数がない!
よって、重複があるデータのうちどれを残すべきか?PySparkでは、この関数だけでは指定できないことになります。
なので、対応策として考えるのは以下。
from pyspark.sql.function import col,desc
incremental_sdf = spark.read.format("delta").load("incremental_delta")
drop_dupulicated_incremental_sdf = (
incremental_sdf
.orderBy(col(<更新日付カラム>).desc())
.coalesce(1)
.dropDuplicates([<重複判断カラム>])
)
やっていることは、Pandasではkeep引数でやってくれていたことを、PySparkではorderBy,coalease関数を組み合わせて代用するというもの。
データの新旧を、レコードが持つ更新日付カラムを降順に並び替えることで判断し、coalesce関数で一番上(つまり、最新データ)だけ残す。
あとはdropDuplicate関数で、最新データ以外を削除する。
あとは、ここで定義した重複のなくなった増分データ=dataframeで、Merge関数を用いて既存Deltaテーブルを更新し、要件を満たすことが出来ると思います。
マスターデータや、日によってレコードのステータスが変化するデータなどの分析において、もしかしたら使える技かもしれません。