概要
Databricks DLT の APPLY CHANGES API について仕様を検証した結果を共有します。
結論として、Key 列が NULL のレコードも連携されるため、実運用時には Key 列が NULL のレコードを除外するなどの対策が必要になりそうです。
今回の主な検証結果は下記のとおりです。
- Key 列が NULL の場合でもエラーにならず連携される
- ソース側で key と sequence_by が重複していてもエラーにならない
検証パターンと結果
以下の表に各検証パターンとその結果をまとめました。
# | 検証パターン | 結果 |
---|---|---|
1 | ソースで Key と sequence_by が重複している場合の UPDATE 検証 | エラーにならず、ID=2 のレコードが反映された |
2 | ソースで Key と sequence_by が重複している場合の INSERT 検証 | エラーにならず、ID=3 のレコードが反映された |
3 | ソースで sequence_by が NULL の場合の動作検証 | エラーにならず、ID=4 のレコードが追加された |
4 | ソースで sequence_by に値を設定した場合の動作検証 | ID=4 のレコードが更新された |
5 | ソースで sequence_by を再び NULL で追加した場合の動作検証 | ID=4 のレコードは変更されず、前回のまま維持された |
6 | ソースで Key が NULL の場合の動作検証 | NULL の ID レコードが追加された |
7 | ソースで Key が NULL のレコードを追加した場合の動作検証 | NULL の ID レコードが更新された |
検証コードと結果
事前準備
カタログを設定します。
%sql
USE CATALOG manabian_test_01;
ソースのスキーマとテーブルを作成します。
%sql
CREATE SCHEMA IF NOT EXISTS dlt_source_01;
%sql
CREATE OR REPLACE TABLE dlt_source_01.table_01
AS
SELECT
1 AS id,
'John' AS string_col,
CAST('2025-01-01' AS TIMESTAMP) AS ingest_ts
;
SELECT * FROM dlt_source_01.table_01;
続いて、ソーステーブルを参照する DLT の処理を記述して実行します。
DLT の書き込み先スキーマは dlt_test_01
です。
from dlt import *
from pyspark.sql.functions import *
catalog = "manabian_test_01"
schema = "dlt_test_01"
# table_01 の処理
table_01 = "table_01"
ftable_01 = f"{catalog}.{schema}.{table_01}"
@dlt.table(
name=ftable_01,
)
def customers_bronze_ingest_flow():
return spark.readStream.table(f"{catalog}.dlt_source_01.table_01")
# table_02 の処理
table_02 = "table_02"
ftable_02 = f"{catalog}.{schema}.{table_02}"
dlt.create_streaming_table(
name=ftable_02,
)
dlt.apply_changes(
target=ftable_02,
source=ftable_01,
keys=["id"],
sequence_by=col("ingest_ts"),
stored_as_scd_type=1,
)
初回実行後、結果が想定通りであることを確認します。
%sql
SELECT * FROM dlt_test_01.table_02
次に、ソーステーブルにデータを書き込んで DLT の処理を再度実行し、結果を確認します。
%sql
WITH src AS (
SELECT
1 AS id,
'John++++++++++' AS string_col,
CAST('2025-02-01' AS TIMESTAMP) AS ingest_ts
UNION ALL
SELECT
2 AS id,
'Doe' AS string_col,
CAST('2025-02-01' AS TIMESTAMP) AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;
SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02
1. ソースにて Key と sequence_by が重複している場合の UPDATE の検証
エラーにならず、ID=2 のレコードが反映されることを確認しました。
%sql
WITH src AS (
SELECT
2 AS id,
'Doe+1' AS string_col,
CAST('2025-03-01' AS TIMESTAMP) AS ingest_ts
UNION ALL
SELECT
2 AS id,
'Doe+2' AS string_col,
CAST('2025-03-01' AS TIMESTAMP) AS ingest_ts
UNION ALL
SELECT
2 AS id,
'Doe+3' AS string_col,
CAST('2025-03-01' AS TIMESTAMP) AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;
SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02
2. ソースにて Key と sequence_by が重複している場合の INSERT の検証
エラーにならず、ID=3 のレコードが反映されることを確認しました。
%sql
WITH src AS (
SELECT
3 AS id,
'Cat+1' AS string_col,
CAST('2025-04-01' AS TIMESTAMP) AS ingest_ts
UNION ALL
SELECT
3 AS id,
'Cat+2' AS string_col,
CAST('2025-04-01' AS TIMESTAMP) AS ingest_ts
UNION ALL
SELECT
3 AS id,
'Cat+3' AS string_col,
CAST('2025-04-01' AS TIMESTAMP) AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;
SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02
3. ソースにて sequence_by が NULL である場合の動作検証
エラーにならず、ID=4 のレコードが新規追加されていることを確認しました。
%sql
WITH src AS (
SELECT
4 AS id,
'Donkey' AS string_col,
NULL AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;
SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02 ORDER BY ID
4. ソースにて sequence_by に値を設定した場合の動作の検証
ID=4 のレコードが更新されたことを確認しました。
%sql
WITH src AS (
SELECT
4 AS id,
'Donkey+1' AS string_col,
CAST('2025-05-01' AS TIMESTAMP) AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;
SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02 ORDER BY ID
5. ソースにて sequence_by が再度 NULL で追加された場合の動作検証
ID=4 のレコードは更新されず、前回の状態が維持されました。
%sql
WITH src AS (
SELECT
4 AS id,
'Donkey+2' AS string_col,
NULL AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;
SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02 ORDER BY ID
6. ソースにて Key が NULL である場合の動作検証
ID=NULL のレコードが新規追加されていることを確認しました。
%sql
WITH src AS (
SELECT
NULL AS id,
'Elephant' AS string_col,
CAST('2025-05-01' AS TIMESTAMP) AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;
SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02 ORDER BY ID
7. ソースにて Key が NULL のレコードが追加された場合の動作検証
ID=NULL のレコードが更新されていることを確認しました。
%sql
WITH src AS (
SELECT
NULL AS id,
'Elephant+1' AS string_col,
CAST('2025-06-01' AS TIMESTAMP) AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;
SELECT * FROM dlt_source_01.table_01;
``
```sql
%sql
SELECT * FROM dlt_test_01.table_02 ORDER BY ID