はじめに
スプレッドシートでの集計に限界を感じて、Snowflake で分析基盤を作ろうとしたとき「結局何からやればいいのか分からない」状態になった。
公式ドキュメントは膨大、Zenn の記事は 1,290 本、Qiita にも大量にある。全部読むのは無理。
この記事はそのときに整理した内容で、業務で Snowflake を使う前提で必要な知識を一通りまとめた。「DWH って何?」から「異常検知 SQL」まで、この 1 記事で完結する。
目次
- なぜ DWH が必要なのか
- なぜ Snowflake が選ばれるのか
- Snowflake の何が凄いのか
- アーキテクチャ(3 層構造)
- ログインと画面の使い方
- SQL 基本操作
- データ型一覧
- データの取り込み(S3 → Snowflake)
- Snowpipe(リアルタイム取り込み)
- ウェアハウス管理とコスト最適化
- 関数リファレンス
- ウィンドウ関数
- ビューとマテリアライズドビュー
- タイムトラベル(過去データの復元)
- ストリーム(変更データキャプチャ)
- タスク(自動スケジュール実行)
- ストリーム × タスクでパイプライン構築
- ロールとアクセス制御
- ゼロコピークローニング
- データ共有
- Snowflake × Python 連携
- パフォーマンスチューニング
- トラブルシューティング
- 業務用クエリテンプレート
- ショートカット・Tips
- 動的テーブル(Dynamic Tables)
- Cortex AI(SQL で AI を使う)
- 半構造化データ(JSON / Parquet)
- 外部テーブル(S3 直接クエリ)
- UDF(ユーザー定義関数)
- データマスキング
- 行アクセスポリシー
- Snowpark(Python DataFrame API)
- Snowflake Notebooks
- 検索最適化サービス
- マルチクラスターウェアハウス
- Apache Iceberg テーブル
- Fail-safe(最終防衛線)
1. なぜ DWH(データウェアハウス)が必要なのか
まず Snowflake の前に、DWH という概念を腹落ちさせる。
業務データベースと分析用データベースは別物
会社には「業務用 DB」がある。ECサイトならユーザーの注文データ、SaaS なら顧客の利用ログ。この DB は「注文を処理する」「ログインを認証する」ために最適化されている。
ここに「先月の売上を集計して」「顧客ごとの利用傾向を分析して」という重いクエリを投げると何が起きるか。業務 DB が遅くなる。 ユーザーがログインできなくなったり、注文が通らなくなったりする。
だから分析用のデータベースを別に用意する。これが DWH(データウェアハウス)。
ポイント:業務 DB に重い分析クエリを投げない。DWH に分離する。
DWH がないとどうなるか
| やりたいこと | DWH なし | DWH あり |
|---|---|---|
| 月次レポート | スプシに手作業で転記 | SQL 1 本で完了 |
| 前年比較 | 古いデータが消えてて不可能 | 全期間のデータが残ってる |
| 部門横断分析 | 各部署に「データください」 | 1 つの DB に集約済み |
| リアルタイム集計 | 業務 DB に負荷をかける | DWH で安全に実行 |
| データ共有 | CSV でメール添付 | アクセス権付与するだけ |
要するに DWH は「データを 1 箇所に集めて、安全に・高速に分析するための専用倉庫」。
DWH・データレイク・データマートの違い
「DWH 以外にも似た言葉があってよく分からない」という声があるので整理する。
| データレイク | DWH | データマート | |
|---|---|---|---|
| 例え | 倉庫 | 整理された棚 | 部署ごとの引き出し |
| データ | 生データ(なんでも入れる) | 整理・統合されたデータ | 部署用に絞ったデータ |
| 用途 | 保管 | 全社分析 | 部門分析 |
| ユーザー | エンジニア | アナリスト・経営層 | 各部署の担当者 |
Snowflake はこの 3 つ全部の機能を 1 つで提供する。だから「データプラットフォーム」と呼ばれる。
なぜ今 DWH が必要になっているか
10 年前は「月次レポートは経理が Excel で作る」で回っていた。今は違う。
- データ量が爆発的に増えた(ログ、IoT、行動データ)
- リアルタイムに近い判断が求められる
- 部門をまたいだデータ統合が必要
- AI / ML にデータを食わせる基盤が必要
Excel やスプレッドシートでは限界がある。「先月の集計に 3 日かかる」「データが壊れてやり直し」「誰かが数式を壊した」——この手の問題は DWH で根本解決できる。
2. なぜ DWH の中で Snowflake が選ばれるのか
DWH は Snowflake だけじゃない。BigQuery(Google)、Redshift(AWS)、Databricks もある。その中でなぜ Snowflake が選ばれているのか。
競合との比較
| Snowflake | BigQuery | Redshift | Databricks | |
|---|---|---|---|---|
| クラウド | AWS/GCP/Azure 全対応 | GCP のみ | AWS のみ | マルチクラウド |
| 料金モデル | 使った分だけ(秒単位) | クエリ量課金 | 常時稼働(RI あり) | 使った分だけ |
| スケール | ボタン 1 つで即時 | 自動 | 手動リサイズ | 自動 |
| セットアップ | ほぼゼロ | 簡単 | やや複雑 | やや複雑 |
| 半構造化データ | VARIANT 型で直接 | STRUCT/ARRAY | JSON 関数 | Delta Lake |
| データ共有 | ゼロコピー共有 | Analytics Hub | なし | Delta Sharing |
| タイムトラベル | 最大 90 日 | 7 日 | なし(スナップショット) | タイムトラベルあり |
| AI/ML 統合 | Cortex AI | Vertex AI | SageMaker | MLflow |
Snowflake のコアアーキテクチャ:マルチクラスターシェアードデータ
Snowflake の設計思想を一言で言うと「1 つのデータを、複数の独立した計算エンジンが同時に使える」。
営業チームが重いクエリを回しても、経理チームのクエリに影響しない。データのコピーも発生しない。これが従来の DB にない最大の強み。
Snowflake が選ばれる 5 つの理由
1. マルチクラウド
AWS にいても GCP にいても Azure にいても使える。ベンダーロックインを避けたい企業に刺さる。BigQuery は GCP、Redshift は AWS に縛られる。
2. ストレージとコンピュートの完全分離
「データは保存しておくだけなのに課金される」問題がない。ウェアハウスを止めればコンピュート費用はゼロ。Redshift は常時稼働が基本。
3. ゼロコピー共有
データをコピーせずに他のチームや取引先と共有できる。CSV エクスポート→メール添付→インポートの地獄から解放される。これは Snowflake の最大の差別化ポイント。
4. 圧倒的にシンプル
インフラの知識がほぼ不要。ブラウザでログインして SQL を打つだけ。DBA(データベース管理者)がいなくても運用できる。
5. エコシステムの充実
dbt、Fivetran、Airbyte、Looker、Tableau、Streamlit——主要な ETL / BI ツールが全部対応している。
どういう会社が Snowflake を選ぶか
| 条件 | 向いている DWH |
|---|---|
| GCP で統一したい | BigQuery |
| AWS で統一したい+常時稼働 OK | Redshift |
| ML/AI がメイン用途 | Databricks |
| マルチクラウド・シンプル・データ共有重視 | Snowflake |
| 小規模でとりあえず始めたい | BigQuery(無料枠あり) |
3. Snowflake の何が凄いのか
普通のデータベース(MySQL、PostgreSQL)と何が違うのか。素人目線で整理する。
一言で言うと
「サーバーを持たずに、巨大なデータを高速に分析できるクラウド DB」
従来の DB との違い
| 従来の DB(MySQL 等) | Snowflake | |
|---|---|---|
| サーバー管理 | 自分で構築・運用 | 不要。ブラウザで SQL 打つだけ |
| スケール | サーバーのスペックに依存 | ボタン 1 つで拡張・縮小 |
| 料金 | サーバー常時稼働=常に課金 | 使った分だけ。止めれば 0 円 |
| データ量 | TB 超えると厳しい | PB(ペタバイト)でも平気 |
| 同時アクセス | ユーザー増えると遅くなる | ウェアハウスを分ければ影響なし |
| 半構造化データ | JSON 扱いが面倒 | VARIANT 型で直接格納・クエリ |
| 過去データ復元 | バックアップから手動復元 | タイムトラベルで 1 行で復元 |
なぜ企業が Snowflake を選ぶのか
1. ストレージとコンピュートが分離している
従来の DB はデータを保存するサーバーとクエリを実行するサーバーが同じ。だから「データは保存しておくだけなのに、サーバーが動いてて課金される」問題が発生する。
Snowflake はデータの保存(ストレージ)とクエリの実行(ウェアハウス)が完全に別。ウェアハウスを止めてもデータは消えない。使わない時間は 0 円。
2. 秒単位でスケールできる
「月末の集計処理が重いから、月末だけウェアハウスを Large にして、終わったら X-Small に戻す」ができる。ダウンタイムなし。SQL 1 行で変更。
3. S3 / GCS / Azure Blob から直接読める
データを Snowflake にコピーしなくても、S3 に置いたまま分析できる(外部テーブル)。データの二重管理が不要。
4. 「やらかした」ときに戻せる(タイムトラベル)
DELETE FROM t_sales_log(WHERE なし)を実行してしまった。従来の DB なら「バックアップあります?」から始まる地獄。Snowflake なら UNDROP TABLE t_sales_log または SELECT * FROM t_sales_log AT(OFFSET => -300) で 5 分前のデータが見える。
5. データ共有が一瞬
別のチームや取引先とデータを共有するとき、CSV でエクスポート→メールで送付→インポート、という作業が不要。Snowflake 上でアクセス権を付与するだけ。データのコピーは発生しない。
ユーザーの業務での位置づけ
S3(利用ログデータ)
↓ COPY INTO / Snowpipe
Snowflake(分析用 DB)
↓ SQL で集計・分析
BI ツール(ダッシュボード表示)
S3 上の利用ログデータを Snowflake に取り込んで、マスタと突合して集計・分析する。今まで手作業(スプレッドシート + 関数 + 目視確認)でやっていた定常集計を、Snowflake + SQL で自動化する。
2. Snowflake のアーキテクチャ(3 層構造)
3 つのポイント:
- 各層が独立してスケール可能
- ウェアハウスを止めてもデータは消えない(ストレージ分離)
- 複数ウェアハウスが同じデータに同時アクセス可能
ポイント:
- ストレージとコンピュートが分離 → ウェアハウスを止めてもデータは消えない
- 複数ウェアハウスが同じデータに同時アクセス可能
- サーバー管理不要。ブラウザから SQL を打つだけ
オブジェクト階層
アカウント
├── データベース
│ ├── スキーマ
│ │ ├── テーブル
│ │ ├── ビュー
│ │ ├── ステージ(外部ストレージの参照)
│ │ ├── ストリーム(変更追跡)
│ │ ├── タスク(定期実行)
│ │ ├── パイプ(Snowpipe)
│ │ └── 関数 / プロシージャ
│ └── ...
├── ウェアハウス(計算リソース)
├── ロール(権限管理)
└── ユーザー
2. ログインと画面の使い方
ログイン
- https://app.snowflake.com にアクセス
- アカウント名・ユーザー名・パスワードを入力
Snowsight(Web UI)の主要画面
| メニュー | 用途 |
|---|---|
| Worksheets | SQL を書いて実行する場所 |
| Dashboards | チャート・グラフを配置 |
| Data | データベース・テーブルの構造確認 |
| Marketplace | 外部データセットの入手 |
| Activity | クエリ履歴・コスト確認 |
| Admin | ウェアハウス・ユーザー・ロール管理 |
ワークシートの使い方
- 「Worksheets」→「+」→「SQL Worksheet」
- 右上でロールとウェアハウスを選択
- 左上でデータベースとスキーマを選択
- SQL を入力 →
Ctrl+Enterで実行
-- 最初にやること
USE ROLE SYSADMIN;
USE WAREHOUSE my_wh;
USE DATABASE my_database;
USE SCHEMA public;
3. SQL 基本操作
データを見る
-- テーブル一覧
SHOW TABLES;
-- カラム構造を確認
DESCRIBE TABLE t_sales_log;
-- 中身を確認(先頭 100 行)
SELECT * FROM t_sales_log LIMIT 100;
-- 行数を確認
SELECT COUNT(*) FROM t_sales_log;
絞り込み(WHERE)
-- 完全一致
WHERE account_id = 'A001'
-- 部分一致
WHERE feature LIKE '%packet%'
-- 範囲指定
WHERE usage_date BETWEEN '2026-03-01' AND '2026-03-31'
-- 複数条件
WHERE account_id = 'A001' AND feature = 'web_access'
-- IN句
WHERE feature IN ('web_access', 'pv', 'mail')
-- NULL チェック
WHERE note IS NULL
WHERE note IS NOT NULL
集計(GROUP BY)
SELECT
feature,
COUNT(*) AS cnt,
SUM(usage_amount) AS total,
AVG(usage_amount) AS avg_val,
MIN(usage_amount) AS min_val,
MAX(usage_amount) AS max_val
FROM t_sales_log
GROUP BY feature
ORDER BY total DESC;
テーブル作成
CREATE TABLE t_sales_log (
account_id STRING,
usage_date DATE,
feature STRING,
usage_amount NUMBER(18,2),
created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- 既存テーブルから作成(構造+データコピー)
CREATE TABLE t_sales_log_BACKUP AS SELECT * FROM t_sales_log;
-- 構造だけコピー(データなし)
CREATE TABLE t_sales_log_EMPTY LIKE t_sales_log;
データ操作
-- 挿入
INSERT INTO t_sales_log (account_id, usage_date, feature, usage_amount)
VALUES ('A001', '2026-04-01', 'web_access', 1500);
-- 更新
UPDATE t_sales_log SET usage_amount = 2000
WHERE account_id = 'A001' AND usage_date = '2026-04-01';
-- 削除
DELETE FROM t_sales_log WHERE usage_date < '2025-01-01';
-- テーブル削除
DROP TABLE t_sales_log_BACKUP;
-- テーブル中身だけ削除(構造は残す)
TRUNCATE TABLE t_sales_log_BACKUP;
JOIN
-- INNER JOIN(両方に存在するもの)
SELECT u.*, p.unit_price
FROM t_sales_log u
INNER JOIN M_PRICING p ON u.feature = p.feature;
-- LEFT JOIN(左テーブルは全行残す)
SELECT u.*, p.unit_price
FROM t_sales_log u
LEFT JOIN M_PRICING p ON u.feature = p.feature;
サブクエリ・CTE
-- CTE(WITH句):複雑なクエリを読みやすくする
WITH monthly_usage AS (
SELECT
account_id,
DATE_TRUNC('month', usage_date) AS month,
SUM(usage_amount) AS total
FROM t_sales_log
GROUP BY account_id, month
)
SELECT * FROM monthly_usage
WHERE total > 10000
ORDER BY month;
4. データ型一覧
| カテゴリ | 型 | 説明 | 例 |
|---|---|---|---|
| 数値 | NUMBER(p,s) | 精度・スケール指定 | NUMBER(18,2) |
| 数値 | INTEGER | 整数 | 38桁まで |
| 数値 | FLOAT | 浮動小数点 | 倍精度 |
| 文字列 | STRING / VARCHAR | 可変長文字列 | 最大16MB |
| 文字列 | CHAR(n) | 固定長 | CHAR(10) |
| 日付時刻 | DATE | 日付のみ | 2026-04-06 |
| 日付時刻 | TIMESTAMP_NTZ | タイムゾーンなし | 2026-04-06 15:30:00 |
| 日付時刻 | TIMESTAMP_LTZ | ローカルTZ | セッションTZに依存 |
| 日付時刻 | TIMESTAMP_TZ | TZ付き | 2026-04-06 15:30:00 +09:00 |
| 日付時刻 | TIME | 時刻のみ | 15:30:00 |
| 論理 | BOOLEAN | 真偽値 | TRUE / FALSE |
| 半構造化 | VARIANT | JSON等を格納 | {'key': 'value'} |
| 半構造化 | OBJECT | キーバリュー | JSON オブジェクト |
| 半構造化 | ARRAY | 配列 | [1, 2, 3] |
| バイナリ | BINARY | バイナリデータ | - |
5. データの取り込み(S3 → Snowflake)
全体フロー
ステップ 1:ファイルフォーマット定義
-- CSV
CREATE OR REPLACE FILE FORMAT csv_format
TYPE = 'CSV'
FIELD_DELIMITER = ','
SKIP_HEADER = 1
NULL_IF = ('', 'NULL')
FIELD_OPTIONALLY_ENCLOSED_BY = '"';
-- JSON
CREATE OR REPLACE FILE FORMAT json_format
TYPE = 'JSON'
STRIP_OUTER_ARRAY = TRUE;
-- Parquet
CREATE OR REPLACE FILE FORMAT parquet_format
TYPE = 'PARQUET';
ステップ 2:ステージ作成
-- 外部ステージ(S3)
CREATE OR REPLACE STAGE my_s3_stage
URL = 's3://your-bucket/usage-data/'
CREDENTIALS = (AWS_KEY_ID = '***' AWS_SECRET_KEY = '***')
FILE_FORMAT = csv_format;
-- 内部ステージ(Snowflake 内部ストレージ)
CREATE OR REPLACE STAGE my_internal_stage
FILE_FORMAT = csv_format;
-- ステージの中身を確認
LIST @my_s3_stage;
ステップ 3:データ読み込み
-- S3 → テーブル
COPY INTO t_sales_log
FROM @my_s3_stage
FILE_FORMAT = csv_format
ON_ERROR = 'CONTINUE'
PURGE = FALSE;
-- 特定ファイルだけ読み込み
COPY INTO t_sales_log
FROM @my_s3_stage/2026/04/
PATTERN = '.*usage.*[.]csv'
FILE_FORMAT = csv_format;
-- 読み込み結果確認
SELECT COUNT(*) FROM t_sales_log;
エラー確認
-- エラーだけ確認(実際には読み込まない)
COPY INTO t_sales_log
FROM @my_s3_stage
FILE_FORMAT = csv_format
VALIDATION_MODE = 'RETURN_ERRORS';
-- 直近の COPY 履歴
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 't_sales_log',
START_TIME => DATEADD('hour', -24, CURRENT_TIMESTAMP())
));
6. Snowpipe(リアルタイム取り込み)
S3 にファイルが置かれたら自動で取り込む仕組み。
-- Snowpipe 作成
CREATE OR REPLACE PIPE my_pipe
AUTO_INGEST = TRUE
AS
COPY INTO t_sales_log
FROM @my_s3_stage
FILE_FORMAT = csv_format;
-- パイプの状態確認
SHOW PIPES;
-- パイプの SQS 通知先を確認(S3 イベント通知に設定する)
SELECT SYSTEM$PIPE_STATUS('my_pipe');
S3 のイベント通知を Snowpipe の SQS キューに接続すると、ファイルが置かれるたびに自動で読み込まれる。
7. ウェアハウス管理とコスト最適化
ウェアハウスサイズと料金
| サイズ | クレジット/時間 | 用途 |
|---|---|---|
| X-Small | 1 | 軽いクエリ、開発・テスト |
| Small | 2 | 通常の分析 |
| Medium | 4 | 中規模データ処理 |
| Large | 8 | 大規模集計 |
| X-Large | 16 | 重い処理 |
ウェアハウス操作
-- 作成
CREATE WAREHOUSE IF NOT EXISTS dev_wh
WAREHOUSE_SIZE = 'X-SMALL'
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE;
-- サイズ変更(即座に反映、ダウンタイムなし)
ALTER WAREHOUSE dev_wh SET WAREHOUSE_SIZE = 'MEDIUM';
-- 手動停止
ALTER WAREHOUSE dev_wh SUSPEND;
-- 手動起動
ALTER WAREHOUSE dev_wh RESUME;
コスト確認
-- 過去 7 日のウェアハウス消費
SELECT
WAREHOUSE_NAME,
SUM(CREDITS_USED) AS total_credits
FROM TABLE(INFORMATION_SCHEMA.WAREHOUSE_METERING_HISTORY(
DATE_RANGE_START => DATEADD('day', -7, CURRENT_DATE())
))
GROUP BY WAREHOUSE_NAME
ORDER BY total_credits DESC;
コスト削減チェックリスト
- AUTO_SUSPEND = 60(デフォルト 600 は長すぎ)
- AUTO_RESUME = TRUE
- 開発用は X-Small
- 使わないウェアハウスは SUSPEND
- 定期実行はタスク化してウェアハウスを自動起動/停止
8. 関数リファレンス
文字列関数
UPPER('hello') -- 'HELLO'
LOWER('HELLO') -- 'hello'
TRIM(' hello ') -- 'hello'
LENGTH('hello') -- 5
SUBSTRING('hello', 1, 3) -- 'hel'
CONCAT('a', 'b', 'c') -- 'abc'
REPLACE('hello', 'l', 'r') -- 'herro'
SPLIT_PART('a-b-c', '-', 2) -- 'b'
REGEXP_LIKE('abc123', '\\d+') -- TRUE
日付関数
CURRENT_DATE() -- 2026-04-06
CURRENT_TIMESTAMP() -- 2026-04-06 15:30:00
DATE_TRUNC('month', usage_date) -- 月初に切り捨て
DATEADD('day', -7, CURRENT_DATE()) -- 7日前
DATEDIFF('day', start_date, end_date) -- 日数差
TO_DATE('2026-04-06', 'YYYY-MM-DD') -- 文字列→DATE
TO_CHAR(usage_date, 'YYYY/MM/DD') -- DATE→文字列
EXTRACT('month' FROM usage_date) -- 月だけ取得
LAST_DAY(usage_date) -- 月末日
変換関数
TO_NUMBER('123.45', '999.99') -- 文字列→数値
TO_VARCHAR(123) -- 数値→文字列
TO_TIMESTAMP('2026-04-06 15:30:00')
TRY_TO_NUMBER('abc') -- 変換失敗時に NULL(エラーにならない)
CAST(col AS INTEGER) -- 型変換
col::INTEGER -- 型変換(省略記法)
条件関数
CASE WHEN amount > 1000 THEN '高' WHEN amount > 500 THEN '中' ELSE '低' END
COALESCE(col1, col2, 'デフォルト') -- 最初の非NULL値
NULLIF(col1, col2) -- col1=col2ならNULL
IFF(condition, true_val, false_val) -- Snowflake独自のIF
DECODE(col, 'a', 1, 'b', 2, 0) -- 値マッピング
集計関数
COUNT(*) -- 行数
COUNT(DISTINCT col) -- ユニーク数
SUM(amount) -- 合計
AVG(amount) -- 平均
MEDIAN(amount) -- 中央値
MIN(amount) -- 最小
MAX(amount) -- 最大
STDDEV(amount) -- 標準偏差
LISTAGG(col, ',') -- 値を連結
ARRAY_AGG(col) -- 配列にまとめる
9. ウィンドウ関数
-- 前月比較(LAG)
SELECT
account_id, month, total,
LAG(total) OVER (PARTITION BY account_id ORDER BY month) AS prev,
total - LAG(total) OVER (PARTITION BY account_id ORDER BY month) AS diff
FROM monthly_usage;
-- 累計(SUM OVER)
SUM(amount) OVER (PARTITION BY account_id ORDER BY usage_date) AS cumulative
-- ランキング
ROW_NUMBER() OVER (PARTITION BY feature ORDER BY amount DESC) AS rank
RANK() OVER (ORDER BY amount DESC) AS rank_with_ties
DENSE_RANK() OVER (ORDER BY amount DESC) AS dense_rank
-- 移動平均(直近7日)
AVG(amount) OVER (
PARTITION BY account_id
ORDER BY usage_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS moving_avg_7d
-- 全体に対する割合
amount / SUM(amount) OVER () * 100 AS pct_of_total
-- グループ内の割合
amount / SUM(amount) OVER (PARTITION BY feature) * 100 AS pct_in_feature
10. ビューとマテリアライズドビュー
-- ビュー(毎回クエリを実行)
CREATE OR REPLACE VIEW v_monthly_usage AS
SELECT
account_id,
DATE_TRUNC('month', usage_date) AS month,
feature,
SUM(usage_amount) AS total
FROM t_sales_log
GROUP BY account_id, month, feature;
-- マテリアライズドビュー(結果をキャッシュ、自動更新)
CREATE OR REPLACE MATERIALIZED VIEW mv_daily_summary AS
SELECT
usage_date,
feature,
COUNT(*) AS cnt,
SUM(usage_amount) AS total
FROM t_sales_log
GROUP BY usage_date, feature;
11. タイムトラベル(過去データの復元)
誤って削除・更新したデータを復元できる。最大 90 日間。
-- 5 分前のデータを参照
SELECT * FROM t_sales_log AT(OFFSET => -60*5);
-- 特定時刻のデータを参照
SELECT * FROM t_sales_log AT(TIMESTAMP => '2026-04-06 10:00:00'::TIMESTAMP);
-- 誤って DELETE した場合の復元
CREATE TABLE t_sales_log_RESTORED AS
SELECT * FROM t_sales_log AT(TIMESTAMP => '2026-04-06 10:00:00'::TIMESTAMP);
-- DROP したテーブルの復元
UNDROP TABLE t_sales_log;
-- 保持期間の設定
ALTER TABLE t_sales_log SET DATA_RETENTION_TIME_IN_DAYS = 7;
12. ストリーム(変更データキャプチャ)
テーブルの変更(INSERT/UPDATE/DELETE)を自動追跡する。
-- ストリーム作成
CREATE OR REPLACE STREAM usage_stream ON TABLE t_sales_log;
-- 変更を確認
SELECT * FROM usage_stream;
-- METADATA$ACTION: INSERT / DELETE
-- METADATA$ISUPDATE: TRUE / FALSE
-- METADATA$ROW_ID: 行ID
-- ストリームにデータがあるか確認
SELECT SYSTEM$STREAM_HAS_DATA('usage_stream');
13. タスク(自動スケジュール実行)
-- 毎日朝 9 時に集計を実行するタスク
CREATE OR REPLACE TASK daily_aggregation
WAREHOUSE = dev_wh
SCHEDULE = 'USING CRON 0 9 * * * Asia/Tokyo'
AS
INSERT INTO T_DAILY_SUMMARY
SELECT
CURRENT_DATE() - 1 AS target_date,
feature,
COUNT(*) AS cnt,
SUM(usage_amount) AS total
FROM t_sales_log
WHERE usage_date = CURRENT_DATE() - 1
GROUP BY feature;
-- タスクを有効化(デフォルトは停止状態)
ALTER TASK daily_aggregation RESUME;
-- タスク一覧
SHOW TASKS;
-- 手動実行
EXECUTE TASK daily_aggregation;
-- タスクの実行履歴
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY SCHEDULED_TIME DESC
LIMIT 10;
14. ストリーム × タスクでパイプライン構築
S3 → Snowpipe(自動取込)→ テーブル → ストリーム(変更検知)→ タスク(定期実行)→ 集計テーブル
-- 1. ストリームで変更を監視
CREATE OR REPLACE STREAM usage_stream ON TABLE t_sales_log;
-- 2. タスクで新しいデータだけ処理
CREATE OR REPLACE TASK process_new_usage
WAREHOUSE = dev_wh
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('usage_stream')
AS
INSERT INTO t_sales_log_PROCESSED
SELECT
account_id, usage_date, feature, usage_amount,
CURRENT_TIMESTAMP() AS processed_at
FROM usage_stream
WHERE METADATA$ACTION = 'INSERT';
ALTER TASK process_new_usage RESUME;
15. ロールとアクセス制御
-- 自分の情報
SELECT CURRENT_USER(), CURRENT_ROLE(), CURRENT_WAREHOUSE(), CURRENT_DATABASE();
-- ロール一覧
SHOW ROLES;
-- 権限付与
GRANT USAGE ON DATABASE my_database TO ROLE analyst_role;
GRANT USAGE ON SCHEMA my_database.public TO ROLE analyst_role;
GRANT SELECT ON ALL TABLES IN SCHEMA my_database.public TO ROLE analyst_role;
-- 今後作るテーブルにも自動で権限付与
GRANT SELECT ON FUTURE TABLES IN SCHEMA my_database.public TO ROLE analyst_role;
-- 権限確認
SHOW GRANTS TO ROLE analyst_role;
SHOW GRANTS ON TABLE t_sales_log;
16. ゼロコピークローニング
データをコピーせずに瞬時に複製。ストレージ追加なし。
-- テーブルのクローン
CREATE TABLE t_sales_log_DEV CLONE t_sales_log;
-- データベースごとクローン
CREATE DATABASE my_database_dev CLONE my_database;
-- 特定時点のクローン(タイムトラベル)
CREATE TABLE t_sales_log_SNAPSHOT CLONE t_sales_log
AT(TIMESTAMP => '2026-04-06 10:00:00'::TIMESTAMP);
17. データ共有
-- 共有を作成
CREATE SHARE usage_share;
GRANT USAGE ON DATABASE my_database TO SHARE usage_share;
GRANT SELECT ON TABLE t_sales_log TO SHARE usage_share;
-- 共有先アカウントに付与
ALTER SHARE usage_share ADD ACCOUNTS = partner_account;
18. Snowflake × Python 連携
# pip install snowflake-connector-python
import snowflake.connector
conn = snowflake.connector.connect(
user='USER',
password='PASS',
account='ACCOUNT',
warehouse='DEV_WH',
database='my_database',
schema='public'
)
cur = conn.cursor()
cur.execute("SELECT * FROM t_sales_log LIMIT 10")
rows = cur.fetchall()
# pandas 連携
import pandas as pd
cur.execute("SELECT * FROM t_sales_log")
df = pd.DataFrame(cur.fetchall(), columns=[col[0] for col in cur.description])
19. パフォーマンスチューニング
| 症状 | 原因 | 対策 |
|---|---|---|
| クエリが遅い | ウェアハウス小さい | サイズ UP |
| クエリが遅い | フルスキャン | WHERE で絞り込む |
| クエリが遅い | JOIN が重い | クラスタリングキー設定 |
| 同時実行で遅い | リソース競合 | ウェアハウスを分ける |
| コストが高い | ウェアハウス停止してない | AUTO_SUSPEND=60 |
-- クエリプロファイルで原因特定
-- Snowsight で実行後、「Query Profile」タブを確認
-- クラスタリングキー設定(大規模テーブル向け)
ALTER TABLE t_sales_log CLUSTER BY (account_id, usage_date);
-- クラスタリング状態確認
SELECT SYSTEM$CLUSTERING_INFORMATION('t_sales_log');
-- キャッシュ活用
-- 同じクエリは自動でキャッシュされる(24時間保持)
-- ウェアハウスローカルキャッシュも自動
20. トラブルシューティング
| 問題 | 対策 |
|---|---|
| データが見えない |
SHOW GRANTS TO ROLE ロール名 で権限確認 |
| COPY INTO 失敗 |
VALIDATION_MODE = 'RETURN_ERRORS' でエラー確認 |
| タイムゾーンずれ | ALTER SESSION SET TIMEZONE = 'Asia/Tokyo' |
| 文字化け |
FILE_FORMAT に ENCODING = 'UTF-8' 追加 |
| テーブル消した | UNDROP TABLE テーブル名 |
| データ消した | タイムトラベルで復元 |
| コスト爆発 |
WAREHOUSE_METERING_HISTORY で確認 |
-- クエリ履歴(過去 24 時間)
SELECT QUERY_TEXT, EXECUTION_STATUS, ERROR_MESSAGE, TOTAL_ELAPSED_TIME
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE START_TIME >= DATEADD('hour', -24, CURRENT_TIMESTAMP())
ORDER BY START_TIME DESC;
-- セッション設定確認
SHOW PARAMETERS IN SESSION;
-- タイムゾーン設定
ALTER SESSION SET TIMEZONE = 'Asia/Tokyo';
21. 業務用クエリテンプレート
月次集計
SELECT
u.account_id,
DATE_TRUNC('month', u.usage_date) AS billing_month,
u.feature,
SUM(u.usage_amount) AS total_usage,
SUM(u.usage_amount) * p.unit_price AS billing_amount
FROM t_sales_log u
JOIN M_PRICING p ON u.feature = p.feature
GROUP BY u.account_id, billing_month, u.feature, p.unit_price
ORDER BY u.account_id, billing_month;
前月比較
WITH monthly AS (
SELECT
account_id,
DATE_TRUNC('month', usage_date) AS month,
SUM(usage_amount) AS total
FROM t_sales_log
GROUP BY account_id, month
)
SELECT
account_id, month, total,
LAG(total) OVER (PARTITION BY account_id ORDER BY month) AS prev_month,
ROUND((total - LAG(total) OVER (PARTITION BY account_id ORDER BY month))
/ NULLIF(LAG(total) OVER (PARTITION BY account_id ORDER BY month), 0) * 100, 1)
AS growth_pct
FROM monthly
ORDER BY account_id, month;
異常値検知(平均の 3σ 超え)
WITH stats AS (
SELECT feature,
AVG(usage_amount) AS avg_val,
STDDEV(usage_amount) AS std_val
FROM t_sales_log
GROUP BY feature
)
SELECT u.*, s.avg_val, s.std_val
FROM t_sales_log u
JOIN stats s ON u.feature = s.feature
WHERE u.usage_amount > s.avg_val + (3 * s.std_val);
日次トレンド(直近 30 日)
SELECT
usage_date,
feature,
COUNT(*) AS cnt,
SUM(usage_amount) AS total,
AVG(usage_amount) OVER (
PARTITION BY feature
ORDER BY usage_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS moving_avg_7d
FROM t_sales_log
WHERE usage_date >= DATEADD('day', -30, CURRENT_DATE())
GROUP BY usage_date, feature
ORDER BY usage_date;
アカウント別ランキング
SELECT
account_id,
SUM(usage_amount) AS total,
RANK() OVER (ORDER BY SUM(usage_amount) DESC) AS rank
FROM t_sales_log
WHERE usage_date >= DATE_TRUNC('month', CURRENT_DATE())
GROUP BY account_id
ORDER BY rank;
22. ショートカット・Tips
Snowsight ショートカット
| 操作 | キー |
|---|---|
| クエリ実行 | Ctrl + Enter |
| 全クエリ実行 | Ctrl + Shift + Enter |
| コメントアウト | Ctrl + / |
| フォーマット | Ctrl + Shift + F |
Tips
-- クエリ結果をテーブルに保存
CREATE TABLE result_table AS SELECT ...;
-- 一時テーブル(セッション終了で消える)
CREATE TEMPORARY TABLE tmp AS SELECT ...;
-- LIMIT なしで結果行数だけ確認
SELECT COUNT(*) FROM (SELECT ...);
-- カラム名に日本語を使う場合はダブルクォート
SELECT "都道府県名" FROM population;
-- セミコロンで複数クエリを区切り
SELECT 1; SELECT 2; SELECT 3;
23. 動的テーブル(Dynamic Tables)
ストリーム+タスクを使わずに、宣言的にデータパイプラインを構築する仕組み。SQL で「どう変換するか」を書くだけで、Snowflake が自動でリフレッシュしてくれる。
-- 動的テーブル作成
CREATE OR REPLACE DYNAMIC TABLE dt_monthly_usage
TARGET_LAG = '5 minutes' -- 5分以内にデータを反映
WAREHOUSE = dev_wh
AS
SELECT
account_id,
DATE_TRUNC('month', usage_date) AS month,
feature,
SUM(usage_amount) AS total
FROM t_sales_log
GROUP BY account_id, month, feature;
-- 動的テーブル一覧
SHOW DYNAMIC TABLES;
-- リフレッシュ履歴確認
SELECT * FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY())
ORDER BY REFRESH_END_TIME DESC LIMIT 10;
-- 手動リフレッシュ
ALTER DYNAMIC TABLE dt_monthly_usage REFRESH;
-- 一時停止
ALTER DYNAMIC TABLE dt_monthly_usage SUSPEND;
ストリーム+タスク vs 動的テーブル:
| ストリーム+タスク | 動的テーブル | |
|---|---|---|
| 複雑さ | ストリーム作成→タスク作成→有効化 | SQL 1 つ |
| 制御 | 細かく制御可能 | Snowflake に任せる |
| 用途 | 複雑な処理ロジック | シンプルな変換パイプライン |
| 推奨 | 条件分岐やエラー処理が必要な場合 | 集計・変換だけなら動的テーブル |
24. Cortex AI(SQL で AI を使う)
Snowflake 内で SQL だけで AI / ML 機能が使える。データを外部に出す必要がない。
LLM 関数(テキスト処理)
-- テキスト要約
SELECT SNOWFLAKE.CORTEX.SUMMARIZE(description) AS summary
FROM support_tickets;
-- 感情分析
SELECT
comment_text,
SNOWFLAKE.CORTEX.SENTIMENT(comment_text) AS sentiment_score
FROM customer_feedback;
-- 結果: -1(ネガティブ)〜 1(ポジティブ)
-- テキスト翻訳
SELECT SNOWFLAKE.CORTEX.TRANSLATE(
description, 'ja', 'en'
) AS english_text
FROM support_tickets;
-- 自由形式の AI 質問
SELECT SNOWFLAKE.CORTEX.COMPLETE(
'claude-3-5-sonnet',
'このエラーメッセージの原因を説明してください: ' || error_message
) AS ai_response
FROM error_logs;
ML 関数(予測・異常検知)
-- 異常検知モデルの作成
CREATE OR REPLACE SNOWFLAKE.ML.ANOMALY_DETECTION usage_anomaly_model(
INPUT_DATA => TABLE(
SELECT usage_date AS timestamp, usage_amount AS value
FROM t_sales_log
WHERE feature = 'web_access'
),
TIMESTAMP_COLNAME => 'timestamp',
TARGET_COLNAME => 'value'
);
-- 異常検知の実行
CALL usage_anomaly_model!DETECT_ANOMALIES(
INPUT_DATA => TABLE(
SELECT usage_date AS timestamp, usage_amount AS value
FROM t_sales_log
WHERE feature = 'web_access' AND usage_date >= '2026-04-01'
),
TIMESTAMP_COLNAME => 'timestamp',
TARGET_COLNAME => 'value'
);
-- 時系列予測
CREATE OR REPLACE SNOWFLAKE.ML.FORECAST usage_forecast_model(
INPUT_DATA => TABLE(
SELECT usage_date AS timestamp, usage_amount AS value
FROM t_sales_log WHERE feature = 'web_access'
),
TIMESTAMP_COLNAME => 'timestamp',
TARGET_COLNAME => 'value'
);
-- 30日先の予測
CALL usage_forecast_model!FORECAST(FORECASTING_PERIODS => 30);
Cortex Code(自然言語 → SQL)
Snowsight の左メニュー「Cortex Code」から、自然言語で質問するだけで SQL を自動生成。
質問: 「先月の利用量トップ10のアカウントを教えて」
→ SQLが自動生成 → 実行 → 結果表示
25. 半構造化データ(JSON / Parquet)
Snowflake は JSON をそのまま格納・クエリできる。
-- VARIANT 型のカラムを持つテーブル
CREATE TABLE events (
event_id INTEGER,
event_data VARIANT,
created_at TIMESTAMP
);
-- JSON データ挿入
INSERT INTO events (event_id, event_data)
SELECT 1, PARSE_JSON('{
"user_id": "u123",
"action": "login",
"device": {"type": "mobile", "os": "iOS"}
}');
-- JSON の値を取り出す
SELECT
event_data:user_id::STRING AS user_id,
event_data:action::STRING AS action,
event_data:device.type::STRING AS device_type,
event_data:device.os::STRING AS device_os
FROM events;
-- JSON 配列を展開(FLATTEN)
SELECT
f.value:name::STRING AS item_name,
f.value:price::NUMBER AS item_price
FROM orders,
LATERAL FLATTEN(input => order_data:items) f;
26. 外部テーブル(S3 のデータを直接クエリ)
データを Snowflake にコピーせずに、S3 に置いたままクエリできる。
-- 外部テーブル作成
CREATE OR REPLACE EXTERNAL TABLE ext_usage
WITH LOCATION = @my_s3_stage
FILE_FORMAT = parquet_format
AUTO_REFRESH = TRUE;
-- クエリ(通常のテーブルと同じ SQL)
SELECT * FROM ext_usage WHERE account_id = 'A001';
27. UDF(ユーザー定義関数)
-- SQL UDF
CREATE OR REPLACE FUNCTION calc_billing(amount NUMBER, rate NUMBER)
RETURNS NUMBER
AS
$$
amount * rate
$$;
-- 使い方
SELECT calc_billing(usage_amount, 0.05) AS billing FROM t_sales_log;
-- Python UDF
CREATE OR REPLACE FUNCTION categorize_usage(amount NUMBER)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
HANDLER = 'categorize'
AS
$$
def categorize(amount):
if amount > 10000: return '大口'
elif amount > 1000: return '中口'
else: return '小口'
$$;
-- 使い方
SELECT feature, usage_amount, categorize_usage(usage_amount) AS category
FROM t_sales_log;
28. データマスキング(機密データの保護)
ロールに応じて機密データを自動的にマスクする。
-- マスキングポリシー作成
CREATE OR REPLACE MASKING POLICY mask_email AS (val STRING)
RETURNS STRING ->
CASE
WHEN CURRENT_ROLE() IN ('ADMIN', 'SYSADMIN') THEN val
ELSE '***@***.com'
END;
-- テーブルに適用
ALTER TABLE customers MODIFY COLUMN email
SET MASKING POLICY mask_email;
-- ADMIN ロール → 実際のメールが見える
-- それ以外 → '***@***.com' と表示される
29. 行アクセスポリシー(行レベルセキュリティ)
ロールに応じて見える行を制限する。
-- 行アクセスポリシー作成
CREATE OR REPLACE ROW ACCESS POLICY account_access AS (account_id STRING)
RETURNS BOOLEAN ->
CASE
WHEN CURRENT_ROLE() = 'ADMIN' THEN TRUE -- 全行見える
WHEN account_id = CURRENT_USER() THEN TRUE -- 自分のデータだけ
ELSE FALSE
END;
-- テーブルに適用
ALTER TABLE t_sales_log ADD ROW ACCESS POLICY account_access ON (account_id);
30. Snowpark(Python DataFrame API)
SQL を書かずに Python の DataFrame で Snowflake を操作できる。
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum as sum_, avg
# セッション作成
session = Session.builder.configs({
"account": "ACCOUNT",
"user": "USER",
"password": "PASS",
"warehouse": "DEV_WH",
"database": "my_database",
"schema": "public"
}).create()
# テーブル読み込み
df = session.table("t_sales_log")
# フィルタ+集計(SQL を書かない)
result = (
df.filter(col("feature") == "packet")
.group_by("account_id")
.agg(
sum_("usage_amount").alias("total"),
avg("usage_amount").alias("avg_val")
)
.sort(col("total").desc())
)
result.show()
# 結果をテーブルに保存
result.write.save_as_table("T_PACKET_SUMMARY")
31. Snowflake Notebooks(ブラウザで Python 実行)
Snowsight 上で Jupyter Notebook のように Python / SQL を実行できる。
- Snowsight → 「Notebooks」→「+」
- SQL セルと Python セルを自由に切り替え
- matplotlib / plotly でグラフ描画
- Snowpark が組み込み済み
# Notebook 内の Python セル
import streamlit as st
import matplotlib.pyplot as plt
# SQL の結果を DataFrame として取得
df = session.sql("SELECT usage_date, SUM(usage_amount) AS total FROM t_sales_log GROUP BY usage_date").to_pandas()
# グラフ描画
plt.figure(figsize=(12, 4))
plt.plot(df['USAGE_DATE'], df['TOTAL'])
plt.title('日次利用量推移')
plt.show()
32. 検索最適化サービス
大規模テーブルの特定カラムに対する検索を高速化する。
-- 検索最適化を有効化
ALTER TABLE t_sales_log ADD SEARCH OPTIMIZATION ON EQUALITY(account_id);
-- ポイント検索が高速になる
SELECT * FROM t_sales_log WHERE account_id = 'A001';
-- 検索最適化の状態確認
DESCRIBE SEARCH OPTIMIZATION ON t_sales_log;
33. マルチクラスターウェアハウス
同時アクセスが多いとき、ウェアハウスを自動でスケールアウトする。
CREATE OR REPLACE WAREHOUSE auto_scale_wh
WAREHOUSE_SIZE = 'SMALL'
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 3 -- 最大3クラスターに自動拡張
SCALING_POLICY = 'STANDARD'
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE;
34. Apache Iceberg テーブル
外部ストレージ(S3 等)に Parquet 形式でデータを保持しつつ、Snowflake のクエリ性能を活用する。データレイクとの統合に使う。
-- 外部ボリューム作成(S3)
CREATE OR REPLACE EXTERNAL VOLUME my_iceberg_vol
STORAGE_LOCATIONS = (
(
NAME = 'my-s3-location'
STORAGE_BASE_URL = 's3://my-bucket/iceberg/'
STORAGE_PROVIDER = 'S3'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789:role/my-role'
)
);
-- Iceberg テーブル作成
CREATE OR REPLACE ICEBERG TABLE usage_iceberg
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'my_iceberg_vol'
BASE_LOCATION = 'usage/'
AS SELECT * FROM t_sales_log;
35. Fail-safe(最終防衛線)
タイムトラベル期間が過ぎたあとも、Snowflake が内部的に 7 日間データを保持する。ユーザーが直接アクセスすることはできず、Snowflake サポートに依頼して復元する。
データ変更 → タイムトラベル期間(最大90日)→ Fail-safe(7日)→ 完全削除
コスト注意:Fail-safe 期間中もストレージ料金が発生する。