概要
Snowflake-managed Apache Iceberg™ テーブルでサポートされている CHANGE TRACKING 機能(テーブル変更追跡機能)について検証し、結果と検証コードを共有します。
出所:CREATE ICEBERG TABLE (IcebergカタログとしてのSnowflake) | Snowflake Documentation
本機能は Time Travel の保持期間に依存する機能であることがドキュメントにて記載されており、 DATA_RETENTION_TIME_IN_DAYS
オプションの設定値を要件に合わせて設定する必要があります。
要求されたデータがTime Travel保持期間(デフォルトは1日)を超えている場合、ステートメントは失敗します。
出所:CHANGES | Snowflake Documentation
本記事は「SnowflakeにおけるApache Iceberg機能の完全ガイド」シリーズの一部です。
Apache Iceberg機能の全体像やSnowflake上での活用方法について詳しく知りたい方は、以下の記事をご覧ください。
事前準備
以下の記事で紹介している環境構築手順を事前に実施してください。
出所:Snowflake Catalog における Iceberg テーブルの基本的な操作手順 #Spark - Qiita
データベース、テーブル、および STREAM の作成
データベースを作成
CREATE OR REPLACE DATABASE change_tracking_db;
USE DATABASE change_tracking_db
テーブルを作成
CREATE OR REPLACE ICEBERG TABLE CHANGE_TRACKING_TABLE (
id INT,
name VARCHAR,
status VARCHAR
)
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'my_azure_sf_volume'
BASE_LOCATION = 'change_tracking_table'
CHANGE_TRACKING = TRUE;
SHOW TABLES IN DATABASE change_tracking_db;
検証コードと結果
INSERT 時の動作確認
まず、INSERT したレコードが全て STREAM に格納されることを確認します。
SET ts1 = (SELECT CURRENT_TIMESTAMP());
INSERT INTO
CHANGE_TRACKING_TABLE (id, name, status)
VALUES
(1, 'Alice', 'active'),
(2, 'Bob', 'active'),
(3, 'Charlie', 'active'),
(4, 'Daisy', 'active'),
(5, 'Ethan', 'active');
SET target_ts = $ts1;
SELECT *
FROM CHANGE_TRACKING_TABLE
CHANGES(INFORMATION => DEFAULT)
AT(TIMESTAMP => $target_ts);
UPDATE 時の動作確認
UPDATE 後のレコードが STREAM に格納されることを確認します。
SET ts2 = (SELECT CURRENT_TIMESTAMP());
UPDATE CHANGE_TRACKING_TABLE
SET status = 'inactive' WHERE id IN (2, 3);
SET target_ts = $ts2;
SELECT *
FROM CHANGE_TRACKING_TABLE
CHANGES(INFORMATION => DEFAULT)
AT(TIMESTAMP => $target_ts);
DELETE 時の動作確認
DELETE したレコードが STREAM に格納されることを確認します。
SET ts3 = (SELECT CURRENT_TIMESTAMP());
DELETE FROM CHANGE_TRACKING_TABLE
WHERE id = 5;
SET target_ts = $ts3;
SELECT *
FROM CHANGE_TRACKING_TABLE
CHANGES(INFORMATION => DEFAULT)
AT(TIMESTAMP => $target_ts);
MERGE 時の動作確認
MERGE(UPSERT)されたレコードが STREAM に格納されることを確認します。
SET ts4 = (SELECT CURRENT_TIMESTAMP());
MERGE INTO CHANGE_TRACKING_TABLE AS tgt
USING (
SELECT 1 AS id, 'Alice_updated' AS name, 'inactive' AS status
UNION ALL
SELECT 6 AS id, 'Frank' AS name, 'active' AS status
) AS src
ON tgt.id = src.id
WHEN MATCHED THEN
UPDATE SET tgt.name = src.name, tgt.status = src.status
WHEN NOT MATCHED THEN
INSERT (id, name, status)
VALUES (src.id, src.name, src.status);
SET target_ts = $ts4;
SELECT *
FROM CHANGE_TRACKING_TABLE
CHANGES(INFORMATION => DEFAULT)
AT(TIMESTAMP => $target_ts);
補足
DML のクエリ ID によりデータを取得する方法
指定したテーブルにおける DML ('INSERT', 'UPDATE', 'DELETE', 'MERGE', 'COPY')のクエリ ID の一連を取得します。
-- 変数の設定
SET DATABASE_NAME = 'CHANGE_TRACKING_DB';
SET SCHEMA_NAME = 'PUBLIC';
SET CHANGE_TRACKING_TABLE = 'CHANGE_TRACKING_TABLE';
-- クエリ
SELECT
query_id,
query_type,
start_time,
query_text
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE query_type IN ('INSERT', 'UPDATE', 'DELETE', 'MERGE', 'COPY')
AND DATABASE_NAME = $DATABASE_NAME
AND SCHEMA_NAME = $SCHEMA_NAME
AND query_text ILIKE '%' || $CHANGE_TRACKING_TABLE || '%'
ORDER BY start_time DESC;
上記の出力結果からクエリ ID を変数にセットして、データを取得します。
SET STATEMENT = '01bad92d-0001-9d25-0000-00652683e279';
SELECT *
FROM
CHANGE_TRACKING_TABLE
CHANGES(INFORMATION => DEFAULT)
AT(STATEMENT => $STATEMENT );