みなさん、こんにちは!
Snowpipe は Snowflake で提供されているデータロード機能です。ステージ(データ取り込み対象のストレージ)でファイルが利用可能になったタイミングでデータロードを行うことができます。
COPY コマンドを手動で実行する一括ロードと違い、クラウドストレージのイベント通知や REST API による自動ロードを実現できるという点が特徴です。
本記事では Snowpipe と Azure Event Grid を利用してデータロードを自動化する方法についてご紹介します。
※操作はすべてACCOUNTADMIN
ロールで行っています。
設定手順
設定手順は以下の通りです。
1. テーブル準備
Snowflake のワークシートで以下のSQLを実行し、テーブルemployeesの作成とロード用のCSVファイル形式csv_formatの作成を行います。
-- employeesテーブル作成
CREATE OR REPLACE TABLE employees (
employee_id STRING,
first_name STRING,
last_name STRING,
email STRING,
department STRING,
salary INTEGER
);
-- ファイル形式作成
CREATE OR REPLACE FILE FORMAT csv_format
TYPE = 'CSV'
FIELD_DELIMITER = ','
SKIP_HEADER = 0;
2. ストレージへのアクセス構成
こちらの記事を参考に、以下の設定を行います。
- ストレージアカウント作成
- ストレージ統合(
azure_int
)作成 - 外部ステージ(
ext_stage_azure
)作成
3. Event Grid サブスクリプション構成
Azure ポータルでストレージアカウントコンソールを開き、ファイルロード対象のストレージアカウントの「イベント」で「+イベントサブスクリプション」をクリックします。
基本設定で以下の項目を設定します。
- 名前:イベントサブスクリプションの名前
- システムトピック名:イベント送信先となるシステムトピックの名前
- イベントの種類のフィルター:送信対象のイベント。今回はファイルのアップロード時にイベントを送信したいため、「Blob Created」を選択。
- エンドポイントの種類:「ストレージキュー」を選択
- エンドポイント:「エンドポイントの構成」をクリックして設定
その他はデフォルト設定のままで「作成」をクリックします。
4. 通知統合作成
Snowflake で以下のSQLを実行します。
CREATE NOTIFICATION INTEGRATION azure_notification_int
ENABLED = true
TYPE = QUEUE
NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
AZURE_STORAGE_QUEUE_PRIMARY_URI = '<queue_URL>'
AZURE_TENANT_ID = '<tenant_ID>';
<queue_URL>
は対象ストレージアカウントの「データストレージ」→「キュー」を開き、上記で設定したキューのURLの値を設定します。
<tenant_id>
は Azure ポータルから「Microsoft Entra ID」を開き、概要の「テナントID」で確認できます。
5. ストレージキューへのアクセス許可
Snowflake で以下のSQLを実行します。
DESC NOTIFICATION INTEGRATION azure_notification_int;
実行結果のAZURE_CONSENT_URL
のリンクをクリックします。
アクセス許可画面にリダイレクトされるので、「承諾」をクリックします。
これにより、Snowflake がストレージキューにアクセスするためのアプリケーションが作成されます。
次に、アプリケーションにストレージキューに対するロールを付与します。
対象ストレージキューの「アクセス制御(IAM)」を開き、「追加」をクリックします。
ロールの割り当ての追加で、「ストレージキューデータ共同作成者」を選択し「次へ」をクリックします。
「メンバーを選択する」をクリックし、AZURE_MULTI_TENANT_APP_NAME
に記載のアプリケーションを選択します。
「レビューと割り当て」をクリックして完了です。
6. パイプ作成
Snowflake で以下のSQLを実行します。
CREATE PIPE test_pipe
AUTO_INGEST = true
INTEGRATION = 'AZURE_NOTIFICATION_INT'
AS
COPY INTO employees
FROM @ext_stage_azure
FILE_FORMAT = csv_format;
test_pipe
という名前のパイプを作成するコマンドです。AS句以降に指定した COPY コマンドが、AZURE_NOTIFICATION_INT
から通知を受け取ったタイミングで実行されます。
作成したパイプの詳細はデータベース管理画面から確認することができます。
以上でパイプの設定は完了です。
動作確認
以下のemployees
テーブルに追加データをロードしてみます。
Azure ポータルから対象ストレージアカウントのコンテナにCSVファイルemployees_2.csv
をアップロードします。
CSVファイルの内容は以下の通りです。
1006,David,Wilson,david.wilson@example.com,Engineering,72000
1007,Susan,Martinez,susan.martinez@example.com,Marketing,68000
1008,Robert,Garcia,robert.garcia@example.com,Sales,71000
1009,Linda,Anderson,linda.anderson@example.com,HR,59000
1010,James,Thomas,james.thomas@example.com,Finance,82000
アップロード後、パイプが作動してテーブルにデータが取り込まれたことを確認できました。
さいごに
Snowpipe を利用すると、特定の場所にアップロードされたファイルのデータを自動的にテーブルに取り込むことができます。
パイプによりデータロードが自動化できることに加え、イベント通知やAPIリクエストがあったタイミングのみパイプが実行されるため運用コストも最小限に抑えることができます。
今回ご紹介したのは Azure の場合のみですが、AWS や Google Cloud でも同様にデータロードを自動化することが可能です。お使いの環境や要件に応じて、利用を検討してみてください。