0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks DLT の APPLY CHANGES API に関する仕様検証

Posted at

概要

Databricks DLT の APPLY CHANGES API について仕様を検証した結果を共有します。
結論として、Key 列が NULL のレコードも連携されるため、実運用時には Key 列が NULL のレコードを除外するなどの対策が必要になりそうです。

今回の主な検証結果は下記のとおりです。

  1. Key 列が NULL の場合でもエラーにならず連携される
  2. ソース側で key と sequence_by が重複していてもエラーにならない

検証パターンと結果

以下の表に各検証パターンとその結果をまとめました。

# 検証パターン 結果
1 ソースで Key と sequence_by が重複している場合の UPDATE 検証 エラーにならず、ID=2 のレコードが反映された
2 ソースで Key と sequence_by が重複している場合の INSERT 検証 エラーにならず、ID=3 のレコードが反映された
3 ソースで sequence_by が NULL の場合の動作検証 エラーにならず、ID=4 のレコードが追加された
4 ソースで sequence_by に値を設定した場合の動作検証 ID=4 のレコードが更新された
5 ソースで sequence_by を再び NULL で追加した場合の動作検証 ID=4 のレコードは変更されず、前回のまま維持された
6 ソースで Key が NULL の場合の動作検証 NULL の ID レコードが追加された
7 ソースで Key が NULL のレコードを追加した場合の動作検証 NULL の ID レコードが更新された

検証コードと結果

事前準備

カタログを設定します。

%sql
USE CATALOG manabian_test_01;

image.png

ソースのスキーマとテーブルを作成します。

%sql
CREATE SCHEMA IF NOT EXISTS dlt_source_01;
%sql
CREATE OR REPLACE TABLE dlt_source_01.table_01
AS
SELECT
  1 AS id,
  'John' AS string_col,
  CAST('2025-01-01' AS TIMESTAMP) AS ingest_ts
  ;
SELECT * FROM dlt_source_01.table_01;

続いて、ソーステーブルを参照する DLT の処理を記述して実行します。
DLT の書き込み先スキーマは dlt_test_01 です。

from dlt import *
from pyspark.sql.functions import *


catalog = "manabian_test_01"
schema = "dlt_test_01"

# table_01 の処理
table_01 = "table_01"
ftable_01 = f"{catalog}.{schema}.{table_01}"


@dlt.table(
    name=ftable_01,
)
def customers_bronze_ingest_flow():
    return spark.readStream.table(f"{catalog}.dlt_source_01.table_01")


# table_02 の処理
table_02 = "table_02"
ftable_02 = f"{catalog}.{schema}.{table_02}"
dlt.create_streaming_table(
    name=ftable_02,
)
dlt.apply_changes(
    target=ftable_02,
    source=ftable_01,
    keys=["id"],
    sequence_by=col("ingest_ts"),
    stored_as_scd_type=1,
)

image.png

初回実行後、結果が想定通りであることを確認します。

%sql
SELECT * FROM dlt_test_01.table_02

image.png

次に、ソーステーブルにデータを書き込んで DLT の処理を再度実行し、結果を確認します。

%sql
WITH src AS (
SELECT
  1 AS id,
  'John++++++++++' AS string_col,
  CAST('2025-02-01' AS TIMESTAMP) AS ingest_ts
UNION ALL
SELECT
  2 AS id,
  'Doe' AS string_col,
  CAST('2025-02-01' AS TIMESTAMP) AS ingest_ts

)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;

SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02

image.png

1. ソースにて Key と sequence_by が重複している場合の UPDATE の検証

エラーにならず、ID=2 のレコードが反映されることを確認しました。

%sql
WITH src AS (
SELECT
  2 AS id,
  'Doe+1' AS string_col,
  CAST('2025-03-01' AS TIMESTAMP) AS ingest_ts
UNION ALL
SELECT
  2 AS id,
  'Doe+2' AS string_col,
  CAST('2025-03-01' AS TIMESTAMP) AS ingest_ts
UNION ALL
SELECT
  2 AS id,
  'Doe+3' AS string_col,
  CAST('2025-03-01' AS TIMESTAMP) AS ingest_ts

)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;

SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02

image.png

2. ソースにて Key と sequence_by が重複している場合の INSERT の検証

エラーにならず、ID=3 のレコードが反映されることを確認しました。

%sql
WITH src AS (
SELECT
  3 AS id,
  'Cat+1' AS string_col,
  CAST('2025-04-01' AS TIMESTAMP) AS ingest_ts
UNION ALL
SELECT
  3 AS id,
  'Cat+2' AS string_col,
  CAST('2025-04-01' AS TIMESTAMP) AS ingest_ts
UNION ALL
SELECT
  3 AS id,
  'Cat+3' AS string_col,
  CAST('2025-04-01' AS TIMESTAMP) AS ingest_ts

)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;

SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02

image.png

3. ソースにて sequence_by が NULL である場合の動作検証

エラーにならず、ID=4 のレコードが新規追加されていることを確認しました。

%sql
WITH src AS (
SELECT
  4 AS id,
  'Donkey' AS string_col,
  NULL AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;

SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02 ORDER BY ID

image.png

4. ソースにて sequence_by に値を設定した場合の動作の検証

ID=4 のレコードが更新されたことを確認しました。

%sql
WITH src AS (
SELECT
  4 AS id,
  'Donkey+1' AS string_col,
  CAST('2025-05-01' AS TIMESTAMP) AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;

SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02 ORDER BY ID

image.png

5. ソースにて sequence_by が再度 NULL で追加された場合の動作検証

ID=4 のレコードは更新されず、前回の状態が維持されました。

%sql
WITH src AS (
SELECT
  4 AS id,
  'Donkey+2' AS string_col,
  NULL AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;

SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02 ORDER BY ID

image.png

6. ソースにて Key が NULL である場合の動作検証

ID=NULL のレコードが新規追加されていることを確認しました。

%sql
WITH src AS (
SELECT
  NULL AS id,
  'Elephant' AS string_col,
  CAST('2025-05-01' AS TIMESTAMP) AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;

SELECT * FROM dlt_source_01.table_01;
%sql
SELECT * FROM dlt_test_01.table_02 ORDER BY ID

image.png

7. ソースにて Key が NULL のレコードが追加された場合の動作検証

ID=NULL のレコードが更新されていることを確認しました。

%sql
WITH src AS (
SELECT
  NULL AS id,
  'Elephant+1' AS string_col,
  CAST('2025-06-01' AS TIMESTAMP) AS ingest_ts
)
INSERT INTO dlt_source_01.table_01
SELECT * FROM src;

SELECT * FROM dlt_source_01.table_01;
``

```sql
%sql
SELECT * FROM dlt_test_01.table_02 ORDER BY ID

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?