Change data feed | Databricks on AWS [2022/5/9時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
注意
- チェンジデータフィードはDatabricksランタイム8.4以降で使用できます。
- 本書ではチェンジデータフィードの機能を用いたDeltaテーブルの行レベルの変更情報をどのように記録しクエリーするのかを説明します。ソースデータの変更に基づくDelta Live Tablesにおけるテーブルの更新方法については、Delta Live Tablesによるチェンジデータキャプチャ(CDC)をご覧ください。
Deltaのチェンジデータフィードは、Deltaテーブルのバージョン間の行レベルの変更を表現します。Deltaテーブルで有効化すると、ランタイムはテーブルに書き込まれる全てのデータの「変更イベント」を記録します。これには、特定の行がインサート、削除、更新されたのかを示すメタデータと行のデータが含まれます。
SQLあるいはデータフレームAPI(df.read
)を用いたバッチクエリー、データフレームAPI(df.readStream
)を用いたストリーミングクエリーで変更イベントを読み込むことができます。
ユースケース
Deltaのチェンジデータフィードはデフォルトでは有効化されていません。以下のようなユースケースの場合、チェンジデータフィードを有効化すべきです。
-
シルバー、ゴールドテーブル: 高速化するために、初回の
MERGE
、UPDATE
、DELETE
オペレーション以降の行レベルの変更のみを処理することでDeltaの性能を改善し、ETL、ELTオペレーションをシンプルなものにします。 - マテリアライズドビュー: 背後のテーブル全てを再処理する必要なしに、行われた変更のみを更新することで、BIや分析で使用する最新かつ集計された情報のビューを作成します。
- 変更の送信: データパイプラインの後のステージでインクリメンタルに処理を行えるように、KafkaやRDBMSのような後段のシステムにチェンジデータフィードを送信します。
- 監査証跡テーブル: Deltaテーブルはいつ削除が行われたのか、どのような変更が行われたのかを含む全ての変更の時間変化を確認するために、永続的なストレージと効率的なクエリー機能を提供するので、チェンジデータフィードを捕捉します。
チェンジデータフィードを有効化する
以下の方法のいずれかを用いて、チェンジデータフィードのオプションを明示的に有効化する必要があります。
-
新規テーブル:
CREATE TABLE
コマンドで、テーブルプロパティdelta.enableChangeDataFeed = true
を設定します。SQLCREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
-
既存テーブル:
ALTER TABLE
コマンドで、テーブルプロパティdelta.enableChangeDataFeed = true
を設定します。SQLALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
-
全ての新規テーブル
SQLset spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
重要!
- テーブルのチェンジデータフィードオプションを有効化すると、Databricksランタイム8.1以下を用いてテーブルに書き込みを行うことはできなくなります。読み込みは行えます。
- チェンジデータフィードを有効化した後の変更のみが記録されます。テーブルの過去の変更は捕捉されません。
チェンジデータのストレージ
DatabricksはDeltaテーブルのディレクトリの中の_change_data
フォルダの中に、UPDATE
、DELETE
、MERGE
オペレーションの変更データを記録します。Databricksが、トランザクションログから直接チェンジデータフィードを効率的に計算できると検知した場合には、この記録がスキップされる場合があります。特に、INSERTのみのオペレーションやパーティション全体の削除では、_change_data
ディレクトリの中にデータを生成しません。
_change_data
フォルダの中のファイルは、テーブルの保持ポリシーに従います。そのため、VACUUMコマンドはチェンジデータフィードのデータも削除します。
バッチクエリーにおける変更の読み込み
バージョンあるいはタイムスタンプの開始と終了を指定することができます。バージョンあるいはタイムスタンプの開始と終了はクエリーに含まれます。特定の開始バージョンから最新のテーブルバージョンの間の変更を読み込む際には、開始バージョン、タイムスタンプのみを指定します。
バージョンには整数値、タイムスタンプにはyyyyMMddHHmmssSSS
のフォーマットの文字列を指定することができます。
記録されている変更イベント、すなわちチェンジデータフィードが有効化された時点のものよりも低いバージョン、古いタイムスタンプを指定した場合、チェンジデータフィードが有効化されていない旨を伝えるエラーが発生します。
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
# version as ints or longs
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# path based tables
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.load("pathToMyDeltaTable")
# version as ints or longs
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# path based tables
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.load("pathToMyDeltaTable")
ストリーミングクエリーにおける変更の読み込み
# providing a starting version
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# providing a starting timestamp
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2021-04-21 05:35:43") \
.load("/pathToMyDeltaTable")
# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("myDeltaTable")
// providing a starting version
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// providing a starting timestamp
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "2021-04-21 05:35:43")
.load("/pathToMyDeltaTable")
// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("myDeltaTable")
テーブルを読み込んでいる間にチェンジデータを取得するには、オプションreadChangeFeed
をtrue
に設定します。startingVersion
やstartingTimestamp
はオプションであり、指定されない場合、ストリームはストリーミング処理時点のテーブルの最新のスナップショットをINSERT
、それ以降の変更をチェンジデータとして返却します。チェンジデータを読み込み際に、レートリミット(maxFilesPerTrigger
, maxBytesPerTrigger
)のようなオプションやexcludeRegex
もサポートされています。
注意
レートリミットは、開始スナップショットバージョン以外のバージョンに対して原子的なものとなります。すなわち、コミットバージョン全体にレートリミットが適用されるか、コミット全体が返却されます。
チェンジデータイベントのスキーマ
データカラムに加え、チェンジデータには変更イベントのタイプを特定するメタデータのカラムが含まれます。
カラム名 | タイプ | 値 |
---|---|---|
_change_type |
String |
insert , update_preimage , update_postimage , delete (1) |
_commit_version |
Long | 変更を含むDeltaログあるいはテーブルのバージョン |
_commit_timestamp |
Timestamp | コミットが生成された時点のタイムスタンプ |
(1) preimage
はアップデート前の値で、postimage
はアップデート後の値となります。
FAQ
チェンジデータフィードを有効化することによるオーバーヘッドは何ですか?
大きなインパクトはありません。チェンジデータのレコードは、クエリー実行プロセスの間に生成され、これは一般的に書き込まれるファイルの全体サイズよりもはるかに小さいものとなります。
変更レコードの保持ポリシーは何ですか?
チェンジレコードは、古いテーブルバージョンに対するものと同じ保持ポリシーに従いますので、特定の保持期間の外にある場合、VACUUMでクリーンアップされます。
新規レコードはいつチェンジデータフィードで利用できるようになりますか?
変更データはDelta Lakeのトンランザクションと一緒にコミットされるので、テーブルで新規データが利用できるようになるのと同じタイミングで利用できるようになります。
ノートブック
チェンジデータフィードのノートブック
上のノートブックをウォークスルーした記事です。