0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks Delta Live TablesのAPPLY CHANGE FROM SNAPSHOTを試す

Posted at

導入

以下のDatabricks公式Blogを読みまして。

DatabricksのDelta Live TablesにAPPLY CHANGE FROM SNAPSHOTというAPIが追加されていることを知りました。

Delta Live Tables(DLT)は、Databricks上のマネージドなETLサービスであり、その中でSlowly Changing Dimensions(SCD)に関わる機能も提供しています。
ただChange Data Feed(CDF)からのSCDのみ対応していたという認識だったのですが、テーブル・ビューなどからの直接SCDを実現するAPIが実装されたということのようです。

これは便利。

というわけで、公式ドキュメントの内容を基に簡単なサンプルを実行してみます。

Step1. 初期テストデータの作成

まずはDLTを実行するためのテストデータを準備します。
ノートブックもしくはSQLエディタ上で以下のコードを実行して、training.dlt_sample.usersテーブルを作成しました。

内容はユーザマスタを想定した適当データです。実在の人物とは一切関係がありません。

CREATE CATALOG IF NOT EXISTS training;
CREATE SCHEMA IF NOT EXISTS dlt_sample;
USE CATALOG training;
USE SCHEMA dlt_sample;
CREATE OR REPLACE TABLE users AS
SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city
FROM
  (
    VALUES
      (124, "山田", "東京"),
      (123, "鈴木", "東京"),
      (125, "佐藤", "大阪"),
      (126, "斎藤", "名古屋")
  );

このようなテーブルが出来ます。

image.png

Step2. DLTパイプラインの作成

DLTパイプライン用のノートブックを作成し、以下のコードを記載。
SCD type1のテーブルとSCD type2のテーブルをそれぞれAPPLY CHANGE FROM SNAPSHOT APIを使って作る処理を記述しています。
記述の仕方は従来のAPPLY CHANGE APIと似ていますね。
なお、現在はPython用APIしか公開されていないようです。

import dlt


@dlt.view(name="source")
def source():
    return spark.read.table("training.dlt_sample.users")


### SCD type1
dlt.create_streaming_table("target_scd1")

dlt.apply_changes_from_snapshot(
    target="target_scd1",
    source="source",
    keys=["userId"],
    stored_as_scd_type=1,
)


### SCD Type2: Periodic snapshot processing
dlt.create_streaming_table("target_scd2_periodic")

dlt.apply_changes_from_snapshot(
    target="target_scd2_periodic",
    source="source",
    keys=["userId"],
    stored_as_scd_type=2,
)

Delta Live Tablesのパイプラインを作成。
クラスタはサーバレスにしました。

image.png

実行すると二つのテーブル(ストリーミングテーブル)が出来ます。

image.png

target_scd1は以下のようなテーブルが出来ます。元テーブルと同じですね。

image.png

target_scd2_periodicは以下のようなテーブルが出来ます。SCD type2なので、有効期間を示す__START_ATと__END_ATのカラムが追加されています。

image.png

Step3. テストデータを更新

SQLテストデータにレコードを追加や更新、削除処理を行います。

USE CATALOG training;
USE SCHEMA dlt_sample;
--
INSERT INTO users VALUES(127, "伊藤", "大阪");
UPDATE users SET city = '福岡' WHERE userId = '123';
UPDATE users SET city = '仙台' WHERE userId = '126';
DELETE FROM users WHERE userId = '124';

以下のようなテーブルとなります。
伊藤さんが追加となり、鈴木さんと斎藤さんが異動されました。山田さんのレコードは削除されています。

image.png

Step4. DLTパイプラインの再実行

Step2のパイプラインを再実行します。

結果、以下のようになりました。

target_scd1は以下のようなデータとなります。
元データと同じ内容になっていますね。

image.png

target_scd2_periodicは以下のように更新されています。

image.png

有効期限が記録され、適切にMERGE処理されているのがわかります。

CDFを準備しなくても、テーブルやビューからSCD実装が簡単にできました。
SCD type2で履歴管理をしたい場合などに非常に便利だと思います。

その他

ここのサンプルも再現してみたのですが、エラーが出てうまくいかず。
やり方がまずいのか、折を見て再チャレンジしようと思います。

まとめ

Delta Live Tablesは個人的に好きなサービスの一つなのですが、こちらも進化していっていますね。
以前と比べれば開発体験も向上してきています。
欲を言えば開発体験はまだまだ良くなって欲しいのですが。これはLakeflow Pipelineとして再編されるときに期待かな。。。

また、以下にDLT関連のサンプルノートブックがまとまったリポジトリもあるので、今後はもう少し複雑なサンプルなどに挑戦してみたいと思います。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?