0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Snowpipe で Azure Blob 上の JSONL ファイルを Snowflake-managed Iceberg Table にロードする

0
Last updated at Posted at 2026-06-16

はじめに

Snowflake の Snowpipe を使って、Azure Blob Storage 上に配置された JSONL ファイルを Snowflake-managed Iceberg Table にロードする検証を行いました。

今回の構成では、Iceberg Table は Snowflake をカタログとして作成し、Horizon Catalog 経由で外部エンジンから参照できる形を想定しています。

最終的な構成は以下です。

Azure Blob Storage
  ↓ BlobCreated event
Azure Event Grid
  ↓
Azure Storage Queue
  ↓
Snowflake Notification Integration
  ↓
Snowpipe
  ↓
Snowflake-managed Iceberg Table
  ↓
Horizon Catalog

検証構成

今回使った主な Snowflake オブジェクトは以下です。

オブジェクト 用途
Database / Schema 検証用オブジェクトの配置先
File Format JSONL の読み取り定義
External Stage Azure Blob Storage 上の JSONL 入力元
Storage Integration Snowflake が Azure Blob を読むための認証設定
Notification Integration Snowpipe が Azure Storage Queue から通知を受け取る設定
External Volume Iceberg Table のデータ・メタデータ出力先
Iceberg Table JSONL のロード先
Pipe Snowpipe のロード定義

JSONL ファイル

Snowpipe で取り込むファイルは JSONL 形式にしました。
JSONL は 1 行 1 JSON オブジェクトの形式です。

例:

{"sale_id":1001,"store_id":"S001","item_id":"I001","quantity":2,"amount":1200,"sold_at":"2026-06-15T10:00:00"}
{"sale_id":1002,"store_id":"S001","item_id":"I002","quantity":1,"amount":800,"sold_at":"2026-06-15T10:01:00"}

Pipe 側では以下の PATTERN を使いました。

PATTERN = '.*sale__.*\.jsonl'

そのため、ファイル名は以下のように sale__ を含み、拡張子を .jsonl にします。

sale__20260615_001.jsonl
sale__20260615_002.jsonl

Database / Schema 作成

USE ROLE ACCOUNTADMIN;

CREATE DATABASE IF NOT EXISTS SNOWPIPE_TEST_01;

CREATE SCHEMA IF NOT EXISTS SNOWPIPE_TEST_01.SCH_01;

File Format 作成

JSONL 用の File Format を作成します。

CREATE OR ALTER FILE FORMAT SNOWPIPE_TEST_01.SCH_01.FF_JSONL
  TYPE = JSON
  COMPRESSION = AUTO;
オプション 意味
TYPE = JSON 入力ファイルを JSON として読み込みます。
COMPRESSION = AUTO gzip などの圧縮形式を Snowflake が自動判定します。

JSONL は 1 行 1 JSON オブジェクトなので、配列 JSON 用の STRIP_OUTER_ARRAY = TRUE は不要です。

Storage Integration 作成

External Stage から Azure Blob Storage を読むために、Storage Integration を作成します。

CREATE STORAGE INTEGRATION IF NOT EXISTS SI_INBOUND
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = AZURE
  ENABLED = TRUE
  AZURE_TENANT_ID = '<azure_tenant_id>'
  STORAGE_ALLOWED_LOCATIONS = (
    'azure://<storage_account>.blob.core.windows.net/<container>/inbound/'
  );
オプション 意味
TYPE = EXTERNAL_STAGE 外部ステージ用の Storage Integration であることを示します。
STORAGE_PROVIDER = AZURE 接続先が Azure Blob Storage / ADLS であることを示します。
ENABLED = TRUE Integration を有効化します。
STORAGE_ALLOWED_LOCATIONS この Integration でアクセスを許可する Azure Storage の場所です。

作成後、以下を確認します。

DESC INTEGRATION SI_INBOUND;

ここで表示される AZURE_CONSENT_URL を開き、Azure 側で同意します。

また、AZURE_MULTI_TENANT_APP_NAME に対応する Snowflake のサービスプリンシパルに、Azure 側で Blob への権限を付与します。

今回付与した権限は以下です。

権限 用途 備考
ストレージ BLOB データ共同作成者 Blob の読み取り・一覧・書き込み -
Storage Blob デリゲータ getUserDelegationKey の実行 リソースグループ レベルで付与

External Stage 作成

Azure Blob Storage 上の JSONL ファイル配置先を External Stage として作成します。

CREATE OR REPLACE STAGE SNOWPIPE_TEST_01.SCH_01.INBOUND_STAGE
  URL = 'azure://<storage_account>.blob.core.windows.net/<container>/inbound/'
  STORAGE_INTEGRATION = SI_INBOUND;

作成後、ファイル一覧を確認します。

LIST @SNOWPIPE_TEST_01.SCH_01.INBOUND_STAGE;

JSONL の読み取り確認は以下です。

SELECT
  $1
FROM @SNOWPIPE_TEST_01.SCH_01.INBOUND_STAGE
  (FILE_FORMAT => 'SNOWPIPE_TEST_01.SCH_01.FF_JSONL')
LIMIT 10;

注意点として、SELECT FROM @stage のテーブル関数引数では、以下のようなインライン指定はエラーになりました。

-- NG
(FILE_FORMAT => (TYPE = 'JSON'))

名前付き File Format を文字列で指定すると成功しました。

-- OK
(FILE_FORMAT => 'SNOWPIPE_TEST_01.SCH_01.FF_JSONL')

External Volume 作成

Iceberg Table の出力先として External Volume を作成します。出力先には ADLS を設定します。

image.png

CREATE OR REPLACE EXTERNAL VOLUME AZURE_SF_VOLUME
  STORAGE_LOCATIONS =
    (
      (
        NAME = 'azure-sf-container'
        STORAGE_PROVIDER = 'AZURE'
        AZURE_TENANT_ID = '<azure_tenant_id>'
        STORAGE_BASE_URL = 'azure://<storage_account>.dfs.core.windows.net/<container>/snowflakecatalog/'
      )
    )
  ALLOW_WRITES = TRUE;
オプション 意味
NAME Storage Location の識別名です。
STORAGE_PROVIDER = 'AZURE' 接続先が Azure であることを示します。
STORAGE_BASE_URL Iceberg データ・メタデータを配置するベース URL です。
ALLOW_WRITES = TRUE Snowflake がこの外部ボリュームへ書き込みできるようにします。

作成後、確認します。

DESC EXTERNAL VOLUME AZURE_SF_VOLUME;

External Volume 用の Snowflake サービスプリンシパルにも、Azure 側で ADLS の権限を付与する必要があります。

権限 用途 備考
ストレージ BLOB データ共同作成者 Blob の読み取り・一覧・書き込み -

Iceberg Table 作成

Snowflake-managed Iceberg Table を作成します。

Horizon Catalog から参照させる場合でも、Snowflake-managed Iceberg Table として作成するため、CATALOG = 'SNOWFLAKE' を指定します。

CREATE ICEBERG TABLE IF NOT EXISTS SNOWPIPE_TEST_01.SCH_01.SALE_ICEBERG (
  SALE_ID NUMBER(38,0),
  STORE_ID STRING,
  ITEM_ID STRING,
  QUANTITY NUMBER(38,0),
  AMOUNT NUMBER(18,2),
  SOLD_AT TIMESTAMP_NTZ
)
  CATALOG = 'SNOWFLAKE'
  EXTERNAL_VOLUME = 'AZURE_SF_VOLUME'
  BASE_LOCATION = 'sale_iceberg/';

Notification Integration 作成

Azure Event Grid から Storage Queue に送られた通知を Snowflake が読むため、Notification Integration を作成します。

image.png

CREATE NOTIFICATION INTEGRATION IF NOT EXISTS NI_INBOUND
  ENABLED = TRUE
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
  AZURE_STORAGE_QUEUE_PRIMARY_URI = 'https://<queue_storage_account>.queue.core.windows.net/<queue_name>'
  AZURE_TENANT_ID = '<azure_tenant_id>';
オプション 意味
ENABLED = TRUE Integration を有効化します。
TYPE = QUEUE Queue ベースの通知を使うことを示します。
NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE Azure Storage Queue から通知を受け取ることを示します。
AZURE_STORAGE_QUEUE_PRIMARY_URI Event Grid 通知が格納される Azure Storage Queue の URL です。

作成後、確認します。

DESC NOTIFICATION INTEGRATION NI_INBOUND;

ここで表示される AZURE_CONSENT_URL を開き、Azure 側で同意します。

その後、Snowflake のサービスプリンシパルに対して、Storage Queue への権限を付与します。

権限 用途
ストレージ キュー データ共同作成者 Snowflake が Queue の通知メッセージを読む

Azure Event Grid Subscription

Azure 側では、Blob Storage にファイルが作成されたイベントを Storage Queue に送る Event Grid Subscription を作成します。

構成は以下です。

Blob Storage
  → Event Grid System Topic
  → Event Grid Subscription
  → Storage Queue

Azure Portal で作成する場合の例です。

項目
System topic name egst-snowpipe-inbound
Event subscription name egsub-snowpipe-inbound
Event type Blob Created
Endpoint type Storage Queues
Endpoint Snowpipe 用 Storage Queue

image.png

Event Grid 作成時に以下のエラーが出た場合は、Azure サブスクリプションで Event Grid のリソースプロバイダーが未登録です。

サブスクリプションは、名前空間 'Microsoft.EventGrid' を使用するように登録されていません。

Azure Portal で以下を登録します。

Subscriptions
  → 対象サブスクリプション
  → Resource providers
  → Microsoft.EventGrid
  → Register

image.png

Pipe 作成

最後に Snowpipe を作成します。

CREATE PIPE IF NOT EXISTS SNOWPIPE_TEST_01.SCH_01.INBOUND_SALE_ICEBERG_PIPE
  AUTO_INGEST = TRUE
  INTEGRATION = 'NI_INBOUND'
AS
COPY INTO SNOWPIPE_TEST_01.SCH_01.SALE_ICEBERG
FROM @SNOWPIPE_TEST_01.SCH_01.INBOUND_STAGE
PATTERN = '.*sale__.*\.jsonl'
FILE_FORMAT = (FORMAT_NAME = SNOWPIPE_TEST_01.SCH_01.FF_JSONL)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE を使うため、JSONL のキー名と Iceberg Table の列名を対応させます。

JSONL key Iceberg column
sale_id SALE_ID
store_id STORE_ID
item_id ITEM_ID
quantity QUANTITY
amount AMOUNT
sold_at SOLD_AT

Pipe の起動

AUTO_INGEST = TRUE の場合、新しいファイルが Azure Blob Storage に置かれると、Event Grid → Storage Queue → Snowpipe の順に通知され、自動でロードされます。

既存ファイルを取り込みたい場合は、手動で REFRESH します。

ALTER PIPE SNOWPIPE_TEST_01.SCH_01.INBOUND_SALE_ICEBERG_PIPE REFRESH;

Pipe の状態確認は以下です。

SELECT SYSTEM$PIPE_STATUS('SNOWPIPE_TEST_01.SCH_01.INBOUND_SALE_ICEBERG_PIPE');

結果は以下です。

image.png

詰まったポイント

1. Event Grid のリソースプロバイダー未登録

Azure 側で Microsoft.EventGrid が未登録だと、System Topic / Event Subscription 作成に失敗します。

az provider register --namespace Microsoft.EventGrid

2. Pipe Notifications bind failure

以下のエラーが出ました。

Pipe Notifications bind failure
Internal error, could not locate queue for integration

原因は、Notification Integration に設定した Queue URI や Azure Queue 側の権限設定が不完全だったことです。

3. getUserDelegationKey の権限不足

以下のエラーが出ました。

Authorization permission mismatch in getUserDelegationKey

対応として、Storage Integration 用サービスプリンシパルに、リソースグループ レベルで Storage Blob Delegator 相当の権限を付与しました。

4. SELECT は成功するが COPY INTO Iceberg が失敗

SELECT FROM @stage は成功しているのに、COPY INTO Iceberg で以下のエラーが出ました。

Failed to access remote file: access denied. Please check your credentials

この場合、入力 Stage 側ではなく、Iceberg の出力先である External Volume 側の ADLS 権限不足でした。

External Volume 用の Snowflake サービスプリンシパルに ADLS の権限を付与したところ解決しました。

まとめ

今回の検証では、Snowpipe を使って Azure Blob Storage 上の JSONL ファイルを Snowflake-managed Iceberg Table にロードできました。

特に重要だったのは、Azure 側の権限が用途ごとに分かれている点です。

Snowflake オブジェクト Azure 側で必要な権限
Storage Integration 入力 Blob を読む権限
Notification Integration Storage Queue を読む権限
External Volume Iceberg 出力先 ADLS/Blob に書き込む権限

SELECT FROM @stage が成功しても、COPY INTO Iceberg では External Volume 側の書き込みが発生します。
そのため、入力元 Stage の権限だけでなく、Iceberg 出力先の ADLS 権限も忘れずに設定する必要があります。

今回の最終的な流れは以下でした。

1. Azure Blob Storage に JSONL を配置
2. Event Grid が BlobCreated を検知
3. Storage Queue に通知メッセージを送信
4. Snowpipe が Queue から通知を取得
5. External Stage から JSONL を読み取り
6. Iceberg Table にロード
7. External Volume 配下に Iceberg データ・メタデータが作成される
8. Horizon Catalog 経由で参照可能になる
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?