2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Delta Lakeのチェンジデータフィード

Last updated at Posted at 2022-06-04

Change data feed | Databricks on AWS [2022/5/9時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

注意

  • チェンジデータフィードはDatabricksランタイム8.4以降で使用できます。
  • 本書ではチェンジデータフィードの機能を用いたDeltaテーブルの行レベルの変更情報をどのように記録しクエリーするのかを説明します。ソースデータの変更に基づくDelta Live Tablesにおけるテーブルの更新方法については、Delta Live Tablesによるチェンジデータキャプチャ(CDC)をご覧ください。

Deltaのチェンジデータフィードは、Deltaテーブルのバージョン間の行レベルの変更を表現します。Deltaテーブルで有効化すると、ランタイムはテーブルに書き込まれる全てのデータの「変更イベント」を記録します。これには、特定の行がインサート、削除、更新されたのかを示すメタデータと行のデータが含まれます。

SQLあるいはデータフレームAPI(df.read)を用いたバッチクエリー、データフレームAPI(df.readStream)を用いたストリーミングクエリーで変更イベントを読み込むことができます。

ユースケース

Deltaのチェンジデータフィードはデフォルトでは有効化されていません。以下のようなユースケースの場合、チェンジデータフィードを有効化すべきです。

  • シルバー、ゴールドテーブル: 高速化するために、初回のMERGEUPDATEDELETEオペレーション以降の行レベルの変更のみを処理することでDeltaの性能を改善し、ETL、ELTオペレーションをシンプルなものにします。
  • マテリアライズドビュー: 背後のテーブル全てを再処理する必要なしに、行われた変更のみを更新することで、BIや分析で使用する最新かつ集計された情報のビューを作成します。
  • 変更の送信: データパイプラインの後のステージでインクリメンタルに処理を行えるように、KafkaやRDBMSのような後段のシステムにチェンジデータフィードを送信します。
  • 監査証跡テーブル: Deltaテーブルはいつ削除が行われたのか、どのような変更が行われたのかを含む全ての変更の時間変化を確認するために、永続的なストレージと効率的なクエリー機能を提供するので、チェンジデータフィードを捕捉します。

チェンジデータフィードを有効化する

以下の方法のいずれかを用いて、チェンジデータフィードのオプションを明示的に有効化する必要があります。

  • 新規テーブル: CREATE TABLEコマンドで、テーブルプロパティdelta.enableChangeDataFeed = trueを設定します。

    SQL
    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 既存テーブル: ALTER TABLEコマンドで、テーブルプロパティdelta.enableChangeDataFeed = trueを設定します。

    SQL
    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 全ての新規テーブル

    SQL
    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

重要!

  • テーブルのチェンジデータフィードオプションを有効化すると、Databricksランタイム8.1以下を用いてテーブルに書き込みを行うことはできなくなります。読み込みは行えます。
  • チェンジデータフィードを有効化した後の変更のみが記録されます。テーブルの過去の変更は捕捉されません。

チェンジデータのストレージ

DatabricksはDeltaテーブルのディレクトリの中の_change_dataフォルダの中に、UPDATEDELETEMERGEオペレーションの変更データを記録します。Databricksが、トランザクションログから直接チェンジデータフィードを効率的に計算できると検知した場合には、この記録がスキップされる場合があります。特に、INSERTのみのオペレーションやパーティション全体の削除では、_change_dataディレクトリの中にデータを生成しません。

_change_dataフォルダの中のファイルは、テーブルの保持ポリシーに従います。そのため、VACUUMコマンドはチェンジデータフィードのデータも削除します。

バッチクエリーにおける変更の読み込み

バージョンあるいはタイムスタンプの開始と終了を指定することができます。バージョンあるいはタイムスタンプの開始と終了はクエリーに含まれます。特定の開始バージョンから最新のテーブルバージョンの間の変更を読み込む際には、開始バージョン、タイムスタンプのみを指定します。

バージョンには整数値、タイムスタンプにはyyyyMMddHHmmssSSSのフォーマットの文字列を指定することができます。

記録されている変更イベント、すなわちチェンジデータフィードが有効化された時点のものよりも低いバージョン、古いタイムスタンプを指定した場合、チェンジデータフィードが有効化されていない旨を伝えるエラーが発生します。

SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
Python
# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")


# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")
Scala
# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")


# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

ストリーミングクエリーにおける変更の読み込み

Python
# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")
Scala
// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

テーブルを読み込んでいる間にチェンジデータを取得するには、オプションreadChangeFeedtrueに設定します。startingVersionstartingTimestampはオプションであり、指定されない場合、ストリームはストリーミング処理時点のテーブルの最新のスナップショットをINSERT、それ以降の変更をチェンジデータとして返却します。チェンジデータを読み込み際に、レートリミット(maxFilesPerTrigger, maxBytesPerTrigger)のようなオプションやexcludeRegexもサポートされています。

注意
レートリミットは、開始スナップショットバージョン以外のバージョンに対して原子的なものとなります。すなわち、コミットバージョン全体にレートリミットが適用されるか、コミット全体が返却されます。

チェンジデータイベントのスキーマ

データカラムに加え、チェンジデータには変更イベントのタイプを特定するメタデータのカラムが含まれます。

カラム名 タイプ
_change_type String insert, update_preimage, update_postimage, delete (1)
_commit_version Long 変更を含むDeltaログあるいはテーブルのバージョン
_commit_timestamp Timestamp コミットが生成された時点のタイムスタンプ

(1) preimageはアップデート前の値で、postimageはアップデート後の値となります。

FAQ

チェンジデータフィードを有効化することによるオーバーヘッドは何ですか?

大きなインパクトはありません。チェンジデータのレコードは、クエリー実行プロセスの間に生成され、これは一般的に書き込まれるファイルの全体サイズよりもはるかに小さいものとなります。

変更レコードの保持ポリシーは何ですか?

チェンジレコードは、古いテーブルバージョンに対するものと同じ保持ポリシーに従いますので、特定の保持期間の外にある場合、VACUUMでクリーンアップされます。

新規レコードはいつチェンジデータフィードで利用できるようになりますか?

変更データはDelta Lakeのトンランザクションと一緒にコミットされるので、テーブルで新規データが利用できるようになるのと同じタイミングで利用できるようになります。

ノートブック

チェンジデータフィードのノートブック

上のノートブックをウォークスルーした記事です。

Databricks 無料トライアル

Databricks 無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?