1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Snowflake-managed Apache Iceberg™ table における STREAM 機能を試してみた

Last updated at Posted at 2025-03-07

概要

Snowflake-managed Apache Iceberg™ テーブルでサポートされている STREAM 機能について検証し、結果と検証コードを共有します。

image.png

出所:Apache Iceberg™ テーブル | Snowflake Documentation

Snowflake のドキュメントでは「ほとんどのシナリオにおいて CDC(変更データキャプチャー)の記録ソースとして適切」とされていますが、厳密には CDC を実現するための機能ではないことに注意してください。

ほとんどのシナリオで CDC 記録の適切なソースです。

image.png

出所:ストリームの紹介 | Snowflake Documentation

なお、STREAM からテーブルへ DML 処理を実行した際に STREAM のデータが消失する(これを「データの消費」と呼ぶ)仕様である点も押さえておきましょう。

image.png

出所:ストリームの紹介 | Snowflake Documentation

本記事は「SnowflakeにおけるApache Iceberg機能の完全ガイド」シリーズの一部です。
Apache Iceberg機能の全体像やSnowflake上での活用方法について詳しく知りたい方は、以下の記事をご覧ください。

事前準備

以下の記事で紹介している環境構築手順を事前に実施してください。

image.png

出所:Snowflake Catalog における Iceberg テーブルの基本的な操作手順 #Spark - Qiita

データベース、テーブル、および STREAM の作成

データベースを作成

CREATE OR REPLACE DATABASE iceberg_stream_db;
USE DATABASE iceberg_stream_db

image.png

STREAM 元のテーブルを作成

-- stream 元のテーブルを作成
CREATE OR REPLACE ICEBERG TABLE source_table (
  id    INT,
  name  VARCHAR,
  status VARCHAR
)
    CATALOG = 'SNOWFLAKE'
    EXTERNAL_VOLUME = 'my_azure_sf_volume'
    BASE_LOCATION = 'source_table';
SHOW TABLES IN DATABASE iceberg_stream_db;

image.png

STREAM を作成

CREATE OR REPLACE STREAM source_table__stream
    ON TABLE source_table;
SHOW STREAMS IN DATABASE iceberg_stream_db;

image.png

STREAM の書き込み先のテーブルを作成

STREAM から CTAS で直接テーブルを作成しようとすると、Iceberg テーブルのカラム名の制約によってエラーが発生しました。

-- STREAM の書き込み先のテーブルを作成
CREATE OR REPLACE ICEBERG TABLE source_table__change
    CATALOG = 'SNOWFLAKE'
    EXTERNAL_VOLUME = 'my_azure_sf_volume'
    BASE_LOCATION = 'source_table__change'
    AS
    SELECT * FROM source_table__stream;

091372: SQL compilation error: 'METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID' cannot be used as column name(s) for Iceberg tables. METADATA$ and SYSTEM$ column name prefixes are reserved.

image.png

そこで、$__ に置き換えてテーブルを作成しました。

CREATE OR REPLACE ICEBERG TABLE source_table__change
    CATALOG = 'SNOWFLAKE'
    EXTERNAL_VOLUME = 'my_azure_sf_volume'
    BASE_LOCATION = 'source_table__change'
    AS
    SELECT
        * EXCLUDE (
            METADATA$ACTION,
            METADATA$ISUPDATE,
            METADATA$ROW_ID        
        ),
        METADATA$ACTION AS METADATA__ACTION,
        METADATA$ISUPDATE AS METADATA__ISUPDATE,
        METADATA$ROW_ID AS METADATA__ROW_ID
        FROM 
            source_table__stream;
SHOW TABLES IN DATABASE iceberg_stream_db;

image.png

検証コードと結果

STREAM の初期状態を確認

-- STREAM の初期状態をチェック
SELECT
    *
    FROM
        source_table__stream;

image.png

INSERT 時の動作確認

まず、INSERT したレコードが全て STREAM に格納されることを確認します。

INSERT INTO source_table (id, name, status) VALUES
  (1,'Alice','active'),
  (2,'Bob','active'),
  (3,'Charlie','active'),
  (4,'Daisy','active'),
  (5,'Ethan','active');
SELECT 
    *
    FROM source_table__stream;

image.png

次に、STREAM のデータを保管用テーブルに INSERT して「消費」します。

-- データを消費
INSERT INTO source_table__change
(
    ID,
    NAME,
    STATUS,
    METADATA__ACTION,
    METADATA__ISUPDATE,
    METADATA__ROW_ID
)
SELECT
    * EXCLUDE (
        METADATA$ACTION,
        METADATA$ISUPDATE,
        METADATA$ROW_ID        
    ),
    METADATA$ACTION AS METADATA__ACTION,
    METADATA$ISUPDATE AS METADATA__ISUPDATE,
    METADATA$ROW_ID AS METADATA__ROW_ID
    FROM 
        source_table__stream;

image.png

UPDATE 時の動作確認

UPDATE 後のレコードが STREAM に格納されることを確認します。

-- 2人のステータスを変更(更新)
UPDATE source_table
    SET status = 'inactive' WHERE id IN (2, 3);
SELECT 
    *
    FROM source_table__stream;

image.png

更新後のレコードも同様に保管用テーブルへ INSERT して消費します。

-- データを消費
INSERT INTO source_table__change
(
    ID,
    NAME,
    STATUS,
    METADATA__ACTION,
    METADATA__ISUPDATE,
    METADATA__ROW_ID
)
SELECT
    * EXCLUDE (
        METADATA$ACTION,
        METADATA$ISUPDATE,
        METADATA$ROW_ID        
    ),
    METADATA$ACTION AS METADATA__ACTION,
    METADATA$ISUPDATE AS METADATA__ISUPDATE,
    METADATA$ROW_ID AS METADATA__ROW_ID
    FROM 
        source_table__stream;

image.png

DELETE 時の動作確認

DELETE したレコードが STREAM に格納されることを確認します。

DELETE FROM source_table
    WHERE id = 5;
SELECT 
    *
    FROM source_table__stream;

image.png

同様に、保管用テーブルへ INSERT して消費します。

-- データを消費
INSERT INTO source_table__change
(
    ID,
    NAME,
    STATUS,
    METADATA__ACTION,
    METADATA__ISUPDATE,
    METADATA__ROW_ID
)
SELECT
    * EXCLUDE (
        METADATA$ACTION,
        METADATA$ISUPDATE,
        METADATA$ROW_ID        
    ),
    METADATA$ACTION AS METADATA__ACTION,
    METADATA$ISUPDATE AS METADATA__ISUPDATE,
    METADATA$ROW_ID AS METADATA__ROW_ID
    FROM 
        source_table__stream;

image.png

MERGE 時の動作確認

MERGE(UPSERT)されたレコードが STREAM に格納されることを確認します。

MERGE INTO source_table AS tgt
USING (
  SELECT 1 AS id, 'Alice_updated' AS name, 'inactive' AS status
  UNION ALL
  SELECT 6 AS id, 'Frank' AS name, 'active' AS status
) AS src
ON tgt.id = src.id
WHEN MATCHED THEN
  UPDATE SET tgt.name = src.name, tgt.status = src.status
WHEN NOT MATCHED THEN
  INSERT (id, name, status)
  VALUES (src.id, src.name, src.status);
SELECT 
    *
    FROM
        source_table__stream;

image.png

その後、保管用テーブルへ消費します。

-- データを消費
INSERT INTO source_table__change
(
    ID,
    NAME,
    STATUS,
    METADATA__ACTION,
    METADATA__ISUPDATE,
    METADATA__ROW_ID
)
SELECT
    * EXCLUDE (
        METADATA$ACTION,
        METADATA$ISUPDATE,
        METADATA$ROW_ID        
    ),
    METADATA$ACTION AS METADATA__ACTION,
    METADATA$ISUPDATE AS METADATA__ISUPDATE,
    METADATA$ROW_ID AS METADATA__ROW_ID
    FROM 
        source_table__stream;

image.png

INSERT と DELETE を実行した時の動作確認

INSERT した後に DELETE を行ったレコードが、STREAM に格納されるケースを確認します。

INSERT INTO source_table (id, name, status) VALUES
  (6,'Franky','active');
SELECT 
    *
    FROM source_table__stream;
-- 1人のレコードを削除
DELETE FROM source_table WHERE id = 6;
SELECT 
    *
    FROM source_table__stream;

image.png

前日に同様のコードを実行した際は、STREAM に書き込まれないケースがありました。どちらが正式な仕様であるかが不明ですが、最新の結果を検証結果としました。

image.png

保管用テーブルへデータを INSERT して消費します。

-- データを消費
INSERT INTO source_table__change
(
    ID,
    NAME,
    STATUS,
    METADATA__ACTION,
    METADATA__ISUPDATE,
    METADATA__ROW_ID
)
SELECT
    * EXCLUDE (
        METADATA$ACTION,
        METADATA$ISUPDATE,
        METADATA$ROW_ID        
    ),
    METADATA$ACTION AS METADATA__ACTION,
    METADATA$ISUPDATE AS METADATA__ISUPDATE,
    METADATA$ROW_ID AS METADATA__ROW_ID
    FROM 
        source_table__stream;

CTAS の動作確認

CTAS(CREATE TABLE AS SELECT)を実行すると、STREAM にデータは書き込まれず、以下のエラーが発生しました。

CREATE OR REPLACE ICEBERG TABLE source_table
    EXTERNAL_VOLUME = 'my_azure_sf_volume'
    CATALOG = 'SNOWFLAKE'
    BASE_LOCATION = 'source_table'
    AS
    SELECT 1 AS id, 'Alice' AS name, 'active' AS status;
SELECT 
    *
    FROM source_table__stream;

091901: Base table 'ICEBERG_STREAM_DB.PUBLIC.SOURCE_TABLE' dropped, cannot read from stream 'SOURCE_TABLE__STREAM' in line 3 position 9.

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?