Change data capture with Delta Live Tables | Databricks on AWS [2022/8/11時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
注意
本書では、ソースデータの変更に基づいて、どのようにDelta Live Tablesのパイプラインをアップデートするのかを説明します。Deltaテーブルにおける行レベルの変更を記録しクエリーする方法については、チェンジデータフィードをご覧ください。
プレビュー
Delta Live TablesにおけるSCD type 2サポートはPublic Previewです。
ソースデータの変更に基づいてテーブルをアップデートするために、Delta Live Tablesでチェンジデータキャプチャ(CDC)を利用することができます。CDCはDelta Live TablesのSQLおよびPythonインタフェースでサポートされています。また、Delta Live Tablesでは、slowly changing dimensions (SCD)のタイプ1とタイプ2をサポートしています。
- レコードを直接更新するにはSCDタイプ1を使用します。更新されたレコードに対する履歴は保持されません。
- レコードのすべての更新履歴をを保持するにはSCDタイプ2を使用します。
変更の有効期間を表現するために、SCDタイプ2では、すべての変更を生成された__START_ATと__END_ATカラムと共に格納します。__START_ATと__END_ATカラムを生成するために、Delta Live TablesはSQLではSEQUENCE BY、Pythonではsequence_byで指定されるカラムを使用します。
注意
__START_ATと__END_ATカラムのデータ型はSEQUENCE BYフィールドで指定されたデータ型と同じものになります。
SQL
Delta Live Tables CDCの機能を使用するためには、APPLY CHANGES INTO文を使用します。
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
| 句 |
|---|
|
KEYS ソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。 この句は必須です。 |
|
WHERE パーティションプルーニングのような最適化処理を起動するために、ソース、ターゲットテーブルの両方に適用される条件です。この条件はソースのレコードを削除することに使用することはできません。ソーステーブルのすべてのCDCレコードはこの条件を満足する必要があり、そうでない場合にはエラーが発生します。WHERE句の使用は任意であり、お使いの処理で特定の最適化が必要な場合に使用されるべきです。 この句は任意です。 |
|
IGNORE NULL UPDATES ターゲットカラムのサブセットに対する取り込み更新を許可します。CDCイベントが既存のレコードと合致し、IGNORE NULL UPDATESが指定されている場合、 nullを含むカラムは、ターゲットにおける既存の値を維持します。これはnull値を持つネストされたカラムにも適用されます。この句は任意です。 デフォルトでは、 null値を持つ既存カラムを上書きします。 |
|
APPLY AS DELETE WHEN CDCイベントがupsertではなく DELETEとして取り扱われるべきタイミングを指定します。out-of-orderのデータを取り扱うために、削除されたレコードは背後のDeltaテーブルで一時的にtombstoneとして保持され、これらのtombstoneを表示するビューがメタストアに作成されます。保持期間はテーブルプロパティのpipelines.cdc.tombstoneGCThresholdInSecondsで設定することができます。この句は任意です。 |
|
APPLY AS TRUNCATE WHEN CDCイベントがテーブルの完全な TRUNCATEとして取り扱われるべきタイミングを指定します。この句はターゲットテーブルを完全にtruncateするので、この機能を必要とする特定のユースケースでのみ使用すべきです。APPLY AS TRUNCATE WHEN句はSCDタイプ1でのみサポートされます。SCDタイプ2ではtruncateをサポートしていません。この句は任意です。 |
|
SEQUENCE BY ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。 この句は必須です。 |
|
COLUMNS ターゲットテーブルに含めるカラムのサブセットを指定します。以下のいずれかが可能です。
デフォルトでは、 COLUMNS句が指定されない場合、ターゲットテーブルには全てのカラムが含まれます。 |
|
STORED AS レコードをSCDタイプ1あるいはSCDタイプ2で格納するのかを指定します。 この句は任意です。 デフォルトはSCDタイプ1です。 |
INSERT、UPDATEイベントのデフォルトの挙動は、ソースからのCDCイベントをupsertするというものです:指定されたキーにマッチするターゲットテーブルの行を更新する、あるいはターゲットテーブルにマッチする行がない場合には行を挿入します。DELETEイベントの挙動をAPPLY AS DELETE WHEN条件で指定することも可能です。
Python
Delta Live Tables CDC機能を使うためには、Python APIのapply_changes()関数を使用します。また、Delta Live TablesのPython CDCインタフェースは
Apply changes関数
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>
)
| 引数 |
|---|
|
target Type: str更新されるテーブル名です。 このパラメーターは必須です。 |
|
source Type: strCDCレコードを含むデータソースです。 このパラメーターは必須です。 |
|
keys Type: listソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。 以下のいずれかの方法で指定できます:
col()関数の引数にはクオリファイアを含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。このパラメーターは必須です。 |
|
sequence_by Type: strあるいはcol()ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。 以下のいずれかの方法で指定できます:
col()関数の引数にはクオリファイアを含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。このパラメーターは必須です。 |
|
ignore_null_updates Type: boolターゲットカラムのサブセットに対する取り込み更新を許可します。CDCイベントが既存のレコードと合致し、 ignore_null_updatesがTrueの場合、nullを含むカラムは、ターゲットにおける既存の値を維持します。これはnull値を持つネストされたカラムにも適用されます。ignore_null_updatesがFalseの場合、既存の値はnullで上書きされます。このパラメーターは任意です。 デフォルトは Falseです。 |
|
apply_as_delete Type: strあるいはexpr()CDCイベントがupsertではなく DELETEとして取り扱われるべきタイミングを指定します。out-of-orderのデータを取り扱うために、削除されたレコードは背後のDeltaテーブルで一時的にtombstoneとして保持され、これらのtombstoneを表示するビューがメタストアに作成されます。保持期間はテーブルプロパティのpipelines.cdc.tombstoneGCThresholdInSecondsで設定することができます。以下のいずれかの方法で指定できます。
|
|
apply_as_truncates Type: strあるいはexpr()CDCイベントがテーブルの完全な TRUNCATEとして取り扱われるべきタイミングを指定します。この句はターゲットテーブルを完全にtruncateするので、この機能を必要とする特定のユースケースでのみ使用すべきです。apply_as_truncatesパラメータはSCDタイプ1でのみサポートされます。SCDタイプ2ではtruncateをサポートしていません。以下のいずれかの方法で指定できます。
|
|
column_list except_column_list Type: listターゲットテーブルに含めるカラムのサブセットを指定します。含めるカラムの完全なリストを指定するには column_listを使用します。除外するカラムを指定するにはexcept_column_listを使用します。文字列のリストあるいはSpark SQLのcol()関数で値を宣言することができます
col("sequenceNum")col()関数の引数にはクオリファイアを含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。このパラメーターは任意です。 column_listやexcept_column_list引数が関数に渡されていない場合、デフォルトではターゲットテーブルの全てのカラムが含まれます。 |
|
stored_as_scd_type Type: strかintレコードをSCDタイプ1あるいはSCDタイプ2で格納するのかを指定します。 SCDタイプ1では 1、SCDタイプ2では2を指定します。この句は任意です。 デフォルトはSCDタイプ1です。 |
INSERT、UPDATEイベントのデフォルトの挙動は、ソースからのCDCイベントをupsertするというものです:指定されたキーにマッチするターゲットテーブルの行を更新する、あるいはターゲットテーブルにマッチする行がない場合には行を挿入します。DELETEイベントの挙動を引数apply_as_deleteで指定することも可能です。
出力レコードのためのターゲットテーブルの作成
apply_changes()の出力レコードのためのターゲットテーブルを作成するためにcreate_streaming_live_table()関数を使用します。
注意
create_target_table()関数は非推奨となります。既存のコードではcreate_streaming_live_table()を使用するように更新することをお勧めします。
create_streaming_live_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
path="<storage-location-path>",
schema="schema-definition"
)
| 引数 |
|---|
|
name Type: strテーブル名 このパラメータは必須です。 |
|
comment Type: strテーブルに対するオプションのコメント。 |
|
spark_conf Type: dictこのクエリーの実行におけるSpark設定のリスト。オプション。 |
|
table_properties Type: dictテーブルのプロパティのリスト。オプション。 |
|
partition_cols Type: arrayテーブルをパーティショニングするための1つ以上のカラムのリスト。オプション。 |
|
path Type: strテーブルデータの格納場所。オプション。指定されない場合、パイプラインのストレージ場所をデフォルトとします。 |
|
schema Type: strテーブルのスキーマ定義。オプション。スキーマはSQL DDL文字列、Pythonの StructTypeで定義することができます。 |
apply_changesのターゲットテーブルのスキーマを指定する際、sequence_byフィールドと同じデータ型を__START_ATと__END_ATカラムに指定して含める必要があります。例えば、ターゲットテーブルにkey, STRING, value, STRING sequencing, LONGのカラムがある場合、以下のようになります。
create_streaming_live_table(
name = "target",
comment = "Target for CDC ingestion.",
partition_cols=["value"],
path="$tablePath",
schema=
StructType(
[
StructField('key', StringType()),
StructField('value', StringType()),
StructField('sequencing', LongType()),
StructField('__START_AT', LongType()),
StructField('__END_AT', LongType())
]
)
)
注意
-
APPLY CHANGES INTOクエリーやapply_changes関数を実行する前に、ターゲットテーブルが作成されていることを確認してください。サンプルクエリーを参照ください。 - 出力行数などターゲットテーブルのメトリクスは利用できません。
- SCDタイプ2は、カラムに変更がなくても入力行ごとに履歴を追加します。
-
APPLY CHANGES INTOのクエリーやapply_changes関数のターゲットは、ストリーミングライブテーブルのソースとして使用することはできません。APPLY CHANGES INTOのクエリーやapply_changes関数のターゲットから読み込むテーブルはライブテーブルでなくてはいけません。 -
APPLY CHANGES INTOクエリーやapply_changes関数ではエクスペクテーションはサポートされていません。ソース、ターゲットデータセットにエクスペクテーションを適用するには、以下の手順を踏んでください。- 必要なエクスペクテーションを持つ中間テーブルを定義することでソースデータにエクスペクテーションを追加し、このデータセットをターゲットテーブルに対するソースとします。
- ターゲットテーブルから入力データを読み込む下流のテーブルを用いてターゲットデータに対するエクスペクテーションを追加します。
テーブルプロパティ
以下のテーブルプロパティは、DELETEイベントにおけるtombstone管理を制御するために追加されています。
| テーブルプロパティ |
|---|
|
pipelines.cdc.tombstoneGCThresholdInSeconds out-of-orderのデータに期待する最大の間隔にマッチする値を設定します。 |
|
pipelines.cdc.tombstoneGCFrequencyInSeconds tombstoneとチェックされたデータのクリーンアップの頻度を指定します。 デフォルト:5分 |
サンプル
このサンプルでは、以下のソースイベントに基づいてターゲットテーブルをアップデートするDelta Live TablesのSCDタイプ1とタイプ2のクエリーをデモします。
- 新規ユーザーのレコードを作成
- ユーザーレコードを削除
- ユーザーレコードを更新。SCDタイプ1のサンプルでは、out-of-orderのイベントの取り扱いをデモするために、最後の
UPDATEオペレーションは遅れて到着したので、ターゲットテーブルからは除外されます。
| userId | name | city | operation | sequenceNum |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lily | Cancun | INSERT | 2 |
| 123 | null | null | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
このSCDタイプ1のサンプルのクエリーを実行した後では、ターゲットテーブルには以下のレコードが含まれることになります。
| userId | name | city |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lily | Cancun |
以下の入力レコードには、TRUNCATEオペレーションを伴う追加レコードが含まれており、SCDタイプ1のサンプルコードで使用することができます。
| userId | name | city | operation | sequenceNum |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lily | Cancun | INSERT | 2 |
| 123 | null | null | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
| null | null | null | TRUNCATE | 3 |
追加のTRUNCATEレコードを伴うSCDタイプ1のサンプルの実行後は、TRUNCATEオペレーションがsequenceNum=3であるため、レコード124と125は切り取られ、ターゲットテーブルには以下のレコードが含まれることになります。
| userId | name | city |
|---|---|---|
| 125 | Mercedes | Guadalajara |
SCDタイプ2のサンプルの実行後は、ターゲットテーブルは以下のようになります。
| userId | name | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 123 | Raul | Oaxaca | 1 | null |
| 123 | Mercedes | Tijuana | 2 | 5 |
| 123 | Mercedes | Mexicali | 5 | 6 |
| 123 | Mercedes | Guadalajara | 6 | null |
| 123 | Lily | Cancun | 2 | null |
テストデータの生成
このサンプルのテストレコードを生成するには、以下の手順を踏みます。
-
Databricksワークスペースに移動し、Create a notebookを選択するか、サイドバーの
Createをクリックし、メニューからNotebookを選択します。Create Notebookダイアログが表示されます。 -
Create Notebookダイアログでは、Generate test CDC recordsのようなノートブック名を入力します。Default LanguageドロップダウンメニューからSQLを選択します。
-
稼働中のクラスターがある場合には、Clusterドロップダウンが表示されます。ノートブックをアタッチしたいクラスターを選択します。ノートブックを作成した後に新規クラスターを作成しアタッチすることができます。
-
Createをクリックします。
-
ノートブックの最初のセルに以下のクエリーを貼り付けます。
SQLCREATE SCHEMA IF NOT EXISTS cdc_data; CREATE TABLE cdc_data.users AS SELECT col1 AS userId, col2 AS name, col3 AS city, col4 AS operation, col5 AS sequenceNum FROM ( VALUES -- Initial load. (124, "Raul", "Oaxaca", "INSERT", 1), (123, "Isabel", "Monterrey", "INSERT", 1), -- New users. (125, "Mercedes", "Tijuana", "INSERT", 2), (126, "Lily", "Cancun", "INSERT", 2), -- Isabel is removed from the system and Mercedes moved to Guadalajara. (123, null, null, "DELETE", 6), (125, "Mercedes", "Guadalajara", "UPDATE", 6), -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state. (125, "Mercedes", "Mexicali", "UPDATE", 5), (123, "Isabel", "Chihuahua", "UPDATE", 5) -- Uncomment to test TRUNCATE. -- ,(null, null, null, "TRUNCATE", 3) ); -
ノートブックを実行し、レコードを生成するには、右端にあるセルのアクションメニュー
から
をクリックし、Run Cellを選択するか、shift+enterを押下します。
SCDタイプ1のサンプルパイプラインを作成し実行する
- Databricksワークスペースに移動し、Create a notebookを選択するか、サイドバーの
Createをクリックし、メニューからNotebookを選択します。Create Notebookダイアログが表示されます。 - Create Notebookダイアログでは、DLT CDC exampleのようなノートブック名を入力します。使う言語に基づいて、Default LanguageドロップダウンメニューからPythonかSQLを選択します。Clusterはデフォルト値のままでかまいません。Delta Live Tablesランタイムはパイプラインを実行する前にクラスターを作成します。
- Createをクリックします。
- ノートブックの最初のセルにPythonかSQLクエリーを貼り付けます。
- パイプラインを作成し、Notebook Librariesフィールドにノートブックを追加します。パイプライン処理の出力を公開するには、オプションとしてTargetフィールドにデータベース名を入力します。
- パイプラインをスタートします。Targetを設定した場合は、クエリーの結果を参照し確認することができます。
サンプルクエリー
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_live_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCDタイプ2のサンプルパイプラインを作成し実行する
- Databricksワークスペースに移動し、Create a notebookを選択するか、サイドバーの
Createをクリックし、メニューからNotebookを選択します。Create Notebookダイアログが表示されます。 - Create Notebookダイアログでは、DLT CDC exampleのようなノートブック名を入力します。使う言語に基づいて、Default LanguageドロップダウンメニューからPythonかSQLを選択します。Clusterはデフォルト値のままでかまいません。Delta Live Tablesランタイムはパイプラインを実行する前にクラスターを作成します。
- Createをクリックします。
- ノートブックの最初のセルにPythonかSQLクエリーを貼り付けます。
- パイプラインを作成し、Notebook Librariesフィールドにノートブックを追加します。パイプライン処理の出力を公開するには、オプションとしてTargetフィールドにデータベース名を入力します。
- パイプラインをスタートします。Targetを設定した場合は、クエリーの結果を参照し確認することができます。
サンプルクエリー
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.format("delta").table("cdc_data.users")
dlt.create_streaming_live_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;