Change data capture with Delta Live Tables | Databricks on AWS [2022/8/11時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
注意
本書では、ソースデータの変更に基づいて、どのようにDelta Live Tablesのパイプラインをアップデートするのかを説明します。Deltaテーブルにおける行レベルの変更を記録しクエリーする方法については、チェンジデータフィードをご覧ください。
プレビュー
Delta Live TablesにおけるSCD type 2サポートはPublic Previewです。
ソースデータの変更に基づいてテーブルをアップデートするために、Delta Live Tablesでチェンジデータキャプチャ(CDC)を利用することができます。CDCはDelta Live TablesのSQLおよびPythonインタフェースでサポートされています。また、Delta Live Tablesでは、slowly changing dimensions (SCD)のタイプ1とタイプ2をサポートしています。
- レコードを直接更新するにはSCDタイプ1を使用します。更新されたレコードに対する履歴は保持されません。
- レコードのすべての更新履歴をを保持するにはSCDタイプ2を使用します。
変更の有効期間を表現するために、SCDタイプ2では、すべての変更を生成された__START_AT
と__END_AT
カラムと共に格納します。__START_AT
と__END_AT
カラムを生成するために、Delta Live TablesはSQLではSEQUENCE BY
、Pythonではsequence_by
で指定されるカラムを使用します。
注意
__START_AT
と__END_AT
カラムのデータ型はSEQUENCE BY
フィールドで指定されたデータ型と同じものになります。
SQL
Delta Live Tables CDCの機能を使用するためには、APPLY CHANGES INTO
文を使用します。
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
句 |
---|
KEYS ソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。 この句は必須です。 |
WHERE パーティションプルーニングのような最適化処理を起動するために、ソース、ターゲットテーブルの両方に適用される条件です。この条件はソースのレコードを削除することに使用することはできません。ソーステーブルのすべてのCDCレコードはこの条件を満足する必要があり、そうでない場合にはエラーが発生します。WHERE句の使用は任意であり、お使いの処理で特定の最適化が必要な場合に使用されるべきです。 この句は任意です。 |
IGNORE NULL UPDATES ターゲットカラムのサブセットに対する取り込み更新を許可します。CDCイベントが既存のレコードと合致し、IGNORE NULL UPDATESが指定されている場合、 null を含むカラムは、ターゲットにおける既存の値を維持します。これはnull 値を持つネストされたカラムにも適用されます。この句は任意です。 デフォルトでは、 null 値を持つ既存カラムを上書きします。 |
APPLY AS DELETE WHEN CDCイベントがupsertではなく DELETE として取り扱われるべきタイミングを指定します。out-of-orderのデータを取り扱うために、削除されたレコードは背後のDeltaテーブルで一時的にtombstoneとして保持され、これらのtombstoneを表示するビューがメタストアに作成されます。保持期間はテーブルプロパティのpipelines.cdc.tombstoneGCThresholdInSeconds で設定することができます。この句は任意です。 |
APPLY AS TRUNCATE WHEN CDCイベントがテーブルの完全な TRUNCATE として取り扱われるべきタイミングを指定します。この句はターゲットテーブルを完全にtruncateするので、この機能を必要とする特定のユースケースでのみ使用すべきです。APPLY AS TRUNCATE WHEN 句はSCDタイプ1でのみサポートされます。SCDタイプ2ではtruncateをサポートしていません。この句は任意です。 |
SEQUENCE BY ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。 この句は必須です。 |
COLUMNS ターゲットテーブルに含めるカラムのサブセットを指定します。以下のいずれかが可能です。
デフォルトでは、 COLUMNS 句が指定されない場合、ターゲットテーブルには全てのカラムが含まれます。 |
STORED AS レコードをSCDタイプ1あるいはSCDタイプ2で格納するのかを指定します。 この句は任意です。 デフォルトはSCDタイプ1です。 |
INSERT
、UPDATE
イベントのデフォルトの挙動は、ソースからのCDCイベントをupsertするというものです:指定されたキーにマッチするターゲットテーブルの行を更新する、あるいはターゲットテーブルにマッチする行がない場合には行を挿入します。DELETE
イベントの挙動をAPPLY AS DELETE WHEN
条件で指定することも可能です。
Python
Delta Live Tables CDC機能を使うためには、Python APIのapply_changes()
関数を使用します。また、Delta Live TablesのPython CDCインタフェースは
Apply changes関数
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>
)
引数 |
---|
target Type: str 更新されるテーブル名です。 このパラメーターは必須です。 |
source Type: str CDCレコードを含むデータソースです。 このパラメーターは必須です。 |
keys Type: list ソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。 以下のいずれかの方法で指定できます:
col() 関数の引数にはクオリファイアを含めることはできません。例えば、col(userId) は使えますが、col(source.userId) は使えません。このパラメーターは必須です。 |
sequence_by Type: str あるいはcol() ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。 以下のいずれかの方法で指定できます:
col() 関数の引数にはクオリファイアを含めることはできません。例えば、col(userId) は使えますが、col(source.userId) は使えません。このパラメーターは必須です。 |
ignore_null_updates Type: bool ターゲットカラムのサブセットに対する取り込み更新を許可します。CDCイベントが既存のレコードと合致し、 ignore_null_updates がTrue の場合、null を含むカラムは、ターゲットにおける既存の値を維持します。これはnull 値を持つネストされたカラムにも適用されます。ignore_null_updates がFalse の場合、既存の値はnull で上書きされます。このパラメーターは任意です。 デフォルトは False です。 |
apply_as_delete Type: str あるいはexpr() CDCイベントがupsertではなく DELETE として取り扱われるべきタイミングを指定します。out-of-orderのデータを取り扱うために、削除されたレコードは背後のDeltaテーブルで一時的にtombstoneとして保持され、これらのtombstoneを表示するビューがメタストアに作成されます。保持期間はテーブルプロパティのpipelines.cdc.tombstoneGCThresholdInSeconds で設定することができます。以下のいずれかの方法で指定できます。
|
apply_as_truncates Type: str あるいはexpr() CDCイベントがテーブルの完全な TRUNCATE として取り扱われるべきタイミングを指定します。この句はターゲットテーブルを完全にtruncateするので、この機能を必要とする特定のユースケースでのみ使用すべきです。apply_as_truncates パラメータはSCDタイプ1でのみサポートされます。SCDタイプ2ではtruncateをサポートしていません。以下のいずれかの方法で指定できます。
|
column_list except_column_list Type: list ターゲットテーブルに含めるカラムのサブセットを指定します。含めるカラムの完全なリストを指定するには column_list を使用します。除外するカラムを指定するにはexcept_column_list を使用します。文字列のリストあるいはSpark SQLのcol() 関数で値を宣言することができます
col("sequenceNum") col() 関数の引数にはクオリファイアを含めることはできません。例えば、col(userId) は使えますが、col(source.userId) は使えません。このパラメーターは任意です。 column_list やexcept_column_list 引数が関数に渡されていない場合、デフォルトではターゲットテーブルの全てのカラムが含まれます。 |
stored_as_scd_type Type: str かint レコードをSCDタイプ1あるいはSCDタイプ2で格納するのかを指定します。 SCDタイプ1では 1 、SCDタイプ2では2 を指定します。この句は任意です。 デフォルトはSCDタイプ1です。 |
INSERT
、UPDATE
イベントのデフォルトの挙動は、ソースからのCDCイベントをupsertするというものです:指定されたキーにマッチするターゲットテーブルの行を更新する、あるいはターゲットテーブルにマッチする行がない場合には行を挿入します。DELETE
イベントの挙動を引数apply_as_delete
で指定することも可能です。
出力レコードのためのターゲットテーブルの作成
apply_changes()
の出力レコードのためのターゲットテーブルを作成するためにcreate_streaming_live_table()
関数を使用します。
注意
create_target_table()
関数は非推奨となります。既存のコードではcreate_streaming_live_table()
を使用するように更新することをお勧めします。
create_streaming_live_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
path="<storage-location-path>",
schema="schema-definition"
)
引数 |
---|
name Type: str テーブル名 このパラメータは必須です。 |
comment Type: str テーブルに対するオプションのコメント。 |
spark_conf Type: dict このクエリーの実行におけるSpark設定のリスト。オプション。 |
table_properties Type: dict テーブルのプロパティのリスト。オプション。 |
partition_cols Type: array テーブルをパーティショニングするための1つ以上のカラムのリスト。オプション。 |
path Type: str テーブルデータの格納場所。オプション。指定されない場合、パイプラインのストレージ場所をデフォルトとします。 |
schema Type: str テーブルのスキーマ定義。オプション。スキーマはSQL DDL文字列、Pythonの StructType で定義することができます。 |
apply_changes
のターゲットテーブルのスキーマを指定する際、sequence_by
フィールドと同じデータ型を__START_AT
と__END_AT
カラムに指定して含める必要があります。例えば、ターゲットテーブルにkey, STRING, value, STRING sequencing, LONG
のカラムがある場合、以下のようになります。
create_streaming_live_table(
name = "target",
comment = "Target for CDC ingestion.",
partition_cols=["value"],
path="$tablePath",
schema=
StructType(
[
StructField('key', StringType()),
StructField('value', StringType()),
StructField('sequencing', LongType()),
StructField('__START_AT', LongType()),
StructField('__END_AT', LongType())
]
)
)
注意
-
APPLY CHANGES INTO
クエリーやapply_changes
関数を実行する前に、ターゲットテーブルが作成されていることを確認してください。サンプルクエリーを参照ください。 - 出力行数などターゲットテーブルのメトリクスは利用できません。
- SCDタイプ2は、カラムに変更がなくても入力行ごとに履歴を追加します。
-
APPLY CHANGES INTO
のクエリーやapply_changes
関数のターゲットは、ストリーミングライブテーブルのソースとして使用することはできません。APPLY CHANGES INTO
のクエリーやapply_changes
関数のターゲットから読み込むテーブルはライブテーブルでなくてはいけません。 -
APPLY CHANGES INTO
クエリーやapply_changes
関数ではエクスペクテーションはサポートされていません。ソース、ターゲットデータセットにエクスペクテーションを適用するには、以下の手順を踏んでください。- 必要なエクスペクテーションを持つ中間テーブルを定義することでソースデータにエクスペクテーションを追加し、このデータセットをターゲットテーブルに対するソースとします。
- ターゲットテーブルから入力データを読み込む下流のテーブルを用いてターゲットデータに対するエクスペクテーションを追加します。
テーブルプロパティ
以下のテーブルプロパティは、DELETE
イベントにおけるtombstone管理を制御するために追加されています。
テーブルプロパティ |
---|
pipelines.cdc.tombstoneGCThresholdInSeconds out-of-orderのデータに期待する最大の間隔にマッチする値を設定します。 |
pipelines.cdc.tombstoneGCFrequencyInSeconds tombstoneとチェックされたデータのクリーンアップの頻度を指定します。 デフォルト:5分 |
サンプル
このサンプルでは、以下のソースイベントに基づいてターゲットテーブルをアップデートするDelta Live TablesのSCDタイプ1とタイプ2のクエリーをデモします。
- 新規ユーザーのレコードを作成
- ユーザーレコードを削除
- ユーザーレコードを更新。SCDタイプ1のサンプルでは、out-of-orderのイベントの取り扱いをデモするために、最後の
UPDATE
オペレーションは遅れて到着したので、ターゲットテーブルからは除外されます。
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
このSCDタイプ1のサンプルのクエリーを実行した後では、ターゲットテーブルには以下のレコードが含まれることになります。
userId | name | city |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
以下の入力レコードには、TRUNCATE
オペレーションを伴う追加レコードが含まれており、SCDタイプ1のサンプルコードで使用することができます。
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
null | null | null | TRUNCATE | 3 |
追加のTRUNCATE
レコードを伴うSCDタイプ1のサンプルの実行後は、TRUNCATE
オペレーションがsequenceNum=3
であるため、レコード124
と125
は切り取られ、ターゲットテーブルには以下のレコードが含まれることになります。
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
SCDタイプ2のサンプルの実行後は、ターゲットテーブルは以下のようになります。
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
123 | Raul | Oaxaca | 1 | null |
123 | Mercedes | Tijuana | 2 | 5 |
123 | Mercedes | Mexicali | 5 | 6 |
123 | Mercedes | Guadalajara | 6 | null |
123 | Lily | Cancun | 2 | null |
テストデータの生成
このサンプルのテストレコードを生成するには、以下の手順を踏みます。
-
Databricksワークスペースに移動し、Create a notebookを選択するか、サイドバーのCreateをクリックし、メニューからNotebookを選択します。Create Notebookダイアログが表示されます。
-
Create Notebookダイアログでは、Generate test CDC recordsのようなノートブック名を入力します。Default LanguageドロップダウンメニューからSQLを選択します。
-
稼働中のクラスターがある場合には、Clusterドロップダウンが表示されます。ノートブックをアタッチしたいクラスターを選択します。ノートブックを作成した後に新規クラスターを作成しアタッチすることができます。
-
Createをクリックします。
-
ノートブックの最初のセルに以下のクエリーを貼り付けます。
SQLCREATE SCHEMA IF NOT EXISTS cdc_data; CREATE TABLE cdc_data.users AS SELECT col1 AS userId, col2 AS name, col3 AS city, col4 AS operation, col5 AS sequenceNum FROM ( VALUES -- Initial load. (124, "Raul", "Oaxaca", "INSERT", 1), (123, "Isabel", "Monterrey", "INSERT", 1), -- New users. (125, "Mercedes", "Tijuana", "INSERT", 2), (126, "Lily", "Cancun", "INSERT", 2), -- Isabel is removed from the system and Mercedes moved to Guadalajara. (123, null, null, "DELETE", 6), (125, "Mercedes", "Guadalajara", "UPDATE", 6), -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state. (125, "Mercedes", "Mexicali", "UPDATE", 5), (123, "Isabel", "Chihuahua", "UPDATE", 5) -- Uncomment to test TRUNCATE. -- ,(null, null, null, "TRUNCATE", 3) );
-
ノートブックを実行し、レコードを生成するには、右端にあるセルのアクションメニューからをクリックし、Run Cellを選択するか、shift+enterを押下します。
SCDタイプ1のサンプルパイプラインを作成し実行する
- Databricksワークスペースに移動し、Create a notebookを選択するか、サイドバーのCreateをクリックし、メニューからNotebookを選択します。Create Notebookダイアログが表示されます。
- Create Notebookダイアログでは、DLT CDC exampleのようなノートブック名を入力します。使う言語に基づいて、Default LanguageドロップダウンメニューからPythonかSQLを選択します。Clusterはデフォルト値のままでかまいません。Delta Live Tablesランタイムはパイプラインを実行する前にクラスターを作成します。
- Createをクリックします。
- ノートブックの最初のセルにPythonかSQLクエリーを貼り付けます。
- パイプラインを作成し、Notebook Librariesフィールドにノートブックを追加します。パイプライン処理の出力を公開するには、オプションとしてTargetフィールドにデータベース名を入力します。
- パイプラインをスタートします。Targetを設定した場合は、クエリーの結果を参照し確認することができます。
サンプルクエリー
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_live_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCDタイプ2のサンプルパイプラインを作成し実行する
- Databricksワークスペースに移動し、Create a notebookを選択するか、サイドバーのCreateをクリックし、メニューからNotebookを選択します。Create Notebookダイアログが表示されます。
- Create Notebookダイアログでは、DLT CDC exampleのようなノートブック名を入力します。使う言語に基づいて、Default LanguageドロップダウンメニューからPythonかSQLを選択します。Clusterはデフォルト値のままでかまいません。Delta Live Tablesランタイムはパイプラインを実行する前にクラスターを作成します。
- Createをクリックします。
- ノートブックの最初のセルにPythonかSQLクエリーを貼り付けます。
- パイプラインを作成し、Notebook Librariesフィールドにノートブックを追加します。パイプライン処理の出力を公開するには、オプションとしてTargetフィールドにデータベース名を入力します。
- パイプラインをスタートします。Targetを設定した場合は、クエリーの結果を参照し確認することができます。
サンプルクエリー
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_live_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;