はじめに
Snowflake の Snowpipe を使って、Azure Blob Storage 上に配置された JSONL ファイルを Snowflake-managed Iceberg Table にロードする検証を行いました。
今回の構成では、Iceberg Table は Snowflake をカタログとして作成し、Horizon Catalog 経由で外部エンジンから参照できる形を想定しています。
最終的な構成は以下です。
Azure Blob Storage
↓ BlobCreated event
Azure Event Grid
↓
Azure Storage Queue
↓
Snowflake Notification Integration
↓
Snowpipe
↓
Snowflake-managed Iceberg Table
↓
Horizon Catalog
検証構成
今回使った主な Snowflake オブジェクトは以下です。
| オブジェクト | 用途 |
|---|---|
| Database / Schema | 検証用オブジェクトの配置先 |
| File Format | JSONL の読み取り定義 |
| External Stage | Azure Blob Storage 上の JSONL 入力元 |
| Storage Integration | Snowflake が Azure Blob を読むための認証設定 |
| Notification Integration | Snowpipe が Azure Storage Queue から通知を受け取る設定 |
| External Volume | Iceberg Table のデータ・メタデータ出力先 |
| Iceberg Table | JSONL のロード先 |
| Pipe | Snowpipe のロード定義 |
JSONL ファイル
Snowpipe で取り込むファイルは JSONL 形式にしました。
JSONL は 1 行 1 JSON オブジェクトの形式です。
例:
{"sale_id":1001,"store_id":"S001","item_id":"I001","quantity":2,"amount":1200,"sold_at":"2026-06-15T10:00:00"}
{"sale_id":1002,"store_id":"S001","item_id":"I002","quantity":1,"amount":800,"sold_at":"2026-06-15T10:01:00"}
Pipe 側では以下の PATTERN を使いました。
PATTERN = '.*sale__.*\.jsonl'
そのため、ファイル名は以下のように sale__ を含み、拡張子を .jsonl にします。
sale__20260615_001.jsonl
sale__20260615_002.jsonl
Database / Schema 作成
USE ROLE ACCOUNTADMIN;
CREATE DATABASE IF NOT EXISTS SNOWPIPE_TEST_01;
CREATE SCHEMA IF NOT EXISTS SNOWPIPE_TEST_01.SCH_01;
File Format 作成
JSONL 用の File Format を作成します。
CREATE OR ALTER FILE FORMAT SNOWPIPE_TEST_01.SCH_01.FF_JSONL
TYPE = JSON
COMPRESSION = AUTO;
| オプション | 意味 |
|---|---|
TYPE = JSON |
入力ファイルを JSON として読み込みます。 |
COMPRESSION = AUTO |
gzip などの圧縮形式を Snowflake が自動判定します。 |
JSONL は 1 行 1 JSON オブジェクトなので、配列 JSON 用の STRIP_OUTER_ARRAY = TRUE は不要です。
Storage Integration 作成
External Stage から Azure Blob Storage を読むために、Storage Integration を作成します。
CREATE STORAGE INTEGRATION IF NOT EXISTS SI_INBOUND
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = AZURE
ENABLED = TRUE
AZURE_TENANT_ID = '<azure_tenant_id>'
STORAGE_ALLOWED_LOCATIONS = (
'azure://<storage_account>.blob.core.windows.net/<container>/inbound/'
);
| オプション | 意味 |
|---|---|
TYPE = EXTERNAL_STAGE |
外部ステージ用の Storage Integration であることを示します。 |
STORAGE_PROVIDER = AZURE |
接続先が Azure Blob Storage / ADLS であることを示します。 |
ENABLED = TRUE |
Integration を有効化します。 |
STORAGE_ALLOWED_LOCATIONS |
この Integration でアクセスを許可する Azure Storage の場所です。 |
作成後、以下を確認します。
DESC INTEGRATION SI_INBOUND;
ここで表示される AZURE_CONSENT_URL を開き、Azure 側で同意します。
また、AZURE_MULTI_TENANT_APP_NAME に対応する Snowflake のサービスプリンシパルに、Azure 側で Blob への権限を付与します。
今回付与した権限は以下です。
| 権限 | 用途 | 備考 |
|---|---|---|
| ストレージ BLOB データ共同作成者 | Blob の読み取り・一覧・書き込み | - |
| Storage Blob デリゲータ | getUserDelegationKey の実行 | リソースグループ レベルで付与 |
External Stage 作成
Azure Blob Storage 上の JSONL ファイル配置先を External Stage として作成します。
CREATE OR REPLACE STAGE SNOWPIPE_TEST_01.SCH_01.INBOUND_STAGE
URL = 'azure://<storage_account>.blob.core.windows.net/<container>/inbound/'
STORAGE_INTEGRATION = SI_INBOUND;
作成後、ファイル一覧を確認します。
LIST @SNOWPIPE_TEST_01.SCH_01.INBOUND_STAGE;
JSONL の読み取り確認は以下です。
SELECT
$1
FROM @SNOWPIPE_TEST_01.SCH_01.INBOUND_STAGE
(FILE_FORMAT => 'SNOWPIPE_TEST_01.SCH_01.FF_JSONL')
LIMIT 10;
注意点として、SELECT FROM @stage のテーブル関数引数では、以下のようなインライン指定はエラーになりました。
-- NG
(FILE_FORMAT => (TYPE = 'JSON'))
名前付き File Format を文字列で指定すると成功しました。
-- OK
(FILE_FORMAT => 'SNOWPIPE_TEST_01.SCH_01.FF_JSONL')
External Volume 作成
Iceberg Table の出力先として External Volume を作成します。出力先には ADLS を設定します。
CREATE OR REPLACE EXTERNAL VOLUME AZURE_SF_VOLUME
STORAGE_LOCATIONS =
(
(
NAME = 'azure-sf-container'
STORAGE_PROVIDER = 'AZURE'
AZURE_TENANT_ID = '<azure_tenant_id>'
STORAGE_BASE_URL = 'azure://<storage_account>.dfs.core.windows.net/<container>/snowflakecatalog/'
)
)
ALLOW_WRITES = TRUE;
| オプション | 意味 |
|---|---|
NAME |
Storage Location の識別名です。 |
STORAGE_PROVIDER = 'AZURE' |
接続先が Azure であることを示します。 |
STORAGE_BASE_URL |
Iceberg データ・メタデータを配置するベース URL です。 |
ALLOW_WRITES = TRUE |
Snowflake がこの外部ボリュームへ書き込みできるようにします。 |
作成後、確認します。
DESC EXTERNAL VOLUME AZURE_SF_VOLUME;
External Volume 用の Snowflake サービスプリンシパルにも、Azure 側で ADLS の権限を付与する必要があります。
| 権限 | 用途 | 備考 |
|---|---|---|
| ストレージ BLOB データ共同作成者 | Blob の読み取り・一覧・書き込み | - |
Iceberg Table 作成
Snowflake-managed Iceberg Table を作成します。
Horizon Catalog から参照させる場合でも、Snowflake-managed Iceberg Table として作成するため、CATALOG = 'SNOWFLAKE' を指定します。
CREATE ICEBERG TABLE IF NOT EXISTS SNOWPIPE_TEST_01.SCH_01.SALE_ICEBERG (
SALE_ID NUMBER(38,0),
STORE_ID STRING,
ITEM_ID STRING,
QUANTITY NUMBER(38,0),
AMOUNT NUMBER(18,2),
SOLD_AT TIMESTAMP_NTZ
)
CATALOG = 'SNOWFLAKE'
EXTERNAL_VOLUME = 'AZURE_SF_VOLUME'
BASE_LOCATION = 'sale_iceberg/';
Notification Integration 作成
Azure Event Grid から Storage Queue に送られた通知を Snowflake が読むため、Notification Integration を作成します。
CREATE NOTIFICATION INTEGRATION IF NOT EXISTS NI_INBOUND
ENABLED = TRUE
TYPE = QUEUE
NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
AZURE_STORAGE_QUEUE_PRIMARY_URI = 'https://<queue_storage_account>.queue.core.windows.net/<queue_name>'
AZURE_TENANT_ID = '<azure_tenant_id>';
| オプション | 意味 |
|---|---|
ENABLED = TRUE |
Integration を有効化します。 |
TYPE = QUEUE |
Queue ベースの通知を使うことを示します。 |
NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE |
Azure Storage Queue から通知を受け取ることを示します。 |
AZURE_STORAGE_QUEUE_PRIMARY_URI |
Event Grid 通知が格納される Azure Storage Queue の URL です。 |
作成後、確認します。
DESC NOTIFICATION INTEGRATION NI_INBOUND;
ここで表示される AZURE_CONSENT_URL を開き、Azure 側で同意します。
その後、Snowflake のサービスプリンシパルに対して、Storage Queue への権限を付与します。
| 権限 | 用途 |
|---|---|
| ストレージ キュー データ共同作成者 | Snowflake が Queue の通知メッセージを読む |
Azure Event Grid Subscription
Azure 側では、Blob Storage にファイルが作成されたイベントを Storage Queue に送る Event Grid Subscription を作成します。
構成は以下です。
Blob Storage
→ Event Grid System Topic
→ Event Grid Subscription
→ Storage Queue
Azure Portal で作成する場合の例です。
| 項目 | 値 |
|---|---|
| System topic name | egst-snowpipe-inbound |
| Event subscription name | egsub-snowpipe-inbound |
| Event type | Blob Created |
| Endpoint type | Storage Queues |
| Endpoint | Snowpipe 用 Storage Queue |
Event Grid 作成時に以下のエラーが出た場合は、Azure サブスクリプションで Event Grid のリソースプロバイダーが未登録です。
サブスクリプションは、名前空間 'Microsoft.EventGrid' を使用するように登録されていません。
Azure Portal で以下を登録します。
Subscriptions
→ 対象サブスクリプション
→ Resource providers
→ Microsoft.EventGrid
→ Register
Pipe 作成
最後に Snowpipe を作成します。
CREATE PIPE IF NOT EXISTS SNOWPIPE_TEST_01.SCH_01.INBOUND_SALE_ICEBERG_PIPE
AUTO_INGEST = TRUE
INTEGRATION = 'NI_INBOUND'
AS
COPY INTO SNOWPIPE_TEST_01.SCH_01.SALE_ICEBERG
FROM @SNOWPIPE_TEST_01.SCH_01.INBOUND_STAGE
PATTERN = '.*sale__.*\.jsonl'
FILE_FORMAT = (FORMAT_NAME = SNOWPIPE_TEST_01.SCH_01.FF_JSONL)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE を使うため、JSONL のキー名と Iceberg Table の列名を対応させます。
| JSONL key | Iceberg column |
|---|---|
| sale_id | SALE_ID |
| store_id | STORE_ID |
| item_id | ITEM_ID |
| quantity | QUANTITY |
| amount | AMOUNT |
| sold_at | SOLD_AT |
Pipe の起動
AUTO_INGEST = TRUE の場合、新しいファイルが Azure Blob Storage に置かれると、Event Grid → Storage Queue → Snowpipe の順に通知され、自動でロードされます。
既存ファイルを取り込みたい場合は、手動で REFRESH します。
ALTER PIPE SNOWPIPE_TEST_01.SCH_01.INBOUND_SALE_ICEBERG_PIPE REFRESH;
Pipe の状態確認は以下です。
SELECT SYSTEM$PIPE_STATUS('SNOWPIPE_TEST_01.SCH_01.INBOUND_SALE_ICEBERG_PIPE');
結果は以下です。
詰まったポイント
1. Event Grid のリソースプロバイダー未登録
Azure 側で Microsoft.EventGrid が未登録だと、System Topic / Event Subscription 作成に失敗します。
az provider register --namespace Microsoft.EventGrid
2. Pipe Notifications bind failure
以下のエラーが出ました。
Pipe Notifications bind failure
Internal error, could not locate queue for integration
原因は、Notification Integration に設定した Queue URI や Azure Queue 側の権限設定が不完全だったことです。
3. getUserDelegationKey の権限不足
以下のエラーが出ました。
Authorization permission mismatch in getUserDelegationKey
対応として、Storage Integration 用サービスプリンシパルに、リソースグループ レベルで Storage Blob Delegator 相当の権限を付与しました。
4. SELECT は成功するが COPY INTO Iceberg が失敗
SELECT FROM @stage は成功しているのに、COPY INTO Iceberg で以下のエラーが出ました。
Failed to access remote file: access denied. Please check your credentials
この場合、入力 Stage 側ではなく、Iceberg の出力先である External Volume 側の ADLS 権限不足でした。
External Volume 用の Snowflake サービスプリンシパルに ADLS の権限を付与したところ解決しました。
まとめ
今回の検証では、Snowpipe を使って Azure Blob Storage 上の JSONL ファイルを Snowflake-managed Iceberg Table にロードできました。
特に重要だったのは、Azure 側の権限が用途ごとに分かれている点です。
| Snowflake オブジェクト | Azure 側で必要な権限 |
|---|---|
| Storage Integration | 入力 Blob を読む権限 |
| Notification Integration | Storage Queue を読む権限 |
| External Volume | Iceberg 出力先 ADLS/Blob に書き込む権限 |
SELECT FROM @stage が成功しても、COPY INTO Iceberg では External Volume 側の書き込みが発生します。
そのため、入力元 Stage の権限だけでなく、Iceberg 出力先の ADLS 権限も忘れずに設定する必要があります。
今回の最終的な流れは以下でした。
1. Azure Blob Storage に JSONL を配置
2. Event Grid が BlobCreated を検知
3. Storage Queue に通知メッセージを送信
4. Snowpipe が Queue から通知を取得
5. External Stage から JSONL を読み取り
6. Iceberg Table にロード
7. External Volume 配下に Iceberg データ・メタデータが作成される
8. Horizon Catalog 経由で参照可能になる




