2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Delta Live TablesにおけるAuto Loaderとチェンジデータキャプチャの活用

Posted at

こちらの続編です。

上の記事では、Delta Live Tablesでオートローダー(Auto Loader)を活用することで、新規に到着したデータのみを処理するパイプラインを簡単に構築できる様子を説明しました。

ここでは、さらに要件を追加します。

要件

  • 取り込まれるデータにはキーが存在する。
  • 取り込み済みのデータと取り込んだデータでキーによる照合を行い、一致するものがある場合にはUPDATE、ない場合にはINSERTを行う。いわゆるUPSERTを行う。
  • 処理の順序をしているキーが存在するので、それに従ってUPSERTを行う。

DLTのAPPLY CHANGES INTO

DLTでUPSERTを行うにはAPPLY CHANGES INTOを使用します。

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]

今回使うのは以下の2つです。

  • KEYS: ソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。この句は必須です。
  • SEQUENCE BY: ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。この句は必須です。

データの準備

data_path = "/tmp/takaaki.yayoi@databricks.com/dlt/landing"
dbutils.fs.rm(data_path, True)

少しずつデータを変更していく様子を示すためにデータを準備します。

data_07_06 = [{"Date": "2023/07/06", "Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
        {"Date": "2023/07/06", "Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
        {"Date": "2023/07/06", "Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
        {"Date": "2023/07/06", "Category": 'D', "ID": 4, "Value": 33.87, "Truth": True}
        ]

df_07_06 = spark.createDataFrame(data_07_06)
display(df_07_06)

data_07_07 = [{"Date": "2023/07/07", "Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
        {"Date": "2023/07/07", "Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
        #{"Date": "2023/07/07", "Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
        {"Date": "2023/07/07", "Category": 'D', "ID": 4, "Value": 33.87, "Truth": True}
        ]

df_07_07 = spark.createDataFrame(data_07_07)
display(df_07_07)

data_07_08 = [#{"Date": "2023/07/08", "Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
        #{"Date": "2023/07/08", "Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
        {"Date": "2023/07/08", "Category": 'C', "ID": 3, "Value": 10.99, "Truth": True},
        {"Date": "2023/07/08", "Category": 'E', "ID": 5, "Value": 33.87, "Truth": True}
        ]

df_07_08 = spark.createDataFrame(data_07_08)
display(df_07_08)

Screenshot 2023-07-13 at 20.14.36.png

最初の2つのデータを保存します。

save_path = f"{data_path}/2023/07/06/"
df_07_06.write.format("parquet").option("header", "true").mode("overwrite").save(save_path)
save_path = f"{data_path}/2023/07/07/"
df_07_07.write.format("parquet").option("header", "true").mode("overwrite").save(save_path)

パイプラインの定義

ブロンズテーブルの定義は前回と同じです。

CREATE
OR REFRESH STREAMING TABLE cdc_bronze TBLPROPERTIES (
  'delta.minReaderVersion' = '2',
  'delta.minWriterVersion' = '5',
  'delta.columnMapping.mode' = 'name'
) COMMENT "Parquetをそのまま保持するブロンズテーブル" AS
SELECT
  *,
  current_timestamp() as processed -- 処理時刻
FROM
  cloud_files(
    "/tmp/takaaki.yayoi@databricks.com/dlt/landing/",
    "parquet"
  )

これで新規データが毎回取り込まれるので、それに対してチェンジデータキャプチャを適用します。

CREATE OR REFRESH STREAMING TABLE cdc_silver COMMENT "変更点のみが反映されるシルバーテーブル";

APPLY CHANGES INTO LIVE.cdc_silver
FROM STREAM(live.cdc_bronze) -- ブロンズテーブルからストリーミング読み込み
KEYS (ID) -- IDでマッチング
SEQUENCE BY Date -- Date順に処理

パイプラインの実行

Screenshot 2023-07-13 at 20.18.20.png

シルバーテーブルを確認します。

SELECT * FROM takaakiyayoi_catalog.dlt.cdc_silver ORDER BY ID ASC

7/6のデータを読み込んだ後に、7/7のデータの変更点が反映されています。
Screenshot 2023-07-13 at 20.19.23.png

追加データの読み込み

save_path = f"{data_path}/2023/07/08/"
df_07_08.write.format("parquet").option("header", "true").mode("overwrite").save(save_path)

これは、ID3をUPDATEし、ID5をINSERTします。
Screenshot 2023-07-13 at 20.20.23.png

再度DLTパイプラインを実行します。2行分のみが反映されます。
Screenshot 2023-07-13 at 20.21.32.png

期待した通りにテーブルが更新されています。
Screenshot 2023-07-13 at 20.22.10.png

このような仕組みをスクラッチで構築するのは骨が折れますが、DLTを活用すればシンプルなコーディングだけで実現できます。是非お試しください!

Databricksクイックスタートガイド

Databricksクイックスタートガイド

Databricks無料トライアル

Databricks無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?