3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Snowflakeを業務で使う人向けに「これだけ読めばOK」をまとめた

3
Last updated at Posted at 2026-04-06

はじめに

スプレッドシートでの集計に限界を感じて、Snowflake で分析基盤を作ろうとしたとき「結局何からやればいいのか分からない」状態になった。

公式ドキュメントは膨大、Zenn の記事は 1,290 本、Qiita にも大量にある。全部読むのは無理。

この記事はそのときに整理した内容で、業務で Snowflake を使う前提で必要な知識を一通りまとめた。「DWH って何?」から「異常検知 SQL」まで、この 1 記事で完結する。


目次

  1. なぜ DWH が必要なのか
  2. なぜ Snowflake が選ばれるのか
  3. Snowflake の何が凄いのか
  4. アーキテクチャ(3 層構造)
  5. ログインと画面の使い方
  6. SQL 基本操作
  7. データ型一覧
  8. データの取り込み(S3 → Snowflake)
  9. Snowpipe(リアルタイム取り込み)
  10. ウェアハウス管理とコスト最適化
  11. 関数リファレンス
  12. ウィンドウ関数
  13. ビューとマテリアライズドビュー
  14. タイムトラベル(過去データの復元)
  15. ストリーム(変更データキャプチャ)
  16. タスク(自動スケジュール実行)
  17. ストリーム × タスクでパイプライン構築
  18. ロールとアクセス制御
  19. ゼロコピークローニング
  20. データ共有
  21. Snowflake × Python 連携
  22. パフォーマンスチューニング
  23. トラブルシューティング
  24. 業務用クエリテンプレート
  25. ショートカット・Tips
  26. 動的テーブル(Dynamic Tables)
  27. Cortex AI(SQL で AI を使う)
  28. 半構造化データ(JSON / Parquet)
  29. 外部テーブル(S3 直接クエリ)
  30. UDF(ユーザー定義関数)
  31. データマスキング
  32. 行アクセスポリシー
  33. Snowpark(Python DataFrame API)
  34. Snowflake Notebooks
  35. 検索最適化サービス
  36. マルチクラスターウェアハウス
  37. Apache Iceberg テーブル
  38. Fail-safe(最終防衛線)

1. なぜ DWH(データウェアハウス)が必要なのか

まず Snowflake の前に、DWH という概念を腹落ちさせる。

業務データベースと分析用データベースは別物

会社には「業務用 DB」がある。ECサイトならユーザーの注文データ、SaaS なら顧客の利用ログ。この DB は「注文を処理する」「ログインを認証する」ために最適化されている。

ここに「先月の売上を集計して」「顧客ごとの利用傾向を分析して」という重いクエリを投げると何が起きるか。業務 DB が遅くなる。 ユーザーがログインできなくなったり、注文が通らなくなったりする。

だから分析用のデータベースを別に用意する。これが DWH(データウェアハウス)。

ポイント:業務 DB に重い分析クエリを投げない。DWH に分離する。

DWH がないとどうなるか

やりたいこと DWH なし DWH あり
月次レポート スプシに手作業で転記 SQL 1 本で完了
前年比較 古いデータが消えてて不可能 全期間のデータが残ってる
部門横断分析 各部署に「データください」 1 つの DB に集約済み
リアルタイム集計 業務 DB に負荷をかける DWH で安全に実行
データ共有 CSV でメール添付 アクセス権付与するだけ

要するに DWH は「データを 1 箇所に集めて、安全に・高速に分析するための専用倉庫」。

DWH・データレイク・データマートの違い

「DWH 以外にも似た言葉があってよく分からない」という声があるので整理する。

データレイク DWH データマート
例え 倉庫 整理された棚 部署ごとの引き出し
データ 生データ(なんでも入れる) 整理・統合されたデータ 部署用に絞ったデータ
用途 保管 全社分析 部門分析
ユーザー エンジニア アナリスト・経営層 各部署の担当者

Snowflake はこの 3 つ全部の機能を 1 つで提供する。だから「データプラットフォーム」と呼ばれる。

なぜ今 DWH が必要になっているか

10 年前は「月次レポートは経理が Excel で作る」で回っていた。今は違う。

  • データ量が爆発的に増えた(ログ、IoT、行動データ)
  • リアルタイムに近い判断が求められる
  • 部門をまたいだデータ統合が必要
  • AI / ML にデータを食わせる基盤が必要

Excel やスプレッドシートでは限界がある。「先月の集計に 3 日かかる」「データが壊れてやり直し」「誰かが数式を壊した」——この手の問題は DWH で根本解決できる。


2. なぜ DWH の中で Snowflake が選ばれるのか

DWH は Snowflake だけじゃない。BigQuery(Google)、Redshift(AWS)、Databricks もある。その中でなぜ Snowflake が選ばれているのか。

競合との比較

Snowflake BigQuery Redshift Databricks
クラウド AWS/GCP/Azure 全対応 GCP のみ AWS のみ マルチクラウド
料金モデル 使った分だけ(秒単位) クエリ量課金 常時稼働(RI あり) 使った分だけ
スケール ボタン 1 つで即時 自動 手動リサイズ 自動
セットアップ ほぼゼロ 簡単 やや複雑 やや複雑
半構造化データ VARIANT 型で直接 STRUCT/ARRAY JSON 関数 Delta Lake
データ共有 ゼロコピー共有 Analytics Hub なし Delta Sharing
タイムトラベル 最大 90 日 7 日 なし(スナップショット) タイムトラベルあり
AI/ML 統合 Cortex AI Vertex AI SageMaker MLflow

Snowflake のコアアーキテクチャ:マルチクラスターシェアードデータ

Snowflake の設計思想を一言で言うと「1 つのデータを、複数の独立した計算エンジンが同時に使える」。

営業チームが重いクエリを回しても、経理チームのクエリに影響しない。データのコピーも発生しない。これが従来の DB にない最大の強み。

Snowflake が選ばれる 5 つの理由

1. マルチクラウド
AWS にいても GCP にいても Azure にいても使える。ベンダーロックインを避けたい企業に刺さる。BigQuery は GCP、Redshift は AWS に縛られる。

2. ストレージとコンピュートの完全分離
「データは保存しておくだけなのに課金される」問題がない。ウェアハウスを止めればコンピュート費用はゼロ。Redshift は常時稼働が基本。

3. ゼロコピー共有
データをコピーせずに他のチームや取引先と共有できる。CSV エクスポート→メール添付→インポートの地獄から解放される。これは Snowflake の最大の差別化ポイント。

4. 圧倒的にシンプル
インフラの知識がほぼ不要。ブラウザでログインして SQL を打つだけ。DBA(データベース管理者)がいなくても運用できる。

5. エコシステムの充実
dbt、Fivetran、Airbyte、Looker、Tableau、Streamlit——主要な ETL / BI ツールが全部対応している。

どういう会社が Snowflake を選ぶか

条件 向いている DWH
GCP で統一したい BigQuery
AWS で統一したい+常時稼働 OK Redshift
ML/AI がメイン用途 Databricks
マルチクラウド・シンプル・データ共有重視 Snowflake
小規模でとりあえず始めたい BigQuery(無料枠あり)

3. Snowflake の何が凄いのか

普通のデータベース(MySQL、PostgreSQL)と何が違うのか。素人目線で整理する。

一言で言うと

「サーバーを持たずに、巨大なデータを高速に分析できるクラウド DB」

従来の DB との違い

従来の DB(MySQL 等) Snowflake
サーバー管理 自分で構築・運用 不要。ブラウザで SQL 打つだけ
スケール サーバーのスペックに依存 ボタン 1 つで拡張・縮小
料金 サーバー常時稼働=常に課金 使った分だけ。止めれば 0 円
データ量 TB 超えると厳しい PB(ペタバイト)でも平気
同時アクセス ユーザー増えると遅くなる ウェアハウスを分ければ影響なし
半構造化データ JSON 扱いが面倒 VARIANT 型で直接格納・クエリ
過去データ復元 バックアップから手動復元 タイムトラベルで 1 行で復元

なぜ企業が Snowflake を選ぶのか

1. ストレージとコンピュートが分離している

従来の DB はデータを保存するサーバーとクエリを実行するサーバーが同じ。だから「データは保存しておくだけなのに、サーバーが動いてて課金される」問題が発生する。

Snowflake はデータの保存(ストレージ)とクエリの実行(ウェアハウス)が完全に別。ウェアハウスを止めてもデータは消えない。使わない時間は 0 円。

2. 秒単位でスケールできる

「月末の集計処理が重いから、月末だけウェアハウスを Large にして、終わったら X-Small に戻す」ができる。ダウンタイムなし。SQL 1 行で変更。

3. S3 / GCS / Azure Blob から直接読める

データを Snowflake にコピーしなくても、S3 に置いたまま分析できる(外部テーブル)。データの二重管理が不要。

4. 「やらかした」ときに戻せる(タイムトラベル)

DELETE FROM t_sales_log(WHERE なし)を実行してしまった。従来の DB なら「バックアップあります?」から始まる地獄。Snowflake なら UNDROP TABLE t_sales_log または SELECT * FROM t_sales_log AT(OFFSET => -300) で 5 分前のデータが見える。

5. データ共有が一瞬

別のチームや取引先とデータを共有するとき、CSV でエクスポート→メールで送付→インポート、という作業が不要。Snowflake 上でアクセス権を付与するだけ。データのコピーは発生しない。

ユーザーの業務での位置づけ

S3(利用ログデータ)
    ↓ COPY INTO / Snowpipe
Snowflake(分析用 DB)
    ↓ SQL で集計・分析
BI ツール(ダッシュボード表示)

S3 上の利用ログデータを Snowflake に取り込んで、マスタと突合して集計・分析する。今まで手作業(スプレッドシート + 関数 + 目視確認)でやっていた定常集計を、Snowflake + SQL で自動化する。


2. Snowflake のアーキテクチャ(3 層構造)

3 つのポイント:

  • 各層が独立してスケール可能
  • ウェアハウスを止めてもデータは消えない(ストレージ分離)
  • 複数ウェアハウスが同じデータに同時アクセス可能

ポイント:

  • ストレージとコンピュートが分離 → ウェアハウスを止めてもデータは消えない
  • 複数ウェアハウスが同じデータに同時アクセス可能
  • サーバー管理不要。ブラウザから SQL を打つだけ

オブジェクト階層

アカウント
├── データベース
│   ├── スキーマ
│   │   ├── テーブル
│   │   ├── ビュー
│   │   ├── ステージ(外部ストレージの参照)
│   │   ├── ストリーム(変更追跡)
│   │   ├── タスク(定期実行)
│   │   ├── パイプ(Snowpipe)
│   │   └── 関数 / プロシージャ
│   └── ...
├── ウェアハウス(計算リソース)
├── ロール(権限管理)
└── ユーザー

2. ログインと画面の使い方

ログイン

  1. https://app.snowflake.com にアクセス
  2. アカウント名・ユーザー名・パスワードを入力

Snowsight(Web UI)の主要画面

メニュー 用途
Worksheets SQL を書いて実行する場所
Dashboards チャート・グラフを配置
Data データベース・テーブルの構造確認
Marketplace 外部データセットの入手
Activity クエリ履歴・コスト確認
Admin ウェアハウス・ユーザー・ロール管理

ワークシートの使い方

  1. 「Worksheets」→「+」→「SQL Worksheet」
  2. 右上でロールウェアハウスを選択
  3. 左上でデータベーススキーマを選択
  4. SQL を入力 → Ctrl+Enter で実行
-- 最初にやること
USE ROLE SYSADMIN;
USE WAREHOUSE my_wh;
USE DATABASE my_database;
USE SCHEMA public;

3. SQL 基本操作

データを見る

-- テーブル一覧
SHOW TABLES;

-- カラム構造を確認
DESCRIBE TABLE t_sales_log;

-- 中身を確認(先頭 100 行)
SELECT * FROM t_sales_log LIMIT 100;

-- 行数を確認
SELECT COUNT(*) FROM t_sales_log;

絞り込み(WHERE)

-- 完全一致
WHERE account_id = 'A001'

-- 部分一致
WHERE feature LIKE '%packet%'

-- 範囲指定
WHERE usage_date BETWEEN '2026-03-01' AND '2026-03-31'

-- 複数条件
WHERE account_id = 'A001' AND feature = 'web_access'

-- IN句
WHERE feature IN ('web_access', 'pv', 'mail')

-- NULL チェック
WHERE note IS NULL
WHERE note IS NOT NULL

集計(GROUP BY)

SELECT
    feature,
    COUNT(*) AS cnt,
    SUM(usage_amount) AS total,
    AVG(usage_amount) AS avg_val,
    MIN(usage_amount) AS min_val,
    MAX(usage_amount) AS max_val
FROM t_sales_log
GROUP BY feature
ORDER BY total DESC;

テーブル作成

CREATE TABLE t_sales_log (
    account_id STRING,
    usage_date DATE,
    feature STRING,
    usage_amount NUMBER(18,2),
    created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- 既存テーブルから作成(構造+データコピー)
CREATE TABLE t_sales_log_BACKUP AS SELECT * FROM t_sales_log;

-- 構造だけコピー(データなし)
CREATE TABLE t_sales_log_EMPTY LIKE t_sales_log;

データ操作

-- 挿入
INSERT INTO t_sales_log (account_id, usage_date, feature, usage_amount)
VALUES ('A001', '2026-04-01', 'web_access', 1500);

-- 更新
UPDATE t_sales_log SET usage_amount = 2000
WHERE account_id = 'A001' AND usage_date = '2026-04-01';

-- 削除
DELETE FROM t_sales_log WHERE usage_date < '2025-01-01';

-- テーブル削除
DROP TABLE t_sales_log_BACKUP;

-- テーブル中身だけ削除(構造は残す)
TRUNCATE TABLE t_sales_log_BACKUP;

JOIN

-- INNER JOIN(両方に存在するもの)
SELECT u.*, p.unit_price
FROM t_sales_log u
INNER JOIN M_PRICING p ON u.feature = p.feature;

-- LEFT JOIN(左テーブルは全行残す)
SELECT u.*, p.unit_price
FROM t_sales_log u
LEFT JOIN M_PRICING p ON u.feature = p.feature;

サブクエリ・CTE

-- CTE(WITH句):複雑なクエリを読みやすくする
WITH monthly_usage AS (
    SELECT
        account_id,
        DATE_TRUNC('month', usage_date) AS month,
        SUM(usage_amount) AS total
    FROM t_sales_log
    GROUP BY account_id, month
)
SELECT * FROM monthly_usage
WHERE total > 10000
ORDER BY month;

4. データ型一覧

カテゴリ 説明
数値 NUMBER(p,s) 精度・スケール指定 NUMBER(18,2)
数値 INTEGER 整数 38桁まで
数値 FLOAT 浮動小数点 倍精度
文字列 STRING / VARCHAR 可変長文字列 最大16MB
文字列 CHAR(n) 固定長 CHAR(10)
日付時刻 DATE 日付のみ 2026-04-06
日付時刻 TIMESTAMP_NTZ タイムゾーンなし 2026-04-06 15:30:00
日付時刻 TIMESTAMP_LTZ ローカルTZ セッションTZに依存
日付時刻 TIMESTAMP_TZ TZ付き 2026-04-06 15:30:00 +09:00
日付時刻 TIME 時刻のみ 15:30:00
論理 BOOLEAN 真偽値 TRUE / FALSE
半構造化 VARIANT JSON等を格納 {'key': 'value'}
半構造化 OBJECT キーバリュー JSON オブジェクト
半構造化 ARRAY 配列 [1, 2, 3]
バイナリ BINARY バイナリデータ -

5. データの取り込み(S3 → Snowflake)

全体フロー

ステップ 1:ファイルフォーマット定義

-- CSV
CREATE OR REPLACE FILE FORMAT csv_format
  TYPE = 'CSV'
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1
  NULL_IF = ('', 'NULL')
  FIELD_OPTIONALLY_ENCLOSED_BY = '"';

-- JSON
CREATE OR REPLACE FILE FORMAT json_format
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = TRUE;

-- Parquet
CREATE OR REPLACE FILE FORMAT parquet_format
  TYPE = 'PARQUET';

ステップ 2:ステージ作成

-- 外部ステージ(S3)
CREATE OR REPLACE STAGE my_s3_stage
  URL = 's3://your-bucket/usage-data/'
  CREDENTIALS = (AWS_KEY_ID = '***' AWS_SECRET_KEY = '***')
  FILE_FORMAT = csv_format;

-- 内部ステージ(Snowflake 内部ストレージ)
CREATE OR REPLACE STAGE my_internal_stage
  FILE_FORMAT = csv_format;

-- ステージの中身を確認
LIST @my_s3_stage;

ステップ 3:データ読み込み

-- S3 → テーブル
COPY INTO t_sales_log
FROM @my_s3_stage
FILE_FORMAT = csv_format
ON_ERROR = 'CONTINUE'
PURGE = FALSE;

-- 特定ファイルだけ読み込み
COPY INTO t_sales_log
FROM @my_s3_stage/2026/04/
PATTERN = '.*usage.*[.]csv'
FILE_FORMAT = csv_format;

-- 読み込み結果確認
SELECT COUNT(*) FROM t_sales_log;

エラー確認

-- エラーだけ確認(実際には読み込まない)
COPY INTO t_sales_log
FROM @my_s3_stage
FILE_FORMAT = csv_format
VALIDATION_MODE = 'RETURN_ERRORS';

-- 直近の COPY 履歴
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 't_sales_log',
  START_TIME => DATEADD('hour', -24, CURRENT_TIMESTAMP())
));

6. Snowpipe(リアルタイム取り込み)

S3 にファイルが置かれたら自動で取り込む仕組み。

-- Snowpipe 作成
CREATE OR REPLACE PIPE my_pipe
  AUTO_INGEST = TRUE
AS
COPY INTO t_sales_log
FROM @my_s3_stage
FILE_FORMAT = csv_format;

-- パイプの状態確認
SHOW PIPES;

-- パイプの SQS 通知先を確認(S3 イベント通知に設定する)
SELECT SYSTEM$PIPE_STATUS('my_pipe');

S3 のイベント通知を Snowpipe の SQS キューに接続すると、ファイルが置かれるたびに自動で読み込まれる。


7. ウェアハウス管理とコスト最適化

ウェアハウスサイズと料金

サイズ クレジット/時間 用途
X-Small 1 軽いクエリ、開発・テスト
Small 2 通常の分析
Medium 4 中規模データ処理
Large 8 大規模集計
X-Large 16 重い処理

ウェアハウス操作

-- 作成
CREATE WAREHOUSE IF NOT EXISTS dev_wh
  WAREHOUSE_SIZE = 'X-SMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE;

-- サイズ変更(即座に反映、ダウンタイムなし)
ALTER WAREHOUSE dev_wh SET WAREHOUSE_SIZE = 'MEDIUM';

-- 手動停止
ALTER WAREHOUSE dev_wh SUSPEND;

-- 手動起動
ALTER WAREHOUSE dev_wh RESUME;

コスト確認

-- 過去 7 日のウェアハウス消費
SELECT
    WAREHOUSE_NAME,
    SUM(CREDITS_USED) AS total_credits
FROM TABLE(INFORMATION_SCHEMA.WAREHOUSE_METERING_HISTORY(
    DATE_RANGE_START => DATEADD('day', -7, CURRENT_DATE())
))
GROUP BY WAREHOUSE_NAME
ORDER BY total_credits DESC;

コスト削減チェックリスト

  • AUTO_SUSPEND = 60(デフォルト 600 は長すぎ)
  • AUTO_RESUME = TRUE
  • 開発用は X-Small
  • 使わないウェアハウスは SUSPEND
  • 定期実行はタスク化してウェアハウスを自動起動/停止

8. 関数リファレンス

文字列関数

UPPER('hello')              -- 'HELLO'
LOWER('HELLO')              -- 'hello'
TRIM('  hello  ')           -- 'hello'
LENGTH('hello')             -- 5
SUBSTRING('hello', 1, 3)    -- 'hel'
CONCAT('a', 'b', 'c')      -- 'abc'
REPLACE('hello', 'l', 'r')  -- 'herro'
SPLIT_PART('a-b-c', '-', 2) -- 'b'
REGEXP_LIKE('abc123', '\\d+') -- TRUE

日付関数

CURRENT_DATE()                          -- 2026-04-06
CURRENT_TIMESTAMP()                     -- 2026-04-06 15:30:00
DATE_TRUNC('month', usage_date)         -- 月初に切り捨て
DATEADD('day', -7, CURRENT_DATE())      -- 7日前
DATEDIFF('day', start_date, end_date)   -- 日数差
TO_DATE('2026-04-06', 'YYYY-MM-DD')     -- 文字列→DATE
TO_CHAR(usage_date, 'YYYY/MM/DD')       -- DATE→文字列
EXTRACT('month' FROM usage_date)        -- 月だけ取得
LAST_DAY(usage_date)                    -- 月末日

変換関数

TO_NUMBER('123.45', '999.99')  -- 文字列→数値
TO_VARCHAR(123)                 -- 数値→文字列
TO_TIMESTAMP('2026-04-06 15:30:00')
TRY_TO_NUMBER('abc')           -- 変換失敗時に NULL(エラーにならない)
CAST(col AS INTEGER)           -- 型変換
col::INTEGER                   -- 型変換(省略記法)

条件関数

CASE WHEN amount > 1000 THEN '高' WHEN amount > 500 THEN '中' ELSE '低' END
COALESCE(col1, col2, 'デフォルト')  -- 最初の非NULL値
NULLIF(col1, col2)                  -- col1=col2ならNULL
IFF(condition, true_val, false_val) -- Snowflake独自のIF
DECODE(col, 'a', 1, 'b', 2, 0)     -- 値マッピング

集計関数

COUNT(*)           -- 行数
COUNT(DISTINCT col) -- ユニーク数
SUM(amount)        -- 合計
AVG(amount)        -- 平均
MEDIAN(amount)     -- 中央値
MIN(amount)        -- 最小
MAX(amount)        -- 最大
STDDEV(amount)     -- 標準偏差
LISTAGG(col, ',')  -- 値を連結
ARRAY_AGG(col)     -- 配列にまとめる

9. ウィンドウ関数

-- 前月比較(LAG)
SELECT
    account_id, month, total,
    LAG(total) OVER (PARTITION BY account_id ORDER BY month) AS prev,
    total - LAG(total) OVER (PARTITION BY account_id ORDER BY month) AS diff
FROM monthly_usage;

-- 累計(SUM OVER)
SUM(amount) OVER (PARTITION BY account_id ORDER BY usage_date) AS cumulative

-- ランキング
ROW_NUMBER() OVER (PARTITION BY feature ORDER BY amount DESC) AS rank
RANK() OVER (ORDER BY amount DESC) AS rank_with_ties
DENSE_RANK() OVER (ORDER BY amount DESC) AS dense_rank

-- 移動平均(直近7日)
AVG(amount) OVER (
    PARTITION BY account_id
    ORDER BY usage_date
    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS moving_avg_7d

-- 全体に対する割合
amount / SUM(amount) OVER () * 100 AS pct_of_total

-- グループ内の割合
amount / SUM(amount) OVER (PARTITION BY feature) * 100 AS pct_in_feature

10. ビューとマテリアライズドビュー

-- ビュー(毎回クエリを実行)
CREATE OR REPLACE VIEW v_monthly_usage AS
SELECT
    account_id,
    DATE_TRUNC('month', usage_date) AS month,
    feature,
    SUM(usage_amount) AS total
FROM t_sales_log
GROUP BY account_id, month, feature;

-- マテリアライズドビュー(結果をキャッシュ、自動更新)
CREATE OR REPLACE MATERIALIZED VIEW mv_daily_summary AS
SELECT
    usage_date,
    feature,
    COUNT(*) AS cnt,
    SUM(usage_amount) AS total
FROM t_sales_log
GROUP BY usage_date, feature;

11. タイムトラベル(過去データの復元)

誤って削除・更新したデータを復元できる。最大 90 日間。

-- 5 分前のデータを参照
SELECT * FROM t_sales_log AT(OFFSET => -60*5);

-- 特定時刻のデータを参照
SELECT * FROM t_sales_log AT(TIMESTAMP => '2026-04-06 10:00:00'::TIMESTAMP);

-- 誤って DELETE した場合の復元
CREATE TABLE t_sales_log_RESTORED AS
SELECT * FROM t_sales_log AT(TIMESTAMP => '2026-04-06 10:00:00'::TIMESTAMP);

-- DROP したテーブルの復元
UNDROP TABLE t_sales_log;

-- 保持期間の設定
ALTER TABLE t_sales_log SET DATA_RETENTION_TIME_IN_DAYS = 7;

12. ストリーム(変更データキャプチャ)

テーブルの変更(INSERT/UPDATE/DELETE)を自動追跡する。

-- ストリーム作成
CREATE OR REPLACE STREAM usage_stream ON TABLE t_sales_log;

-- 変更を確認
SELECT * FROM usage_stream;
-- METADATA$ACTION: INSERT / DELETE
-- METADATA$ISUPDATE: TRUE / FALSE
-- METADATA$ROW_ID: 行ID

-- ストリームにデータがあるか確認
SELECT SYSTEM$STREAM_HAS_DATA('usage_stream');

13. タスク(自動スケジュール実行)

-- 毎日朝 9 時に集計を実行するタスク
CREATE OR REPLACE TASK daily_aggregation
  WAREHOUSE = dev_wh
  SCHEDULE = 'USING CRON 0 9 * * * Asia/Tokyo'
AS
INSERT INTO T_DAILY_SUMMARY
SELECT
    CURRENT_DATE() - 1 AS target_date,
    feature,
    COUNT(*) AS cnt,
    SUM(usage_amount) AS total
FROM t_sales_log
WHERE usage_date = CURRENT_DATE() - 1
GROUP BY feature;

-- タスクを有効化(デフォルトは停止状態)
ALTER TASK daily_aggregation RESUME;

-- タスク一覧
SHOW TASKS;

-- 手動実行
EXECUTE TASK daily_aggregation;

-- タスクの実行履歴
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY SCHEDULED_TIME DESC
LIMIT 10;

14. ストリーム × タスクでパイプライン構築

S3 → Snowpipe(自動取込)→ テーブル → ストリーム(変更検知)→ タスク(定期実行)→ 集計テーブル

-- 1. ストリームで変更を監視
CREATE OR REPLACE STREAM usage_stream ON TABLE t_sales_log;

-- 2. タスクで新しいデータだけ処理
CREATE OR REPLACE TASK process_new_usage
  WAREHOUSE = dev_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('usage_stream')
AS
INSERT INTO t_sales_log_PROCESSED
SELECT
    account_id, usage_date, feature, usage_amount,
    CURRENT_TIMESTAMP() AS processed_at
FROM usage_stream
WHERE METADATA$ACTION = 'INSERT';

ALTER TASK process_new_usage RESUME;

15. ロールとアクセス制御

-- 自分の情報
SELECT CURRENT_USER(), CURRENT_ROLE(), CURRENT_WAREHOUSE(), CURRENT_DATABASE();

-- ロール一覧
SHOW ROLES;

-- 権限付与
GRANT USAGE ON DATABASE my_database TO ROLE analyst_role;
GRANT USAGE ON SCHEMA my_database.public TO ROLE analyst_role;
GRANT SELECT ON ALL TABLES IN SCHEMA my_database.public TO ROLE analyst_role;

-- 今後作るテーブルにも自動で権限付与
GRANT SELECT ON FUTURE TABLES IN SCHEMA my_database.public TO ROLE analyst_role;

-- 権限確認
SHOW GRANTS TO ROLE analyst_role;
SHOW GRANTS ON TABLE t_sales_log;

16. ゼロコピークローニング

データをコピーせずに瞬時に複製。ストレージ追加なし。

-- テーブルのクローン
CREATE TABLE t_sales_log_DEV CLONE t_sales_log;

-- データベースごとクローン
CREATE DATABASE my_database_dev CLONE my_database;

-- 特定時点のクローン(タイムトラベル)
CREATE TABLE t_sales_log_SNAPSHOT CLONE t_sales_log
  AT(TIMESTAMP => '2026-04-06 10:00:00'::TIMESTAMP);

17. データ共有

-- 共有を作成
CREATE SHARE usage_share;
GRANT USAGE ON DATABASE my_database TO SHARE usage_share;
GRANT SELECT ON TABLE t_sales_log TO SHARE usage_share;

-- 共有先アカウントに付与
ALTER SHARE usage_share ADD ACCOUNTS = partner_account;

18. Snowflake × Python 連携

# pip install snowflake-connector-python
import snowflake.connector

conn = snowflake.connector.connect(
    user='USER',
    password='PASS',
    account='ACCOUNT',
    warehouse='DEV_WH',
    database='my_database',
    schema='public'
)

cur = conn.cursor()
cur.execute("SELECT * FROM t_sales_log LIMIT 10")
rows = cur.fetchall()

# pandas 連携
import pandas as pd
cur.execute("SELECT * FROM t_sales_log")
df = pd.DataFrame(cur.fetchall(), columns=[col[0] for col in cur.description])

19. パフォーマンスチューニング

症状 原因 対策
クエリが遅い ウェアハウス小さい サイズ UP
クエリが遅い フルスキャン WHERE で絞り込む
クエリが遅い JOIN が重い クラスタリングキー設定
同時実行で遅い リソース競合 ウェアハウスを分ける
コストが高い ウェアハウス停止してない AUTO_SUSPEND=60
-- クエリプロファイルで原因特定
-- Snowsight で実行後、「Query Profile」タブを確認

-- クラスタリングキー設定(大規模テーブル向け)
ALTER TABLE t_sales_log CLUSTER BY (account_id, usage_date);

-- クラスタリング状態確認
SELECT SYSTEM$CLUSTERING_INFORMATION('t_sales_log');

-- キャッシュ活用
-- 同じクエリは自動でキャッシュされる(24時間保持)
-- ウェアハウスローカルキャッシュも自動

20. トラブルシューティング

問題 対策
データが見えない SHOW GRANTS TO ROLE ロール名 で権限確認
COPY INTO 失敗 VALIDATION_MODE = 'RETURN_ERRORS' でエラー確認
タイムゾーンずれ ALTER SESSION SET TIMEZONE = 'Asia/Tokyo'
文字化け FILE_FORMATENCODING = 'UTF-8' 追加
テーブル消した UNDROP TABLE テーブル名
データ消した タイムトラベルで復元
コスト爆発 WAREHOUSE_METERING_HISTORY で確認
-- クエリ履歴(過去 24 時間)
SELECT QUERY_TEXT, EXECUTION_STATUS, ERROR_MESSAGE, TOTAL_ELAPSED_TIME
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE START_TIME >= DATEADD('hour', -24, CURRENT_TIMESTAMP())
ORDER BY START_TIME DESC;

-- セッション設定確認
SHOW PARAMETERS IN SESSION;

-- タイムゾーン設定
ALTER SESSION SET TIMEZONE = 'Asia/Tokyo';

21. 業務用クエリテンプレート

月次集計

SELECT
    u.account_id,
    DATE_TRUNC('month', u.usage_date) AS billing_month,
    u.feature,
    SUM(u.usage_amount) AS total_usage,
    SUM(u.usage_amount) * p.unit_price AS billing_amount
FROM t_sales_log u
JOIN M_PRICING p ON u.feature = p.feature
GROUP BY u.account_id, billing_month, u.feature, p.unit_price
ORDER BY u.account_id, billing_month;

前月比較

WITH monthly AS (
    SELECT
        account_id,
        DATE_TRUNC('month', usage_date) AS month,
        SUM(usage_amount) AS total
    FROM t_sales_log
    GROUP BY account_id, month
)
SELECT
    account_id, month, total,
    LAG(total) OVER (PARTITION BY account_id ORDER BY month) AS prev_month,
    ROUND((total - LAG(total) OVER (PARTITION BY account_id ORDER BY month))
          / NULLIF(LAG(total) OVER (PARTITION BY account_id ORDER BY month), 0) * 100, 1)
          AS growth_pct
FROM monthly
ORDER BY account_id, month;

異常値検知(平均の 3σ 超え)

WITH stats AS (
    SELECT feature,
        AVG(usage_amount) AS avg_val,
        STDDEV(usage_amount) AS std_val
    FROM t_sales_log
    GROUP BY feature
)
SELECT u.*, s.avg_val, s.std_val
FROM t_sales_log u
JOIN stats s ON u.feature = s.feature
WHERE u.usage_amount > s.avg_val + (3 * s.std_val);

日次トレンド(直近 30 日)

SELECT
    usage_date,
    feature,
    COUNT(*) AS cnt,
    SUM(usage_amount) AS total,
    AVG(usage_amount) OVER (
        PARTITION BY feature
        ORDER BY usage_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS moving_avg_7d
FROM t_sales_log
WHERE usage_date >= DATEADD('day', -30, CURRENT_DATE())
GROUP BY usage_date, feature
ORDER BY usage_date;

アカウント別ランキング

SELECT
    account_id,
    SUM(usage_amount) AS total,
    RANK() OVER (ORDER BY SUM(usage_amount) DESC) AS rank
FROM t_sales_log
WHERE usage_date >= DATE_TRUNC('month', CURRENT_DATE())
GROUP BY account_id
ORDER BY rank;

22. ショートカット・Tips

Snowsight ショートカット

操作 キー
クエリ実行 Ctrl + Enter
全クエリ実行 Ctrl + Shift + Enter
コメントアウト Ctrl + /
フォーマット Ctrl + Shift + F

Tips

-- クエリ結果をテーブルに保存
CREATE TABLE result_table AS SELECT ...;

-- 一時テーブル(セッション終了で消える)
CREATE TEMPORARY TABLE tmp AS SELECT ...;

-- LIMIT なしで結果行数だけ確認
SELECT COUNT(*) FROM (SELECT ...);

-- カラム名に日本語を使う場合はダブルクォート
SELECT "都道府県名" FROM population;

-- セミコロンで複数クエリを区切り
SELECT 1; SELECT 2; SELECT 3;

23. 動的テーブル(Dynamic Tables)

ストリーム+タスクを使わずに、宣言的にデータパイプラインを構築する仕組み。SQL で「どう変換するか」を書くだけで、Snowflake が自動でリフレッシュしてくれる。

-- 動的テーブル作成
CREATE OR REPLACE DYNAMIC TABLE dt_monthly_usage
  TARGET_LAG = '5 minutes'    -- 5分以内にデータを反映
  WAREHOUSE = dev_wh
AS
SELECT
    account_id,
    DATE_TRUNC('month', usage_date) AS month,
    feature,
    SUM(usage_amount) AS total
FROM t_sales_log
GROUP BY account_id, month, feature;

-- 動的テーブル一覧
SHOW DYNAMIC TABLES;

-- リフレッシュ履歴確認
SELECT * FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
ORDER BY REFRESH_END_TIME DESC LIMIT 10;

-- 手動リフレッシュ
ALTER DYNAMIC TABLE dt_monthly_usage REFRESH;

-- 一時停止
ALTER DYNAMIC TABLE dt_monthly_usage SUSPEND;

ストリーム+タスク vs 動的テーブル:

ストリーム+タスク 動的テーブル
複雑さ ストリーム作成→タスク作成→有効化 SQL 1 つ
制御 細かく制御可能 Snowflake に任せる
用途 複雑な処理ロジック シンプルな変換パイプライン
推奨 条件分岐やエラー処理が必要な場合 集計・変換だけなら動的テーブル

24. Cortex AI(SQL で AI を使う)

Snowflake 内で SQL だけで AI / ML 機能が使える。データを外部に出す必要がない。

LLM 関数(テキスト処理)

-- テキスト要約
SELECT SNOWFLAKE.CORTEX.SUMMARIZE(description) AS summary
FROM support_tickets;

-- 感情分析
SELECT
    comment_text,
    SNOWFLAKE.CORTEX.SENTIMENT(comment_text) AS sentiment_score
FROM customer_feedback;
-- 結果: -1(ネガティブ)〜 1(ポジティブ)

-- テキスト翻訳
SELECT SNOWFLAKE.CORTEX.TRANSLATE(
    description, 'ja', 'en'
) AS english_text
FROM support_tickets;

-- 自由形式の AI 質問
SELECT SNOWFLAKE.CORTEX.COMPLETE(
    'claude-3-5-sonnet',
    'このエラーメッセージの原因を説明してください: ' || error_message
) AS ai_response
FROM error_logs;

ML 関数(予測・異常検知)

-- 異常検知モデルの作成
CREATE OR REPLACE SNOWFLAKE.ML.ANOMALY_DETECTION usage_anomaly_model(
    INPUT_DATA => TABLE(
        SELECT usage_date AS timestamp, usage_amount AS value
        FROM t_sales_log
        WHERE feature = 'web_access'
    ),
    TIMESTAMP_COLNAME => 'timestamp',
    TARGET_COLNAME => 'value'
);

-- 異常検知の実行
CALL usage_anomaly_model!DETECT_ANOMALIES(
    INPUT_DATA => TABLE(
        SELECT usage_date AS timestamp, usage_amount AS value
        FROM t_sales_log
        WHERE feature = 'web_access' AND usage_date >= '2026-04-01'
    ),
    TIMESTAMP_COLNAME => 'timestamp',
    TARGET_COLNAME => 'value'
);

-- 時系列予測
CREATE OR REPLACE SNOWFLAKE.ML.FORECAST usage_forecast_model(
    INPUT_DATA => TABLE(
        SELECT usage_date AS timestamp, usage_amount AS value
        FROM t_sales_log WHERE feature = 'web_access'
    ),
    TIMESTAMP_COLNAME => 'timestamp',
    TARGET_COLNAME => 'value'
);

-- 30日先の予測
CALL usage_forecast_model!FORECAST(FORECASTING_PERIODS => 30);

Cortex Code(自然言語 → SQL)

Snowsight の左メニュー「Cortex Code」から、自然言語で質問するだけで SQL を自動生成。

質問: 「先月の利用量トップ10のアカウントを教えて」
→ SQLが自動生成 → 実行 → 結果表示

25. 半構造化データ(JSON / Parquet)

Snowflake は JSON をそのまま格納・クエリできる。

-- VARIANT 型のカラムを持つテーブル
CREATE TABLE events (
    event_id INTEGER,
    event_data VARIANT,
    created_at TIMESTAMP
);

-- JSON データ挿入
INSERT INTO events (event_id, event_data)
SELECT 1, PARSE_JSON('{
    "user_id": "u123",
    "action": "login",
    "device": {"type": "mobile", "os": "iOS"}
}');

-- JSON の値を取り出す
SELECT
    event_data:user_id::STRING AS user_id,
    event_data:action::STRING AS action,
    event_data:device.type::STRING AS device_type,
    event_data:device.os::STRING AS device_os
FROM events;

-- JSON 配列を展開(FLATTEN)
SELECT
    f.value:name::STRING AS item_name,
    f.value:price::NUMBER AS item_price
FROM orders,
LATERAL FLATTEN(input => order_data:items) f;

26. 外部テーブル(S3 のデータを直接クエリ)

データを Snowflake にコピーせずに、S3 に置いたままクエリできる。

-- 外部テーブル作成
CREATE OR REPLACE EXTERNAL TABLE ext_usage
  WITH LOCATION = @my_s3_stage
  FILE_FORMAT = parquet_format
  AUTO_REFRESH = TRUE;

-- クエリ(通常のテーブルと同じ SQL)
SELECT * FROM ext_usage WHERE account_id = 'A001';

27. UDF(ユーザー定義関数)

-- SQL UDF
CREATE OR REPLACE FUNCTION calc_billing(amount NUMBER, rate NUMBER)
  RETURNS NUMBER
AS
$$
  amount * rate
$$;

-- 使い方
SELECT calc_billing(usage_amount, 0.05) AS billing FROM t_sales_log;

-- Python UDF
CREATE OR REPLACE FUNCTION categorize_usage(amount NUMBER)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.11'
  HANDLER = 'categorize'
AS
$$
def categorize(amount):
    if amount > 10000: return '大口'
    elif amount > 1000: return '中口'
    else: return '小口'
$$;

-- 使い方
SELECT feature, usage_amount, categorize_usage(usage_amount) AS category
FROM t_sales_log;

28. データマスキング(機密データの保護)

ロールに応じて機密データを自動的にマスクする。

-- マスキングポリシー作成
CREATE OR REPLACE MASKING POLICY mask_email AS (val STRING)
  RETURNS STRING ->
  CASE
    WHEN CURRENT_ROLE() IN ('ADMIN', 'SYSADMIN') THEN val
    ELSE '***@***.com'
  END;

-- テーブルに適用
ALTER TABLE customers MODIFY COLUMN email
  SET MASKING POLICY mask_email;

-- ADMIN ロール → 実際のメールが見える
-- それ以外 → '***@***.com' と表示される

29. 行アクセスポリシー(行レベルセキュリティ)

ロールに応じて見える行を制限する。

-- 行アクセスポリシー作成
CREATE OR REPLACE ROW ACCESS POLICY account_access AS (account_id STRING)
  RETURNS BOOLEAN ->
  CASE
    WHEN CURRENT_ROLE() = 'ADMIN' THEN TRUE           -- 全行見える
    WHEN account_id = CURRENT_USER() THEN TRUE          -- 自分のデータだけ
    ELSE FALSE
  END;

-- テーブルに適用
ALTER TABLE t_sales_log ADD ROW ACCESS POLICY account_access ON (account_id);

30. Snowpark(Python DataFrame API)

SQL を書かずに Python の DataFrame で Snowflake を操作できる。

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum as sum_, avg

# セッション作成
session = Session.builder.configs({
    "account": "ACCOUNT",
    "user": "USER",
    "password": "PASS",
    "warehouse": "DEV_WH",
    "database": "my_database",
    "schema": "public"
}).create()

# テーブル読み込み
df = session.table("t_sales_log")

# フィルタ+集計(SQL を書かない)
result = (
    df.filter(col("feature") == "packet")
    .group_by("account_id")
    .agg(
        sum_("usage_amount").alias("total"),
        avg("usage_amount").alias("avg_val")
    )
    .sort(col("total").desc())
)

result.show()

# 結果をテーブルに保存
result.write.save_as_table("T_PACKET_SUMMARY")

31. Snowflake Notebooks(ブラウザで Python 実行)

Snowsight 上で Jupyter Notebook のように Python / SQL を実行できる。

  1. Snowsight → 「Notebooks」→「+」
  2. SQL セルと Python セルを自由に切り替え
  3. matplotlib / plotly でグラフ描画
  4. Snowpark が組み込み済み
# Notebook 内の Python セル
import streamlit as st
import matplotlib.pyplot as plt

# SQL の結果を DataFrame として取得
df = session.sql("SELECT usage_date, SUM(usage_amount) AS total FROM t_sales_log GROUP BY usage_date").to_pandas()

# グラフ描画
plt.figure(figsize=(12, 4))
plt.plot(df['USAGE_DATE'], df['TOTAL'])
plt.title('日次利用量推移')
plt.show()

32. 検索最適化サービス

大規模テーブルの特定カラムに対する検索を高速化する。

-- 検索最適化を有効化
ALTER TABLE t_sales_log ADD SEARCH OPTIMIZATION ON EQUALITY(account_id);

-- ポイント検索が高速になる
SELECT * FROM t_sales_log WHERE account_id = 'A001';

-- 検索最適化の状態確認
DESCRIBE SEARCH OPTIMIZATION ON t_sales_log;

33. マルチクラスターウェアハウス

同時アクセスが多いとき、ウェアハウスを自動でスケールアウトする。

CREATE OR REPLACE WAREHOUSE auto_scale_wh
  WAREHOUSE_SIZE = 'SMALL'
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3      -- 最大3クラスターに自動拡張
  SCALING_POLICY = 'STANDARD'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;

34. Apache Iceberg テーブル

外部ストレージ(S3 等)に Parquet 形式でデータを保持しつつ、Snowflake のクエリ性能を活用する。データレイクとの統合に使う。

-- 外部ボリューム作成(S3)
CREATE OR REPLACE EXTERNAL VOLUME my_iceberg_vol
  STORAGE_LOCATIONS = (
    (
      NAME = 'my-s3-location'
      STORAGE_BASE_URL = 's3://my-bucket/iceberg/'
      STORAGE_PROVIDER = 'S3'
      STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789:role/my-role'
    )
  );

-- Iceberg テーブル作成
CREATE OR REPLACE ICEBERG TABLE usage_iceberg
  CATALOG = 'SNOWFLAKE'
  EXTERNAL_VOLUME = 'my_iceberg_vol'
  BASE_LOCATION = 'usage/'
AS SELECT * FROM t_sales_log;

35. Fail-safe(最終防衛線)

タイムトラベル期間が過ぎたあとも、Snowflake が内部的に 7 日間データを保持する。ユーザーが直接アクセスすることはできず、Snowflake サポートに依頼して復元する。

データ変更 → タイムトラベル期間(最大90日)→ Fail-safe(7日)→ 完全削除

コスト注意:Fail-safe 期間中もストレージ料金が発生する。


参考リンク

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?