こちらのサンプルノートブックをウォークスルーした内容となっています。
翻訳版のサンプルノートブックはこちらにあります。
SCDとは
SCD(Slowly Changing Dimensions)とは、データの変更点を捕捉して処理を行うチェンジデータキャプチャ(CDC)の実現方法の一つです。
変化するデータを取り扱う際(CDC)、多くの場合、最新のデータを追跡するためにレコードを更新する必要があります。SCD Type 2は、オリジナルのデータを保持するようにターゲットの変更を適用する方法です。例えば、データベースにあるユーザーのエンティティが異なる住所に引っ越した際、そのユーザーの以前の全ての住所を格納することができます。DLTは、変更の監査証跡を維持する必要がある企業のためにSCD Type 2をサポートします。SCD2は値の全ての履歴を保持します。属性の値が変更した際、現在のレコードがクローズされ、変更された値を持つ新たなレコードが作成され、新たなレコードが現在のレコードになります。なお、SCD Type 1は履歴を保持せずに更新を行います。
- SCD Type 1: 変更履歴を保持せずに変更を反映
- SCD Type 2: 変更履歴を保持して変更を反映
DeltaのMergeで実現するSCD Type 2
チェンジデータキャプチャに関してはこちらもご覧ください。Delta Live TablesでもSCD Type 2をサポートしています。
Delta LakeでサポートしているMergeを用いることで、デフォルトの挙動はSCD Type 1となります。追加の処理を実装することでSCD Type 2を実現することができます。
Customers Deltaテーブル
これは我々がアップデートするslowly changingテーブルです。すべての顧客に対して、複数の住所が存在し得ます。しかし、それぞれの住所にはその住所が有効であったeffectiveDate
からendDate
に至る期間も存在します。さらに、それぞれの顧客において現在適切な住所であることを示す別のフィールドcurrent
が存在します。すなわち、それぞれの顧客においてcurrent
がtrue
である住所は1行のみであり、他の行はfalseになります。
import java.sql.Date
import java.text._
import spark.implicits
case class CustomerUpdate(customerId: Int, address: String, effectiveDate: Date)
case class Customer(customerId: Int, address: String, current: Boolean, effectiveDate: Date, endDate: Date)
implicit def date(str: String): Date = Date.valueOf(str)
sql("drop table if exists customers")
Seq(
Customer(1, "old address for 1", false, null, "2018-02-01"),
Customer(1, "current address for 1", true, "2018-02-01", null),
Customer(2, "current address for 2", true, "2018-02-01", null),
Customer(3, "current address for 3", true, "2018-02-01", null)
).toDF().write.format("delta").mode("overwrite").saveAsTable("customers")
display(table("customers").orderBy("customerId"))
Updatesテーブル
これは新規の住所を含むアップデートテーブルです。それぞれの顧客に対して、いつから有効かを示す日付と住所が含まれています。
簡単にするために、同じcaseクラスを用いており、フィールドcurrent
とendDate
を無視していることに注意してください。ここではこれらを使用しません。
このテーブルには顧客あたり1行が含まれておりeffectiveDate
が適切に設定されている必要があります。
Seq(
CustomerUpdate(1, "new address for 1", "2018-03-03"),
CustomerUpdate(3, "current address for 3", "2018-04-04"), // customer 3においては現在のアドレスと新規のアドレスは同じです。
CustomerUpdate(4, "new address for 4", "2018-04-04")
).toDF().createOrReplaceTempView("updates")
// 注意:
// - effectiveDateはSCD Type2のMergeの後にcustomersテーブルにコピーされるので、ソーステーブルにeffectiveDateが適切に設定されるようにしてください
// - 顧客ごとに1行だけが存在するようにしてください
display(table("updates"))
SCD Type 2を実行するためのMerge文
このMerge文はソーステーブルのそれぞれの顧客に対して以下の処理を同時に行います。
-
current
がtrueに設定された新規アドレスをinsert - 以前の現在行の
current
をfalseにupdateし、endDate
をnull
からソースのeffectiveDate
にupdate
-- ========================================
-- Merge SQL APIはDBR 5.1以降で利用できます
-- ========================================
MERGE INTO customers
USING (
-- これらの行は既存顧客の現在の住所をUPDATEし、新規顧客の新規住所をINSERTします
SELECT updates.customerId as mergeKey, updates.*
FROM updates
UNION ALL
-- これらの行は既存顧客の新規住所をINSERTします
-- mergeKeyをNULLに設定すると、これらの行はNOT MATCHとなりINSERTが強制されます
SELECT NULL as mergeKey, updates.*
FROM updates JOIN customers
ON updates.customerid = customers.customerid
WHERE customers.current = true AND updates.address <> customers.address
) staged_updates
ON customers.customerId = mergeKey
WHEN MATCHED AND customers.current = true AND customers.address <> staged_updates.address THEN
UPDATE SET current = false, endDate = staged_updates.effectiveDate -- currentをfalse、endDateをソースのeffective dateに設定します
WHEN NOT MATCHED THEN
INSERT(customerid, address, current, effectivedate, enddate)
VALUES(staged_updates.customerId, staged_updates.address, true, staged_updates.effectiveDate, null) -- currentをtrue、新規の住所とeffective dateに設定します
更新されたCustomersテーブル
- customer 1では以前の住所は
current = false
となり、新規の住所がcurrent = true
に設定されます - customer 2では更新はありません
- customer 3では新規住所は以前の住所と同じであるので更新はされません
- customer 4では新規住所がinsertされます
display(table("customers").orderBy("customerId", "current", "endDate"))