7
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Mergeを用いたSCD(Slowly Changing Dimension) Type 2

Posted at

こちらのサンプルノートブックをウォークスルーした内容となっています。

翻訳版のサンプルノートブックはこちらにあります。

SCDとは

SCD(Slowly Changing Dimensions)とは、データの変更点を捕捉して処理を行うチェンジデータキャプチャ(CDC)の実現方法の一つです。

変化するデータを取り扱う際(CDC)、多くの場合、最新のデータを追跡するためにレコードを更新する必要があります。SCD Type 2は、オリジナルのデータを保持するようにターゲットの変更を適用する方法です。例えば、データベースにあるユーザーのエンティティが異なる住所に引っ越した際、そのユーザーの以前の全ての住所を格納することができます。DLTは、変更の監査証跡を維持する必要がある企業のためにSCD Type 2をサポートします。SCD2は値の全ての履歴を保持します。属性の値が変更した際、現在のレコードがクローズされ、変更された値を持つ新たなレコードが作成され、新たなレコードが現在のレコードになります。なお、SCD Type 1は履歴を保持せずに更新を行います。

  • SCD Type 1: 変更履歴を保持せずに変更を反映
  • SCD Type 2: 変更履歴を保持して変更を反映

Screen Shot 2022-08-25 at 17.26.16.png

DeltaのMergeで実現するSCD Type 2

チェンジデータキャプチャに関してはこちらもご覧ください。Delta Live TablesでもSCD Type 2をサポートしています。

Delta LakeでサポートしているMergeを用いることで、デフォルトの挙動はSCD Type 1となります。追加の処理を実装することでSCD Type 2を実現することができます。

Customers Deltaテーブル

これは我々がアップデートするslowly changingテーブルです。すべての顧客に対して、複数の住所が存在し得ます。しかし、それぞれの住所にはその住所が有効であったeffectiveDateからendDateに至る期間も存在します。さらに、それぞれの顧客において現在適切な住所であることを示す別のフィールドcurrentが存在します。すなわち、それぞれの顧客においてcurrenttrueである住所は1行のみであり、他の行はfalseになります。

Scala
import java.sql.Date
import java.text._
import spark.implicits

case class CustomerUpdate(customerId: Int, address: String, effectiveDate: Date)
case class Customer(customerId: Int, address: String, current: Boolean, effectiveDate: Date, endDate: Date)

implicit def date(str: String): Date = Date.valueOf(str)

sql("drop table if exists customers")

Seq(
  Customer(1, "old address for 1", false, null, "2018-02-01"),
  Customer(1, "current address for 1", true, "2018-02-01", null),
  Customer(2, "current address for 2", true, "2018-02-01", null),
  Customer(3, "current address for 3", true, "2018-02-01", null)
).toDF().write.format("delta").mode("overwrite").saveAsTable("customers")

display(table("customers").orderBy("customerId"))

Screen Shot 2022-08-25 at 17.16.04.png

Updatesテーブル

これは新規の住所を含むアップデートテーブルです。それぞれの顧客に対して、いつから有効かを示す日付と住所が含まれています。

簡単にするために、同じcaseクラスを用いており、フィールドcurrentendDateを無視していることに注意してください。ここではこれらを使用しません。

このテーブルには顧客あたり1行が含まれておりeffectiveDateが適切に設定されている必要があります。

Scala
Seq(
  CustomerUpdate(1, "new address for 1", "2018-03-03"),
  CustomerUpdate(3, "current address for 3", "2018-04-04"),    // customer 3においては現在のアドレスと新規のアドレスは同じです。
  CustomerUpdate(4, "new address for 4", "2018-04-04")
).toDF().createOrReplaceTempView("updates")

// 注意: 
// - effectiveDateはSCD Type2のMergeの後にcustomersテーブルにコピーされるので、ソーステーブルにeffectiveDateが適切に設定されるようにしてください
// - 顧客ごとに1行だけが存在するようにしてください

display(table("updates"))

Screen Shot 2022-08-25 at 17.17.06.png

SCD Type 2を実行するためのMerge文

このMerge文はソーステーブルのそれぞれの顧客に対して以下の処理を同時に行います。

  • currentがtrueに設定された新規アドレスをinsert
  • 以前の現在行のcurrentをfalseにupdateし、endDatenullからソースのeffectiveDateにupdate
SQL
-- ========================================
-- Merge SQL APIはDBR 5.1以降で利用できます
-- ========================================

MERGE INTO customers
USING (
   -- これらの行は既存顧客の現在の住所をUPDATEし、新規顧客の新規住所をINSERTします
  SELECT updates.customerId as mergeKey, updates.*
  FROM updates
  
  UNION ALL
  
  -- これらの行は既存顧客の新規住所をINSERTします 
  -- mergeKeyをNULLに設定すると、これらの行はNOT MATCHとなりINSERTが強制されます
  SELECT NULL as mergeKey, updates.*
  FROM updates JOIN customers
  ON updates.customerid = customers.customerid 
  WHERE customers.current = true AND updates.address <> customers.address 
  
) staged_updates
ON customers.customerId = mergeKey
WHEN MATCHED AND customers.current = true AND customers.address <> staged_updates.address THEN  
  UPDATE SET current = false, endDate = staged_updates.effectiveDate    -- currentをfalse、endDateをソースのeffective dateに設定します
WHEN NOT MATCHED THEN 
  INSERT(customerid, address, current, effectivedate, enddate) 
  VALUES(staged_updates.customerId, staged_updates.address, true, staged_updates.effectiveDate, null) -- currentをtrue、新規の住所とeffective dateに設定します

更新されたCustomersテーブル

  • customer 1では以前の住所はcurrent = falseとなり、新規の住所がcurrent = trueに設定されます
  • customer 2では更新はありません
  • customer 3では新規住所は以前の住所と同じであるので更新はされません
  • customer 4では新規住所がinsertされます
Scala
display(table("customers").orderBy("customerId", "current", "endDate"))

Screen Shot 2022-08-25 at 17.18.56.png

Databricks 無料トライアル

Databricks 無料トライアル

7
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?