概要
Snowflake の標準テーブルをソースとして利用し、標準 STREAM の仕様を検証した結果を共有します。以下の動作について検証し、各項目ごとに STREAM テーブルへの書き込みおよびデータの消費を行いました。
- STREAM の基本操作
- 一般的な DML 操作
- 応用的な DML 操作
- テーブル変更
- カラムのデータ型変更
- クローンによる動作確認
検証結果要約
各検証項目ごとの操作内容と結果は、下記のとおりです。
No. | 検証項目 | 結果 |
---|---|---|
1-1 | CREATE STREAM の実施 | STREAM に挿入データが反映 |
1-2 | ALTER STREAM の実施 | コメントが設定されたことを確認 |
1-3 | SHOW STREAMS の実施 | STREAM 一覧が表示(画像参照) |
1-4 | DESCRIBE STREAM の実施 | 出力結果は SHOW STREAMS と同様 |
1-5 | STREAM の消費 | STREAM のデータが消費され、空となる |
1-6 | DROP STREAM の実施 | STREAM の削除が確認できる |
2-1 | INSERT の実施 | INSERT 操作が STREAM に記録 |
2-2 | INSERT OVERWRITE の実施 | 既存レコードが DELETE として、上書き分が INSERT として記録 |
2-3 | マルチテーブル INSERT の実施 | 対象テーブルへの INSERT が STREAM に記録 |
2-4 | UPDATE の実施 | 更新操作が STREAM に反映 |
2-5 | DELETE の実施 | DELETE 操作が STREAM に記録 |
2-6 | MERGE の実施 | MERGE 操作の結果が STREAM に反映 |
2-7 | TRUNCATE の実施 | 全レコードが DELETE として STREAM に記録 |
2-8 | INSERT と DELETE の実施 | 操作が相殺され、STREAM に記録されない |
2-9 | INSERT と TRUNCATE の実施 | 結果として STREAM に記録は残らず |
2-10 | UPDATE と UPDATE の実施 | 最終 UPDATE の結果が INSERT として記録 |
3-1 | CTAS の実施 | STREAM へのクエリ時にエラー発生(元テーブル削除のため) |
3-2 | ロールバック の実施 | ROLLBACK により STREAM に記録されない |
4-1 | テーブル名の変更 | STREAM のソースが変更され、DDL に反映 |
4-2 | カラムの追加 | NOTE カラムが追加され、STREAM にも反映 |
4-3 | カラム名の変更 | カラム名変更が STREAM に反映 |
4-4 | カラムの削除 | STREAM からも対象カラムが削除される |
5-1 | 同義の型への変更 | 型変更後も STREAM への影響なし |
5-2 | 文字列の長さを増やす | カラム長が増え、STREAM に反映 |
5-3 | 数値列の精度を上げる | 数値精度が上がったことを確認 |
5-4 | 数値列の精度を下げる | 数値精度が下がったことを確認 |
6-1 | ソーステーブルのみの CLONE の実施 | クローン後も STREAM のデータは残る |
6-2 | STREAM の CLONE の実施 | クローン後も STREAM のデータは残る |
6-3 | ソーステーブルと STREAM の CLONE の実施 | クローン後、STREAM のデータが消失 |
事前準備
データベースを作成
CREATE OR REPLACE DATABASE SF_STREAM_TEST;
USE SF_STREAM_TEST;
1. STREAM(STREAM)の基本操作
1-1. CREATE STREAM の実施
ソーステーブルと STREAM を作成します。
-- 検証用テーブルの作成
CREATE OR REPLACE TABLE TEST_TABLE (
ID INT,
NAME VARCHAR(50),
QUANTITY NUMBER(10,2)
);
-- TEST_TABLEに対するストリームの作成
CREATE OR REPLACE STREAM TEST_STREAM ON TABLE TEST_TABLE;
ソーステーブルにデータを挿入し、STREAM にデータが格納されたことを確認します。
INSERT INTO TEST_TABLE (ID, NAME, QUANTITY) VALUES
(1, 'Alice', 100.00),
(2, 'Bob', 50.00);
SELECT * FROM TEST_STREAM;
1-2. ALTER STREAM の実施
ALTER STREAM でコメントを設定します。なお、オプション(APPEND_ONLY 等)の変更も可能です。
-- ストリームにコメントを設定
ALTER STREAM TEST_STREAM SET COMMENT = '検証用ストリーム';
1-3. SHOW STREAMS の実施
現在のスキーマ内に存在する STREAM 一覧を表示します。
-- SHOW STREAMS コマンドでスキーマ内の STREAM 一覧を確認
SHOW STREAMS;
1-4. DESCRIBE STREAM の実施
DESCRIBE STREAM
を実行しても、列情報は表示されず、SHOW STREAMS
と同様の結果が出力されます。
-- STREAM の詳細情報・スキーマを確認
DESCRIBE STREAM TEST_STREAM_1;
1-5. STREAM の消費(CTAS による実施例は上記各項目参照)
CTAS などにより STREAM のデータを選択すると、STREAM 上のデータは消費されて空になります。
-- ストリームをCTASで消費
CREATE OR REPLACE TABLE STREAM_OUTPUT_0 AS
SELECT * FROM TEST_STREAM;
SELECT * FROM TEST_STREAM;
1-6. DROP STREAM の実施
作成した STREAM を削除します。
-- 作成した STREAM の削除
DESCRIBE STREAM TEST_STREAM;
2. 一般 DML 操作
一般 DML 操作の事前準備
ストリームを再度作成します。
-- TEST_TABLEに対するストリームの再作成
CREATE OR REPLACE STREAM TEST_STREAM ON TABLE TEST_TABLE;
2-1. INSERT の実施
-- 新規行の挿入
INSERT INTO TEST_TABLE (ID, NAME, QUANTITY)
VALUES (3, 'Carol', 75.00);
STREAM に INSERT 操作が記録されていることを確認します。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_1 AS
SELECT * FROM TEST_STREAM;
SELECT * FROM STREAM_OUTPUT_1 ORDER BY ID;
2-2. INSERT OVERWRITE の実施
現在 3 レコード存在するテーブルに対して、INSERT OVERWRITE
を実行します。
-- ID=1の行だけを残すため、INSERT OVERWRITEで上書き
INSERT OVERWRITE INTO TEST_TABLE
SELECT * FROM TEST_TABLE
WHERE ID = 1;
既存のレコードは DELETE として、挿入されたレコードは INSERT として記録されます。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_2 AS
SELECT *
FROM TEST_STREAM;
SELECT * FROM STREAM_OUTPUT_2 ORDER BY ID;
2-3. INSERT(マルチテーブル)の実施
マルチテーブルへの INSERT を行います。
-- 追加のテーブル OTHER_TABLE の作成
CREATE OR REPLACE TABLE OTHER_TABLE (
ID INT,
NAME VARCHAR(50)
);
-- マルチテーブルINSERTの実行
INSERT ALL
INTO TEST_TABLE (ID, NAME, QUANTITY) VALUES(10, 'Dave', 0)
INTO OTHER_TABLE (ID, NAME) VALUES(10, 'Dave')
SELECT 1;
TEST_TABLE 側の STREAM には、該当の INSERT が記録されます。
-- TEST_TABLE側のストリーム確認
CREATE OR REPLACE TABLE STREAM_OUTPUT_3 AS
SELECT * FROM TEST_STREAM;
SELECT * FROM STREAM_OUTPUT_3 ORDER BY ID;
2-4. UPDATE の実施
UPDATE を実行します。
-- ID=1の行を更新
UPDATE TEST_TABLE
SET NAME = 'Alice (updated)', QUANTITY = 120.50
WHERE ID = 1;
UPDATE 前の行が DELETE、UPDATE 後の行が INSERT として記録されます。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_4 AS
SELECT *
FROM TEST_STREAM;
SELECT * FROM STREAM_OUTPUT_4 ORDER BY ID;
2-5. DELETE の実施
DELETE を実行します。
-- ID=10の行を削除
DELETE FROM TEST_TABLE
WHERE ID = 10;
削除対象の行が DELETE として記録されます。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_5 AS
SELECT * FROM TEST_STREAM;
SELECT * FROM STREAM_OUTPUT_5 ORDER BY ID;
2-6. MERGE の実施
MERGE 文を実行します。
-- ソーステーブル SOURCE_TABLE の作成とデータ投入
CREATE OR REPLACE TABLE SOURCE_TABLE (
ID INT,
NAME VARCHAR(50),
QUANTITY NUMBER(10,2)
);
INSERT INTO SOURCE_TABLE VALUES
(1, 'Alice (merged)', 130.00), -- ID=1は更新用
(3, 'Emma', 90.00); -- ID=3は新規挿入用
-- MERGE操作の実施
MERGE INTO TEST_TABLE AS T
USING SOURCE_TABLE AS S
ON T.ID = S.ID
WHEN MATCHED THEN
UPDATE SET T.NAME = S.NAME, T.QUANTITY = S.QUANTITY
WHEN NOT MATCHED THEN
INSERT (ID, NAME, QUANTITY) VALUES(S.ID, S.NAME, S.QUANTITY);
INSERT された行は INSERT、UPDATE された行は DELETE と INSERT の組み合わせで記録されます。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_6 AS
SELECT *
FROM TEST_STREAM;
SELECT * FROM STREAM_OUTPUT_6 ORDER BY ID;
2-7. TRUNCATE の実施
TRUNCATE を実行します。
-- TRUNCATE操作
TRUNCATE TABLE TEST_TABLE;
全レコードが DELETE として記録されます。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_7 AS
SELECT *
FROM TEST_STREAM;
SELECT * FROM STREAM_OUTPUT_7 ORDER BY ID;
2-8. INSERT と DELETE の実施
INSERT → DELETE の順に実施します。
-- 新規行のINSERT
INSERT INTO TEST_TABLE (ID, NAME, QUANTITY) VALUES (5, 'Frank', 45.00);
-- 直後にDELETE
DELETE FROM TEST_TABLE WHERE ID = 5;
結果として、何も記録されません。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_8 AS
SELECT *
FROM TEST_STREAM;
SELECT * FROM STREAM_OUTPUT_8 ORDER BY ID;
Snowflake-managed Apache Iceberg™ テーブルで同様の検証を行うと、DELETE が記録されたケースもあります。どちらが正しい挙動なのかは、まだ確証を得られていません。
出所:Snowflake-managed Apache Iceberg™ table における STREAM 機能を試してみた #iceberg - Qiita
2-9. INSERT と TRUNCATE の実施
INSERT → TRUNCATE の順に実行します。
-- 新規データのINSERT
INSERT INTO TEST_TABLE (ID, NAME, QUANTITY) VALUES
(6, 'George', 60.00),
(7, 'Helen', 70.00);
-- TRUNCATE操作
TRUNCATE TABLE TEST_TABLE;
結果として、何も記録されません。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_9 AS
SELECT *
FROM TEST_STREAM;
SELECT * FROM STREAM_OUTPUT_9 ORDER BY ID;
2-10. UPDATE と UPDATE の実施
まず、UPDATE 対象となるレコードを INSERT します。
-- 新規データのINSERT
INSERT INTO TEST_TABLE (ID, NAME, QUANTITY) VALUES
(8, 'Iverson', 80.00);
-- ストリームの消費
CREATE OR REPLACE TABLE STREAM_OUTPUT_0A AS
SELECT *
FROM TEST_STREAM
WHERE 1 = 0;
続けて 2 回の UPDATE を実行します。
UPDATE TEST_TABLE
SET NAME = 'Iverson (----------------)'
WHERE ID = 8;
UPDATE TEST_TABLE
SET NAME = 'Iverson (updated)'
WHERE ID = 8;
最後に実行した UPDATE の結果のみが INSERT として記録されていました。
-- ストリームの消費
CREATE OR REPLACE TABLE STREAM_OUTPUT_0A AS
SELECT *
FROM TEST_STREAM;
SELECT * FROM STREAM_OUTPUT_0A ORDER BY ID;
3. 応用的な DML 操作
3-1. CTAS の実施
CTAS(CREATE TABLE AS SELECT)を実行します。
-- 新規行のINSERT
CREATE OR REPLACE TABLE TEST_TABLE
AS
SELECT
CAST(8 AS int) AS ID
,CAST('Ivy' AS VARCHAR(50)) AS NAME
,CAST(80.00 AS NUMBER(10,2)) AS QUANTITY
STREAM へクエリを実行すると、エラーが発生します。
SELECT * FROM TEST_STREAM ORDER BY ID;
091901: Base table 'SF_STREAM_TEST.PUBLIC.TEST_TABLE' dropped, cannot read from stream 'TEST_STREAM' in line 1 position 14.
3-2. ロールバックの実施
STREAM を再作成します。
-- TEST_TABLEに対するストリームの作成
CREATE OR REPLACE STREAM TEST_STREAM ON TABLE TEST_TABLE;
下記のトランザクションを ROLLBACK します。
BEGIN;
INSERT INTO TEST_TABLE (ID, NAME, QUANTITY) VALUES (9, 'Jack', 90.00);
-- コミットせずROLLBACK
ROLLBACK;
結果として、何も記録されません。
-- ストリームの内容を確認
SELECT * FROM TEST_STREAM ORDER BY ID;
4. テーブル変更
4-1. テーブル名の変更
テーブル名を変更して STREAM への影響を確認します。
-- テーブル名の変更
ALTER TABLE TEST_TABLE RENAME TO TEST_TABLE_RENAMED;
STREAM のソーステーブル名も変更されており、DDL を確認すると変更が反映されていることがわかります。
SHOW STREAMS LIKE 'TEST_STREAM';
SELECT GET_DDL('STREAM', 'TEST_STREAM');
create or replace stream TEST_STREAM on table TEST_TABLE_RENAMED;
4-2. カラムの追加
カラムを追加し、STREAM への影響を確認します。
-- カラムの追加
ALTER TABLE TEST_TABLE_RENAMED
ADD COLUMN NOTE STRING;
STREAM からも新たに追加された NOTE
列が確認できます。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_10 AS
SELECT *
FROM TEST_STREAM;
DESC TABLE STREAM_OUTPUT_10;
4-3. カラム名の変更
カラム名を変更する前にデータを挿入します。
-- 挿入テスト
INSERT INTO TEST_TABLE_RENAMED (ID, NAME, QUANTITY, NOTE)
VALUES (12, 'Laura', 120.00, '名前変更後の挿入');
続いて、カラム名を変更します。
-- カラム名の変更: NAME列をFULL_NAMEにリネーム
ALTER TABLE TEST_TABLE_RENAMED
RENAME COLUMN NAME TO FULL_NAME;
カラム名が FULL_NAME となり、STREAM 側にも反映されています。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_11 AS
SELECT *
FROM TEST_STREAM;
SELECT * FROM STREAM_OUTPUT_11 ORDER BY ID;
DESC TABLE STREAM_OUTPUT_11;
4-4. カラムの削除
カラムを削除します。
-- カラムの削除(事前にストリームを消費済みの状態を想定)
ALTER TABLE TEST_TABLE_RENAMED
DROP COLUMN NOTE;
STREAM からも対応するカラムが削除されました。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_12 AS
SELECT *
FROM TEST_STREAM;
DESC TABLE STREAM_OUTPUT_12;
5. カラムのデータ型変更
5-1. 同義の型への変更(VARCHAR から STRING など)
VARCHAR 型のカラムを STRING 型に変更します。
-- FULL_NAME列の型をSTRING(50)に変更
ALTER TABLE TEST_TABLE_RENAMED
ALTER COLUMN FULL_NAME STRING(50);
STREAM 上には特に変化は見られません。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_20 AS
SELECT *
FROM TEST_STREAM;
DESC TABLE STREAM_OUTPUT_20;
5-2. 文字列の長さを増やす(VARCHAR(50) から VARCHAR(100))
VARCHAR(50) 型を VARCHAR(100) 型に変更します。
ALTER TABLE TEST_TABLE_RENAMED
ALTER COLUMN FULL_NAME VARCHAR(100);
FULL_NAME
列が VARCHAR(100) 型に変更され、STREAM にも反映されます。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_21 AS
SELECT *
FROM TEST_STREAM;
DESC TABLE STREAM_OUTPUT_21;
5-3. 数値列の精度を上げる(NUMBER(10,2) から NUMBER(20,2))
NUMBER(10,2) 型を NUMBER(20,2) 型に変更します。
ALTER TABLE TEST_TABLE_RENAMED
ALTER COLUMN QUANTITY NUMBER(20,2);
QUANTITY
列が NUMBER(20,2) 型に変更されました。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_22 AS
SELECT *
FROM TEST_STREAM;
DESC TABLE STREAM_OUTPUT_22;
5-4. 数値列の精度を下げる(NUMBER(20,2) から NUMBER(10,2))
NUMBER(20,2) 型を NUMBER(10,2) 型に変更します。
ALTER TABLE TEST_TABLE_RENAMED
ALTER COLUMN QUANTITY NUMBER(10,2);
QUANTITY
列が NUMBER(10,2) 型に戻り、STREAM にも反映されます。
-- ストリームの内容をCTASで取得
CREATE OR REPLACE TABLE STREAM_OUTPUT_23 AS
SELECT *
FROM TEST_STREAM;
DESC TABLE STREAM_OUTPUT_23;
6. クローンによる動作確認
事前準備
検証用のデータベースを作成します。
CREATE OR REPLACE DATABASE SF_STREAM_CLONE_TEST;
USE SF_STREAM_CLONE_TEST;
6-1. ソーステーブルのみの CLONE の実施
ソーステーブルと STREAM を再作成してデータを挿入します。
-- 検証用テーブルの作成と初期データ投入
CREATE OR REPLACE TABLE TEST_TABLE (
ID INT,
NAME VARCHAR(50),
QUANTITY NUMBER(10,2)
);
-- TEST_TABLEに対するストリームの作成
CREATE OR REPLACE STREAM TEST_STREAM ON TABLE TEST_TABLE;
INSERT INTO TEST_TABLE (ID, NAME, QUANTITY) VALUES
(1, 'Alice', 100.00);
ソーステーブルのみ CLONE を作成します。
CREATE OR REPLACE TABLE TEST_TABLE_CLONE CLONE TEST_TABLE;
STREAM のデータはそのまま残っています。
SELECT * FROM TEST_STREAM ORDER BY ID;
6-2. STREAM の CLONE の実施
STREAM のみ CLONE を作成します。
CREATE OR REPLACE STREAM TEST_STREAM_CLONE CLONE TEST_STREAM;
STREAM に記録されているデータは引き続き残っています。
SELECT * FROM TEST_STREAM ORDER BY ID;
6-3. ソーステーブルと STREAM の CLONE の実施
ソーステーブルと STREAM の両方をクローンします。
CREATE OR REPLACE SCHEMA CLONE_SCHEMA CLONE PUBLIC;
クローン後は、オリジナルの STREAM からデータが消えます。
SELECT * FROM TEST_STREAM ORDER BY ID;