LoginSignup
1
0

More than 1 year has passed since last update.

Delta Live Tablesによるチェンジデータキャプチャ(CDC)

Last updated at Posted at 2022-02-10

Change data capture with Delta Live Tables | Databricks on AWS [2022/8/11時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

注意
本書では、ソースデータの変更に基づいて、どのようにDelta Live Tablesのパイプラインをアップデートするのかを説明します。Deltaテーブルにおける行レベルの変更を記録しクエリーする方法については、チェンジデータフィードをご覧ください。

プレビュー
Delta Live TablesにおけるSCD type 2サポートはPublic Previewです。

ソースデータの変更に基づいてテーブルをアップデートするために、Delta Live Tablesでチェンジデータキャプチャ(CDC)を利用することができます。CDCはDelta Live TablesのSQLおよびPythonインタフェースでサポートされています。また、Delta Live Tablesでは、slowly changing dimensions (SCD)のタイプ1とタイプ2をサポートしています。

  • レコードを直接更新するにはSCDタイプ1を使用します。更新されたレコードに対する履歴は保持されません。
  • レコードのすべての更新履歴をを保持するにはSCDタイプ2を使用します。

変更の有効期間を表現するために、SCDタイプ2では、すべての変更を生成された__START_AT__END_ATカラムと共に格納します。__START_AT__END_ATカラムを生成するために、Delta Live TablesはSQLではSEQUENCE BY、Pythonではsequence_byで指定されるカラムを使用します。

注意
__START_AT__END_ATカラムのデータ型はSEQUENCE BYフィールドで指定されたデータ型と同じものになります。

SQL

Delta Live Tables CDCの機能を使用するためには、APPLY CHANGES INTO文を使用します。

SQL
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
KEYS

ソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。

この句は必須です。
WHERE

パーティションプルーニングのような最適化処理を起動するために、ソース、ターゲットテーブルの両方に適用される条件です。この条件はソースのレコードを削除することに使用することはできません。ソーステーブルのすべてのCDCレコードはこの条件を満足する必要があり、そうでない場合にはエラーが発生します。WHERE句の使用は任意であり、お使いの処理で特定の最適化が必要な場合に使用されるべきです。

この句は任意です。
IGNORE NULL UPDATES

ターゲットカラムのサブセットに対する取り込み更新を許可します。CDCイベントが既存のレコードと合致し、IGNORE NULL UPDATESが指定されている場合、nullを含むカラムは、ターゲットにおける既存の値を維持します。これはnull値を持つネストされたカラムにも適用されます。

この句は任意です。

デフォルトでは、null値を持つ既存カラムを上書きします。
APPLY AS DELETE WHEN

CDCイベントがupsertではなくDELETEとして取り扱われるべきタイミングを指定します。out-of-orderのデータを取り扱うために、削除されたレコードは背後のDeltaテーブルで一時的にtombstoneとして保持され、これらのtombstoneを表示するビューがメタストアに作成されます。保持期間はテーブルプロパティpipelines.cdc.tombstoneGCThresholdInSecondsで設定することができます。

この句は任意です。
APPLY AS TRUNCATE WHEN

CDCイベントがテーブルの完全なTRUNCATEとして取り扱われるべきタイミングを指定します。この句はターゲットテーブルを完全にtruncateするので、この機能を必要とする特定のユースケースでのみ使用すべきです。

APPLY AS TRUNCATE WHEN句はSCDタイプ1でのみサポートされます。SCDタイプ2ではtruncateをサポートしていません。

この句は任意です。
SEQUENCE BY

ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。

この句は必須です。
COLUMNS

ターゲットテーブルに含めるカラムのサブセットを指定します。以下のいずれかが可能です。
  • 含めるカラムの完全なリストを指定:COLUMNS (userId, name, city)
  • 除外するカラムのリストを指定:COLUMNS * EXCEPT (operation, sequenceNum)
この句は任意です。

デフォルトでは、COLUMNS句が指定されない場合、ターゲットテーブルには全てのカラムが含まれます。
STORED AS

レコードをSCDタイプ1あるいはSCDタイプ2で格納するのかを指定します。

この句は任意です。

デフォルトはSCDタイプ1です。

INSERTUPDATEイベントのデフォルトの挙動は、ソースからのCDCイベントをupsertするというものです:指定されたキーにマッチするターゲットテーブルの行を更新する、あるいはターゲットテーブルにマッチする行がない場合には行を挿入します。DELETEイベントの挙動をAPPLY AS DELETE WHEN条件で指定することも可能です。

Python

Delta Live Tables CDC機能を使うためには、Python APIのapply_changes()関数を使用します。また、Delta Live TablesのPython CDCインタフェースは

Apply changes関数

Python
apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>
)
引数
target
Type:str
更新されるテーブル名です。
このパラメーターは必須です。
source
Type:str
CDCレコードを含むデータソースです。
このパラメーターは必須です。
keys
Type:list
ソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。

以下のいずれかの方法で指定できます:
  • 文字列のリスト:["userId", "orderId"]
  • Spark SQLのcol関数のリスト:[col("userId"), col("orderId"]
col()関数の引数にはクオリファイアを含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターは必須です。
sequence_by
Type:strあるいはcol()
ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。

以下のいずれかの方法で指定できます:
  • 文字列:"sequenceNum"
  • Spark SQLのcol関数:col("sequenceNum")
col()関数の引数にはクオリファイアを含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターは必須です。
ignore_null_updates
Type:bool
ターゲットカラムのサブセットに対する取り込み更新を許可します。CDCイベントが既存のレコードと合致し、ignore_null_updatesTrueの場合、nullを含むカラムは、ターゲットにおける既存の値を維持します。これはnull値を持つネストされたカラムにも適用されます。ignore_null_updatesFalseの場合、既存の値はnullで上書きされます。

このパラメーターは任意です。

デフォルトはFalseです。
apply_as_delete
Type:strあるいはexpr()
CDCイベントがupsertではなくDELETEとして取り扱われるべきタイミングを指定します。out-of-orderのデータを取り扱うために、削除されたレコードは背後のDeltaテーブルで一時的にtombstoneとして保持され、これらのtombstoneを表示するビューがメタストアに作成されます。保持期間はテーブルプロパティpipelines.cdc.tombstoneGCThresholdInSecondsで設定することができます。

以下のいずれかの方法で指定できます。
  • 文字列:"Operation = 'DELETE'"
  • Spark SQLのexpr()関数:expr("Operation = 'DELETE'")
このパラメーターは任意です。
apply_as_truncates
Type:strあるいはexpr()
CDCイベントがテーブルの完全なTRUNCATEとして取り扱われるべきタイミングを指定します。この句はターゲットテーブルを完全にtruncateするので、この機能を必要とする特定のユースケースでのみ使用すべきです。

apply_as_truncatesパラメータはSCDタイプ1でのみサポートされます。SCDタイプ2ではtruncateをサポートしていません。

以下のいずれかの方法で指定できます。
  • 文字列:"Operation = 'TRUNCATE'"
  • Spark SQLのexpr()関数:expr("Operation = 'TRUNCATE'")
このパラメーターは任意です。
column_list except_column_list
Type:list
ターゲットテーブルに含めるカラムのサブセットを指定します。含めるカラムの完全なリストを指定するにはcolumn_listを使用します。除外するカラムを指定するにはexcept_column_listを使用します。文字列のリストあるいはSpark SQLのcol()関数で値を宣言することができます
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")
col("sequenceNum")col()関数の引数にはクオリファイアを含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターは任意です。

column_listexcept_column_list引数が関数に渡されていない場合、デフォルトではターゲットテーブルの全てのカラムが含まれます。
stored_as_scd_type
Type:strint
レコードをSCDタイプ1あるいはSCDタイプ2で格納するのかを指定します。

SCDタイプ1では1、SCDタイプ2では2を指定します。

この句は任意です。

デフォルトはSCDタイプ1です。

INSERTUPDATEイベントのデフォルトの挙動は、ソースからのCDCイベントをupsertするというものです:指定されたキーにマッチするターゲットテーブルの行を更新する、あるいはターゲットテーブルにマッチする行がない場合には行を挿入します。DELETEイベントの挙動を引数apply_as_deleteで指定することも可能です。

出力レコードのためのターゲットテーブルの作成

apply_changes()の出力レコードのためのターゲットテーブルを作成するためにcreate_streaming_live_table()関数を使用します。

注意
create_target_table()関数は非推奨となります。既存のコードではcreate_streaming_live_table()を使用するように更新することをお勧めします。

Python
create_streaming_live_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition"
)
引数
name
Type:str

テーブル名

このパラメータは必須です。
comment
Type:str

テーブルに対するオプションのコメント。
spark_conf
Type:dict

このクエリーの実行におけるSpark設定のリスト。オプション。
table_properties
Type:dict

テーブルのプロパティのリスト。オプション。
partition_cols
Type:array

テーブルをパーティショニングするための1つ以上のカラムのリスト。オプション。
path
Type:str

テーブルデータの格納場所。オプション。指定されない場合、パイプラインのストレージ場所をデフォルトとします。
schema
Type:str

テーブルのスキーマ定義。オプション。スキーマはSQL DDL文字列、PythonのStructTypeで定義することができます。

apply_changesのターゲットテーブルのスキーマを指定する際、sequence_byフィールドと同じデータ型を__START_AT__END_ATカラムに指定して含める必要があります。例えば、ターゲットテーブルにkey, STRING, value, STRING sequencing, LONGのカラムがある場合、以下のようになります。

Python
create_streaming_live_table(
  name = "target",
  comment = "Target for CDC ingestion.",
  partition_cols=["value"],
  path="$tablePath",
  schema=
    StructType(
      [
        StructField('key', StringType()),
        StructField('value', StringType()),
        StructField('sequencing', LongType()),
        StructField('__START_AT', LongType()),
        StructField('__END_AT', LongType())
      ]
    )
)

注意

  • APPLY CHANGES INTOクエリーやapply_changes関数を実行する前に、ターゲットテーブルが作成されていることを確認してください。サンプルクエリーを参照ください。
  • 出力行数などターゲットテーブルのメトリクスは利用できません。
  • SCDタイプ2は、カラムに変更がなくても入力行ごとに履歴を追加します。
  • APPLY CHANGES INTOのクエリーやapply_changes関数のターゲットは、ストリーミングライブテーブルのソースとして使用することはできません。APPLY CHANGES INTOのクエリーやapply_changes関数のターゲットから読み込むテーブルはライブテーブルでなくてはいけません。
  • APPLY CHANGES INTOクエリーやapply_changes関数ではエクスペクテーションはサポートされていません。ソース、ターゲットデータセットにエクスペクテーションを適用するには、以下の手順を踏んでください。
    • 必要なエクスペクテーションを持つ中間テーブルを定義することでソースデータにエクスペクテーションを追加し、このデータセットをターゲットテーブルに対するソースとします。
    • ターゲットテーブルから入力データを読み込む下流のテーブルを用いてターゲットデータに対するエクスペクテーションを追加します。

テーブルプロパティ

以下のテーブルプロパティは、DELETEイベントにおけるtombstone管理を制御するために追加されています。

テーブルプロパティ
pipelines.cdc.tombstoneGCThresholdInSeconds
out-of-orderのデータに期待する最大の間隔にマッチする値を設定します。
pipelines.cdc.tombstoneGCFrequencyInSeconds
tombstoneとチェックされたデータのクリーンアップの頻度を指定します。
デフォルト:5分

サンプル

このサンプルでは、以下のソースイベントに基づいてターゲットテーブルをアップデートするDelta Live TablesのSCDタイプ1とタイプ2のクエリーをデモします。

  1. 新規ユーザーのレコードを作成
  2. ユーザーレコードを削除
  3. ユーザーレコードを更新。SCDタイプ1のサンプルでは、out-of-orderのイベントの取り扱いをデモするために、最後のUPDATEオペレーションは遅れて到着したので、ターゲットテーブルからは除外されます。
userId name city operation sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

このSCDタイプ1のサンプルのクエリーを実行した後では、ターゲットテーブルには以下のレコードが含まれることになります。

userId name city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

以下の入力レコードには、TRUNCATEオペレーションを伴う追加レコードが含まれており、SCDタイプ1のサンプルコードで使用することができます。

userId name city operation sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5
null null null TRUNCATE 3

追加のTRUNCATEレコードを伴うSCDタイプ1のサンプルの実行後は、TRUNCATEオペレーションがsequenceNum=3であるため、レコード124125は切り取られ、ターゲットテーブルには以下のレコードが含まれることになります。

userId name city
125 Mercedes Guadalajara

SCDタイプ2のサンプルの実行後は、ターゲットテーブルは以下のようになります。

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
123 Raul Oaxaca 1 null
123 Mercedes Tijuana 2 5
123 Mercedes Mexicali 5 6
123 Mercedes Guadalajara 6 null
123 Lily Cancun 2 null

テストデータの生成

このサンプルのテストレコードを生成するには、以下の手順を踏みます。

  1. Databricksワークスペースに移動し、Create a notebookを選択するか、サイドバーのCreateをクリックし、メニューからNotebookを選択します。Create Notebookダイアログが表示されます。

  2. Create Notebookダイアログでは、Generate test CDC recordsのようなノートブック名を入力します。Default LanguageドロップダウンメニューからSQLを選択します。

  3. 稼働中のクラスターがある場合には、Clusterドロップダウンが表示されます。ノートブックをアタッチしたいクラスターを選択します。ノートブックを作成した後に新規クラスターを作成しアタッチすることができます。

  4. Createをクリックします。

  5. ノートブックの最初のセルに以下のクエリーを貼り付けます。

    SQL
    CREATE SCHEMA IF NOT EXISTS cdc_data;
    
    CREATE TABLE
      cdc_data.users
    AS SELECT
      col1 AS userId,
      col2 AS name,
      col3 AS city,
      col4 AS operation,
      col5 AS sequenceNum
    FROM (
      VALUES
      -- Initial load.
      (124, "Raul",     "Oaxaca",      "INSERT", 1),
      (123, "Isabel",   "Monterrey",   "INSERT", 1),
      -- New users.
      (125, "Mercedes", "Tijuana",     "INSERT", 2),
      (126, "Lily",     "Cancun",      "INSERT", 2),
      -- Isabel is removed from the system and Mercedes moved to Guadalajara.
      (123, null,       null,          "DELETE", 6),
      (125, "Mercedes", "Guadalajara", "UPDATE", 6),
      -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
      (125, "Mercedes", "Mexicali",    "UPDATE", 5),
      (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
      -- Uncomment to test TRUNCATE.
      -- ,(null, null,      null,          "TRUNCATE", 3)
    );
    
  6. ノートブックを実行し、レコードを生成するには、右端にあるセルのアクションメニューからをクリックし、Run Cellを選択するか、shift+enterを押下します。

SCDタイプ1のサンプルパイプラインを作成し実行する

  1. Databricksワークスペースに移動し、Create a notebookを選択するか、サイドバーのCreateをクリックし、メニューからNotebookを選択します。Create Notebookダイアログが表示されます。
  2. Create Notebookダイアログでは、DLT CDC exampleのようなノートブック名を入力します。使う言語に基づいて、Default LanguageドロップダウンメニューからPythonSQLを選択します。Clusterはデフォルト値のままでかまいません。Delta Live Tablesランタイムはパイプラインを実行する前にクラスターを作成します。
  3. Createをクリックします。
  4. ノートブックの最初のセルにPythonかSQLクエリーを貼り付けます。
  5. パイプラインを作成し、Notebook Librariesフィールドにノートブックを追加します。パイプライン処理の出力を公開するには、オプションとしてTargetフィールドにデータベース名を入力します。
  6. パイプラインをスタートします。Targetを設定した場合は、クエリーの結果を参照し確認することができます。

サンプルクエリー

Python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

SCDタイプ2のサンプルパイプラインを作成し実行する

  1. Databricksワークスペースに移動し、Create a notebookを選択するか、サイドバーのCreateをクリックし、メニューからNotebookを選択します。Create Notebookダイアログが表示されます。
  2. Create Notebookダイアログでは、DLT CDC exampleのようなノートブック名を入力します。使う言語に基づいて、Default LanguageドロップダウンメニューからPythonSQLを選択します。Clusterはデフォルト値のままでかまいません。Delta Live Tablesランタイムはパイプラインを実行する前にクラスターを作成します。
  3. Createをクリックします。
  4. ノートブックの最初のセルにPythonかSQLクエリーを貼り付けます。
  5. パイプラインを作成し、Notebook Librariesフィールドにノートブックを追加します。パイプライン処理の出力を公開するには、オプションとしてTargetフィールドにデータベース名を入力します。
  6. パイプラインをスタートします。Targetを設定した場合は、クエリーの結果を参照し確認することができます。

サンプルクエリー

Python
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING LIVE TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Databricks 無料トライアル

Databricks 無料トライアル

1
0
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0