概要
Snowflake-managed Apache Iceberg™ テーブルでサポートされている STREAM 機能について検証し、結果と検証コードを共有します。
出所:Apache Iceberg™ テーブル | Snowflake Documentation
Snowflake のドキュメントでは「ほとんどのシナリオにおいて CDC(変更データキャプチャー)の記録ソースとして適切」とされていますが、厳密には CDC を実現するための機能ではないことに注意してください。
ほとんどのシナリオで CDC 記録の適切なソースです。
出所:ストリームの紹介 | Snowflake Documentation
なお、STREAM からテーブルへ DML 処理を実行した際に STREAM のデータが消失する(これを「データの消費」と呼ぶ)仕様である点も押さえておきましょう。
出所:ストリームの紹介 | Snowflake Documentation
本記事は「SnowflakeにおけるApache Iceberg機能の完全ガイド」シリーズの一部です。
Apache Iceberg機能の全体像やSnowflake上での活用方法について詳しく知りたい方は、以下の記事をご覧ください。
事前準備
以下の記事で紹介している環境構築手順を事前に実施してください。
出所:Snowflake Catalog における Iceberg テーブルの基本的な操作手順 #Spark - Qiita
データベース、テーブル、および STREAM の作成
データベースを作成
CREATE OR REPLACE DATABASE iceberg_stream_db;
USE DATABASE iceberg_stream_db
STREAM 元のテーブルを作成
-- stream 元のテーブルを作成
CREATE OR REPLACE ICEBERG TABLE source_table (
id INT,
name VARCHAR,
status VARCHAR
)
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'my_azure_sf_volume'
BASE_LOCATION = 'source_table';
SHOW TABLES IN DATABASE iceberg_stream_db;
STREAM を作成
CREATE OR REPLACE STREAM source_table__stream
ON TABLE source_table;
SHOW STREAMS IN DATABASE iceberg_stream_db;
STREAM の書き込み先のテーブルを作成
STREAM から CTAS
で直接テーブルを作成しようとすると、Iceberg テーブルのカラム名の制約によってエラーが発生しました。
-- STREAM の書き込み先のテーブルを作成
CREATE OR REPLACE ICEBERG TABLE source_table__change
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'my_azure_sf_volume'
BASE_LOCATION = 'source_table__change'
AS
SELECT * FROM source_table__stream;
091372: SQL compilation error: 'METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID' cannot be used as column name(s) for Iceberg tables. METADATA$ and SYSTEM$ column name prefixes are reserved.
そこで、$
を __
に置き換えてテーブルを作成しました。
CREATE OR REPLACE ICEBERG TABLE source_table__change
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'my_azure_sf_volume'
BASE_LOCATION = 'source_table__change'
AS
SELECT
* EXCLUDE (
METADATA$ACTION,
METADATA$ISUPDATE,
METADATA$ROW_ID
),
METADATA$ACTION AS METADATA__ACTION,
METADATA$ISUPDATE AS METADATA__ISUPDATE,
METADATA$ROW_ID AS METADATA__ROW_ID
FROM
source_table__stream;
SHOW TABLES IN DATABASE iceberg_stream_db;
検証コードと結果
STREAM の初期状態を確認
-- STREAM の初期状態をチェック
SELECT
*
FROM
source_table__stream;
INSERT 時の動作確認
まず、INSERT したレコードが全て STREAM に格納されることを確認します。
INSERT INTO source_table (id, name, status) VALUES
(1,'Alice','active'),
(2,'Bob','active'),
(3,'Charlie','active'),
(4,'Daisy','active'),
(5,'Ethan','active');
SELECT
*
FROM source_table__stream;
次に、STREAM のデータを保管用テーブルに INSERT して「消費」します。
-- データを消費
INSERT INTO source_table__change
(
ID,
NAME,
STATUS,
METADATA__ACTION,
METADATA__ISUPDATE,
METADATA__ROW_ID
)
SELECT
* EXCLUDE (
METADATA$ACTION,
METADATA$ISUPDATE,
METADATA$ROW_ID
),
METADATA$ACTION AS METADATA__ACTION,
METADATA$ISUPDATE AS METADATA__ISUPDATE,
METADATA$ROW_ID AS METADATA__ROW_ID
FROM
source_table__stream;
UPDATE 時の動作確認
UPDATE 後のレコードが STREAM に格納されることを確認します。
-- 2人のステータスを変更(更新)
UPDATE source_table
SET status = 'inactive' WHERE id IN (2, 3);
SELECT
*
FROM source_table__stream;
更新後のレコードも同様に保管用テーブルへ INSERT して消費します。
-- データを消費
INSERT INTO source_table__change
(
ID,
NAME,
STATUS,
METADATA__ACTION,
METADATA__ISUPDATE,
METADATA__ROW_ID
)
SELECT
* EXCLUDE (
METADATA$ACTION,
METADATA$ISUPDATE,
METADATA$ROW_ID
),
METADATA$ACTION AS METADATA__ACTION,
METADATA$ISUPDATE AS METADATA__ISUPDATE,
METADATA$ROW_ID AS METADATA__ROW_ID
FROM
source_table__stream;
DELETE 時の動作確認
DELETE したレコードが STREAM に格納されることを確認します。
DELETE FROM source_table
WHERE id = 5;
SELECT
*
FROM source_table__stream;
同様に、保管用テーブルへ INSERT して消費します。
-- データを消費
INSERT INTO source_table__change
(
ID,
NAME,
STATUS,
METADATA__ACTION,
METADATA__ISUPDATE,
METADATA__ROW_ID
)
SELECT
* EXCLUDE (
METADATA$ACTION,
METADATA$ISUPDATE,
METADATA$ROW_ID
),
METADATA$ACTION AS METADATA__ACTION,
METADATA$ISUPDATE AS METADATA__ISUPDATE,
METADATA$ROW_ID AS METADATA__ROW_ID
FROM
source_table__stream;
MERGE 時の動作確認
MERGE(UPSERT)されたレコードが STREAM に格納されることを確認します。
MERGE INTO source_table AS tgt
USING (
SELECT 1 AS id, 'Alice_updated' AS name, 'inactive' AS status
UNION ALL
SELECT 6 AS id, 'Frank' AS name, 'active' AS status
) AS src
ON tgt.id = src.id
WHEN MATCHED THEN
UPDATE SET tgt.name = src.name, tgt.status = src.status
WHEN NOT MATCHED THEN
INSERT (id, name, status)
VALUES (src.id, src.name, src.status);
SELECT
*
FROM
source_table__stream;
その後、保管用テーブルへ消費します。
-- データを消費
INSERT INTO source_table__change
(
ID,
NAME,
STATUS,
METADATA__ACTION,
METADATA__ISUPDATE,
METADATA__ROW_ID
)
SELECT
* EXCLUDE (
METADATA$ACTION,
METADATA$ISUPDATE,
METADATA$ROW_ID
),
METADATA$ACTION AS METADATA__ACTION,
METADATA$ISUPDATE AS METADATA__ISUPDATE,
METADATA$ROW_ID AS METADATA__ROW_ID
FROM
source_table__stream;
INSERT と DELETE を実行した時の動作確認
INSERT した後に DELETE を行ったレコードが、STREAM に格納されるケースを確認します。
INSERT INTO source_table (id, name, status) VALUES
(6,'Franky','active');
SELECT
*
FROM source_table__stream;
-- 1人のレコードを削除
DELETE FROM source_table WHERE id = 6;
SELECT
*
FROM source_table__stream;
保管用テーブルへデータを INSERT して消費します。
-- データを消費
INSERT INTO source_table__change
(
ID,
NAME,
STATUS,
METADATA__ACTION,
METADATA__ISUPDATE,
METADATA__ROW_ID
)
SELECT
* EXCLUDE (
METADATA$ACTION,
METADATA$ISUPDATE,
METADATA$ROW_ID
),
METADATA$ACTION AS METADATA__ACTION,
METADATA$ISUPDATE AS METADATA__ISUPDATE,
METADATA$ROW_ID AS METADATA__ROW_ID
FROM
source_table__stream;
CTAS の動作確認
CTAS(CREATE TABLE AS SELECT
)を実行すると、STREAM にデータは書き込まれず、以下のエラーが発生しました。
CREATE OR REPLACE ICEBERG TABLE source_table
EXTERNAL_VOLUME = 'my_azure_sf_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'source_table'
AS
SELECT 1 AS id, 'Alice' AS name, 'active' AS status;
SELECT
*
FROM source_table__stream;
091901: Base table 'ICEBERG_STREAM_DB.PUBLIC.SOURCE_TABLE' dropped, cannot read from stream 'SOURCE_TABLE__STREAM' in line 3 position 9.