はじめに
データ活用基盤構築業務の延長でAzure×Snowpipeの検証をし、(だいぶ)試行錯誤しながら実装までこぎつけたその備忘録です。
やりたいこと
- AzureBlobストレージにおいたcsvファイルを自動でSnowflake上のテーブルに登録
- パフォーマンスの確認
想定場面としては、データレイク(今回はAzureBlobストレージ)からデータウェアハウス(今回はSnowflake)にデータを自動でロードするという設定です。
目的がAzure×Snowpipeの実装経験とSnowpipeの性能確認のため加工変換は割愛しています。
Snowflakeは個人のトライアル環境(30日間無料!)で、Azureは自社の検証環境を借りて実装しました!
そもそもSnowpipeとは?
「Snowpipeを使用すると、ファイルがステージで利用可能になり次第、ファイルからデータをロードできます。つまり、スケジュールで COPY ステートメントを手動で実行してより大きなバッチをロードするのではなく、マイクロバッチのファイルからデータをロードして数分以内にユーザーが利用できるようにします。」
Snowflake: https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-intro
↑公式サイトによる説明です。簡単にいうと、外部ステージに置かれたファイルを自動で読み込み、対象のテーブルへデータを登録してくれるというものです。この本検証においては外部ステージとしてAzure Blobストレージを指定しました。
ざっっっくりイメージ図
公式サイトの手順を参考に実装してみました。自分のやりやすいように所々順番や、やり方を変えています。
早速内容です!(SnowflakeとAzureの環境が使えることを前提としています)
1. Snowflakeにデータベース/スキーマ/テーブルを準備
まずデータを格納するための箱の準備です。任意のデータベースやスキーマ、テーブルを作成します。
2. Azureにてストレージアカウントを2つ作成
①ファイルをアップロードする用
ファイルをアップロードするためストレージアカウントを1つ作成します。検証のため、設定は基本的にデフォルトのままにしています。(※デフォルトのネットワーク設定では「すべてのネットワークからのパブリックアクセスを有効にする」になっていますが、後の手順でロール割り当てを利用したアクセス制御を行うのでそのままでOKです)
作成できたら、コンテナーを1つ作成します。
②メッセージ受け取り用
①のコンテナーにファイルがアップロードされたというメッセージを貯めるためのストレージアカウントを作成します。ストレージアカウントを1つ作成します。設定は①と同様です。アカウントを作成できたらキューを1つ任意の名前で作成します。
3. イベントの作成
2-①で作成したストレージアカウントの「イベント」よりイベントサブスクリプションを作成します。
(※公式の手順書では EventBridge から作成してますが、個人的にこのやり方のほうがわかりやすかったのでこのようにしてます)
設定はこちら
名前:任意
設定:デフォルトのまま
エンドポイント:2-②で作成したストレージキュー
この手順により2-①で作成したコンテナーにファイルがアップロードされるとメッセージが2-②で作成したストレージキューに溜まる設定となります。
4. Snowflakeにてストレージ統合を作成
①CREATE STORAGE INTEGRATION
以下のようなSQL文をSnowflakeのWorksheetで実行し、ストレージ統合を作成します。Snowflakeの外部ステージとAzureのコンテナーを統合します(表現が正しいかは微妙)。
パラメーターの詳細な説明は公式サイトに載っています。
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-azure#step-1-create-a-cloud-storage-integration-in-snowflake
CREATE OR REPLACE STORAGE INTEGRATION SI_A
TYPE = EXTERNAL_STAGE //外部ステージ
STORAGE_PROVIDER = AZURE
ENABLED = TRUE
AZURE_TENANT_ID = 'xxxxx' //EntraIDにて確認可能
STORAGE_ALLOWED_LOCATIONS = ('azure://xxxxx'); //2-①で作成したファイルアップロード用コンテナー
②Azureストレージアカウントのアクセス制御設定をする
以下のSQLを実行します。
DESC STORAGE INTEGRATION SI_A;
実行後、以下二つの値をメモしておきます。
・AZURE_CONSENT_URL →新規タブに張り付ける
・AZURE_MULTI_TENANT_APP_NAME →後ほど使用する
AZURE_CONSENT_URL にあるURLをタブで開くと許可を求めるページに遷移しますので許可します。成功すると、Snowflakeのトップ画面に遷移します。
この操作によりSnowflakeのアプリケーションがAzure側に登録されます。
Azureに戻り、ファイルアップロード用に作成したストレージアカウントから「アクセス制御→ロール割り当て→追加→ロール割り当ての追加」
を押下します。
設定内容はこちら。
ロール:ストレージ BLOB データ共同作成者
メンバー:先ほどメモした AZURE_MULTI_TENANT_APP_NAME のアンダースコアの前にある文字列でヒットしたもの
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-azure#step-1-create-a-cloud-storage-integration-in-snowflake
割り当てを完了します。
この操作によりストレージ統合で指定したコンテナへSnowflakeがアクセスすることが可能になります。(BLOBデータ共同作成者の権限を持ったSnowflakeサービスプリンシパルを割り当てた)
5. 通知統合を作成
①CREATE NOTIFICATION INTEGRATION
以下のようなSQL文をSnowflakeのWorksheetで実行し、通知統合を作成します。Azureのキューに溜まるメッセージを検知するように統合設定をします(表現が正しいかは微妙)。
パラメーターの詳細な説明は公式サイトに載っています。
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-azure#step-2-creating-the-notification-integration
CREATE NOTIFICATION INTEGRATION NI_A
ENABLED = TRUE
TYPE = QUEUE
NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
AZURE_STORAGE_QUEUE_PRIMARY_URI = 'https://xxxxx' //2-②で作成したメッセージ受け取り用キュー
AZURE_TENANT_ID = 'xxxxx'; //EntraIDにて確認可能
②Azureストレージアカウントのアクセス制御設定をする
(手順は4-②と同じですが内容は異なります!)
以下のSQLを実行します。
DESC NOTIFICATION INTEGRATION NI_A;
実行後、以下二つの値をメモしておきます。
・AZURE_CONSENT_URL →新規タブに張り付ける
・AZURE_MULTI_TENANT_APP_NAME →後ほど使用する
先ほどと同様に AZURE_CONSENT_URL にあるURLをタブで開き許可します。この操作によりSnowflakeのアプリケーションがAzure側に登録されます。
Azureに戻り、今度はメッセージ受け取り用に作成したストレージアカウントに「ロール割り当ての追加」を行います。
設定はこちら。
ロール:ストレージ キュー データ 共同作成者
メンバー:先ほどメモした AZURE_MULTI_TENANT_APP_NAME のアンダースコアの前にある文字列でヒットしたもの
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-azure#step-2-creating-the-notification-integration
割当を完了します。
この操作により通知統合で指定したキューへSnowflakeがアクセスすることが可能になります。(ストレージキュー共同作成者の権限を持ったSnowflakeサービスプリンシパルを割り当てた)
6. ステージの作成
2-①で作成したコンテナーとつながるステージを作成します。
作成先のデータベースやスキーマに注意!
CREATE STAGE DB.PUBLIC.STAGETEST_A
URL = 'azure://xxxxx' //2-①で作成したコンテナー
STORAGE_INTEGRATION = SI_A; //4で作成したストレージ統合
7. パイプの作成
自動でデータをロードするパイプを作成します。
FILE_FORMAT には今回事前に、ロードする際のフォーマット(ヘッダースキップなど)をまとめた MYCSVFORMAT を使用しています。
パラメーターの詳細については以下に載っています。
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-azure#step-4-create-a-pipe-with-auto-ingest-enabled
作成先のデータベースやスキーマに注意!
CREATE OR REPLACE PIPE DB.PUBLIC.SNOWPIPE_A
AUTO_INGEST=TRUE
INTEGRATION = NI_A //5で作成した通知統合
AS
COPY INTO DB.PUBLIC.TB
FROM @DB.PUBLIC.STAGETEST_A
FILE_FORMAT = (FORMAT_NAME = 'MYCSVFORMAT'); //事前に作成したフォーマットファイルを指定。詳細はサイトをチェック↑
動作確認
それでは、Azureに戻り、2-①で作成したコンテナーにcsvファイルをアップロードしてデータがテーブルに登録されるか確認します。登録されない場合は、トラブルシューティングに沿って原因を探ることをおすすめします。
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-ts
手順は以上です!
気づいたこと、感想
作りながら感じたことです。
- ファイルの中身にエラーが含まれる際(型違い、桁あふれ、ルールに反するもの等)に、行をスキップするのか、ファイル全体をスキップするのか、を決めておく
→パイプ作成時にオプションパラメータを設定するか、ファイルフォーマットを事前に作成して指定するのがおすすめ
- エラーにより自動ロードされなかった際のエラー通知方法や通知された後の動きも考慮が必要
動作確認をする中で気づきましたが、何かしらの理由でアップロードに失敗した際に、何も起こらないのです(SELECT TABLEするまでわからない)。これじゃあ取り込まれなかったときに気づけないじゃん!ということで、エラー通知を有効にする設定も実装しつつ色々試したのがこちら。
→Snowpipeのエラー通知を有効にしてみた🔰
- 一度Blobコンテナーに上げたファイルは14日間自動アップロード対象外、同名で上書きしてもアップロードされない
動作確認のためにファイルを何度もアップロードする際は毎回名前を変更する必要あり
→もし上記を回避したい場合方法は以下4つ
- COPY INTOを使用して手動でそのファイルをターゲットテーブルにデータロードする
- SNOWPIPEを再生成する
- ALTER PIPE REFRESHを使用する
- 14日後にそのファイルを再ロードする
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-ts#unable-to-reload-modified-data-modified-data-loaded-unintentionally
- 30MB, 24万行のcsvファイルを30秒経たずにロード完了できたためパフォーマンスは問題なさそう
大きなデータでもパフォーマンスを気にすることなく、しかも自動で取り込んでくれるのはとても使いやすいが自動な分、どう監視するかなどの運用方法はしっかり固めておかないとならない。(これに限らずですが)
番外編
- ストレージアカウントをストレージ統合用と通知統合用の二つ準備しますが、一つでできないかなーと思い試しましたがダメでした。理由をサポートに問い合わせしましたが不明でした。正しくロール割り当てしていればいけそうなのにな~、と思いつつ、、、何かご存じの方いらっしゃいましたら教えてください(Azureの話?)。
ですが、ファイル用 / 通知用とストレージアカウントによって分けることで、役割が明確に分離されるのは意味がありそうですね!
今回の目的だったAzure×Snowpipeの実装経験とパフォーマンスの確認ができました!
以上まとめでした。