導入
以下のDatabricks公式Blogを読みまして。
DatabricksのDelta Live TablesにAPPLY CHANGE FROM SNAPSHOTというAPIが追加されていることを知りました。
Delta Live Tables(DLT)は、Databricks上のマネージドなETLサービスであり、その中でSlowly Changing Dimensions(SCD)に関わる機能も提供しています。
ただChange Data Feed(CDF)からのSCDのみ対応していたという認識だったのですが、テーブル・ビューなどからの直接SCDを実現するAPIが実装されたということのようです。
これは便利。
というわけで、公式ドキュメントの内容を基に簡単なサンプルを実行してみます。
Step1. 初期テストデータの作成
まずはDLTを実行するためのテストデータを準備します。
ノートブックもしくはSQLエディタ上で以下のコードを実行して、training.dlt_sample.users
テーブルを作成しました。
内容はユーザマスタを想定した適当データです。実在の人物とは一切関係がありません。
CREATE CATALOG IF NOT EXISTS training;
CREATE SCHEMA IF NOT EXISTS dlt_sample;
USE CATALOG training;
USE SCHEMA dlt_sample;
CREATE OR REPLACE TABLE users AS
SELECT
col1 AS userId,
col2 AS name,
col3 AS city
FROM
(
VALUES
(124, "山田", "東京"),
(123, "鈴木", "東京"),
(125, "佐藤", "大阪"),
(126, "斎藤", "名古屋")
);
このようなテーブルが出来ます。
Step2. DLTパイプラインの作成
DLTパイプライン用のノートブックを作成し、以下のコードを記載。
SCD type1のテーブルとSCD type2のテーブルをそれぞれAPPLY CHANGE FROM SNAPSHOT APIを使って作る処理を記述しています。
記述の仕方は従来のAPPLY CHANGE APIと似ていますね。
なお、現在はPython用APIしか公開されていないようです。
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("training.dlt_sample.users")
### SCD type1
dlt.create_streaming_table("target_scd1")
dlt.apply_changes_from_snapshot(
target="target_scd1",
source="source",
keys=["userId"],
stored_as_scd_type=1,
)
### SCD Type2: Periodic snapshot processing
dlt.create_streaming_table("target_scd2_periodic")
dlt.apply_changes_from_snapshot(
target="target_scd2_periodic",
source="source",
keys=["userId"],
stored_as_scd_type=2,
)
Delta Live Tablesのパイプラインを作成。
クラスタはサーバレスにしました。
実行すると二つのテーブル(ストリーミングテーブル)が出来ます。
target_scd1
は以下のようなテーブルが出来ます。元テーブルと同じですね。
target_scd2_periodic
は以下のようなテーブルが出来ます。SCD type2なので、有効期間を示す__START_ATと__END_ATのカラムが追加されています。
Step3. テストデータを更新
SQLテストデータにレコードを追加や更新、削除処理を行います。
USE CATALOG training;
USE SCHEMA dlt_sample;
--
INSERT INTO users VALUES(127, "伊藤", "大阪");
UPDATE users SET city = '福岡' WHERE userId = '123';
UPDATE users SET city = '仙台' WHERE userId = '126';
DELETE FROM users WHERE userId = '124';
以下のようなテーブルとなります。
伊藤さんが追加となり、鈴木さんと斎藤さんが異動されました。山田さんのレコードは削除されています。
Step4. DLTパイプラインの再実行
Step2のパイプラインを再実行します。
結果、以下のようになりました。
target_scd1
は以下のようなデータとなります。
元データと同じ内容になっていますね。
target_scd2_periodic
は以下のように更新されています。
有効期限が記録され、適切にMERGE処理されているのがわかります。
CDFを準備しなくても、テーブルやビューからSCD実装が簡単にできました。
SCD type2で履歴管理をしたい場合などに非常に便利だと思います。
その他
ここのサンプルも再現してみたのですが、エラーが出てうまくいかず。
やり方がまずいのか、折を見て再チャレンジしようと思います。
まとめ
Delta Live Tablesは個人的に好きなサービスの一つなのですが、こちらも進化していっていますね。
以前と比べれば開発体験も向上してきています。
欲を言えば開発体験はまだまだ良くなって欲しいのですが。これはLakeflow Pipelineとして再編されるときに期待かな。。。
また、以下にDLT関連のサンプルノートブックがまとまったリポジトリもあるので、今後はもう少し複雑なサンプルなどに挑戦してみたいと思います。