こちらの続編です。
上の記事では、Delta Live Tablesでオートローダー(Auto Loader)を活用することで、新規に到着したデータのみを処理するパイプラインを簡単に構築できる様子を説明しました。
ここでは、さらに要件を追加します。
要件
- 取り込まれるデータにはキーが存在する。
- 取り込み済みのデータと取り込んだデータでキーによる照合を行い、一致するものがある場合には
UPDATE
、ない場合にはINSERT
を行う。いわゆるUPSERTを行う。 - 処理の順序をしているキーが存在するので、それに従ってUPSERTを行う。
DLTのAPPLY CHANGES INTO
DLTでUPSERTを行うにはAPPLY CHANGES INTO
を使用します。
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
今回使うのは以下の2つです。
- KEYS: ソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。この句は必須です。
- SEQUENCE BY: ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。この句は必須です。
データの準備
data_path = "/tmp/takaaki.yayoi@databricks.com/dlt/landing"
dbutils.fs.rm(data_path, True)
少しずつデータを変更していく様子を示すためにデータを準備します。
data_07_06 = [{"Date": "2023/07/06", "Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
{"Date": "2023/07/06", "Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
{"Date": "2023/07/06", "Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
{"Date": "2023/07/06", "Category": 'D', "ID": 4, "Value": 33.87, "Truth": True}
]
df_07_06 = spark.createDataFrame(data_07_06)
display(df_07_06)
data_07_07 = [{"Date": "2023/07/07", "Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
{"Date": "2023/07/07", "Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
#{"Date": "2023/07/07", "Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
{"Date": "2023/07/07", "Category": 'D', "ID": 4, "Value": 33.87, "Truth": True}
]
df_07_07 = spark.createDataFrame(data_07_07)
display(df_07_07)
data_07_08 = [#{"Date": "2023/07/08", "Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
#{"Date": "2023/07/08", "Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
{"Date": "2023/07/08", "Category": 'C', "ID": 3, "Value": 10.99, "Truth": True},
{"Date": "2023/07/08", "Category": 'E', "ID": 5, "Value": 33.87, "Truth": True}
]
df_07_08 = spark.createDataFrame(data_07_08)
display(df_07_08)
最初の2つのデータを保存します。
save_path = f"{data_path}/2023/07/06/"
df_07_06.write.format("parquet").option("header", "true").mode("overwrite").save(save_path)
save_path = f"{data_path}/2023/07/07/"
df_07_07.write.format("parquet").option("header", "true").mode("overwrite").save(save_path)
パイプラインの定義
ブロンズテーブルの定義は前回と同じです。
CREATE
OR REFRESH STREAMING TABLE cdc_bronze TBLPROPERTIES (
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5',
'delta.columnMapping.mode' = 'name'
) COMMENT "Parquetをそのまま保持するブロンズテーブル" AS
SELECT
*,
current_timestamp() as processed -- 処理時刻
FROM
cloud_files(
"/tmp/takaaki.yayoi@databricks.com/dlt/landing/",
"parquet"
)
これで新規データが毎回取り込まれるので、それに対してチェンジデータキャプチャを適用します。
CREATE OR REFRESH STREAMING TABLE cdc_silver COMMENT "変更点のみが反映されるシルバーテーブル";
APPLY CHANGES INTO LIVE.cdc_silver
FROM STREAM(live.cdc_bronze) -- ブロンズテーブルからストリーミング読み込み
KEYS (ID) -- IDでマッチング
SEQUENCE BY Date -- Date順に処理
パイプラインの実行
シルバーテーブルを確認します。
SELECT * FROM takaakiyayoi_catalog.dlt.cdc_silver ORDER BY ID ASC
7/6のデータを読み込んだ後に、7/7のデータの変更点が反映されています。
追加データの読み込み
save_path = f"{data_path}/2023/07/08/"
df_07_08.write.format("parquet").option("header", "true").mode("overwrite").save(save_path)
これは、ID3をUPDATEし、ID5をINSERTします。
再度DLTパイプラインを実行します。2行分のみが反映されます。
このような仕組みをスクラッチで構築するのは骨が折れますが、DLTを活用すればシンプルなコーディングだけで実現できます。是非お試しください!