はじめに
今回はSnowPro Coreの勉強の一環として実施したデータロードのハンズオンをご紹介いたします。
前提条件
- AWSアカウントでIAM・S3の作成権限があること
- Snowflakeアカウントが作成済みであること
構成
AWSリソースの準備
S3バケットの作成
ロードするファイルを置くためのバケットを作成しておきます。
今回は名前以外、デフォルトの設定で作成しております。
作成後、バケットポリシーも変更不要です。
IAM ポリシー・ロールの作成
S3バケットのファイルをSnowflake側が取得できるようにIAMポリシーとIAMロールを作成します。
- ロールにアタッチするIAMポリシーの権限
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::作成したバケット名",
"arn:aws:s3:::作成したバケット名/*"
]
}
]
}
ロールの作成では信頼されたエンティティタイプで「AWSアカウント」を選択し、外部IDは一旦適当な値を入れておきます。
※後から信頼ポリシーを修正します。
Snowflakeの設定
作業用ワークシートの作成
snowflakeへログイン後、左の「+」アイコンからSQLワークシートを選ぶとSQLワークシートが作成されます。
下のコマンドプロンプトみたいなアイコン(プロジェクト)からワークシートのページへ移動して作成することも可能です。
データベースとテーブルの作成
-- データベース作成
CREATE DATABASE IF NOT EXISTS QIITA_DEMO_DB;
-- データベース使用
USE QIITA_DEMO_DB;
-- サンプルデータ用テーブル
CREATE OR REPLACE TABLE customer_data (
customer_id INT,
customer_name VARCHAR(100),
email VARCHAR(100),
registration_date DATE,
last_login TIMESTAMP
);
Storage Integrationの作成
-- storage integrationの作成
create or replace storage integration s3_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::XXXXXX' -- 作成したポリシーのARNをコピペする
STORAGE_ALLOWED_LOCATIONS = ('s3://作成したバケット名/');
-- Integrationの詳細を確認
DESC INTEGRATION;
IAMロールの信頼ポリシー修正
DESC INTEGRATION ストレージ統合名;
のコマンド結果の
・STORAGE_AWS_IAM_USER_ARN
・STORAGE_AWS_EXTERNAL_ID
のproperty_valueの値を、以下に入力して信頼ポリシーを修正します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "ここにSTORAGE_AWS_IAM_USER_ARNを入力"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "ここにSTORAGE_AWS_EXTERNAL_IDを入力"
}
}
}
]
}
外部ステージとSnowpipeの作成
外部ステージの作成
-- External Stage作成
CREATE OR REPLACE STAGE my_s3_stage
STORAGE_INTEGRATION = s3_integration
URL = 's3://my-snowflake-data-bucket/'
FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1);
-- ステージ作成
CREATE OR REPLACE STAGE s3_stage
URL='s3://qiita-demo-snowflake-202506/'
STORAGE_INTEGRATION = s3_integration;
Snowpipeの作成
-- Snowpipe作成
CREATE OR REPLACE pipe user_pipe
auto_ingest = TRUE
AS
COPY INTO customer_data
FROM @s3_stage
FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1 TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS');
S3イベント通知作成
S3にファイルが置かれた際にSnowpipeのSQSにイベント通知を行うように設定します。
SnowpipeのSQSArn確認
以下のSQLを実行し、「notification_channel」の値をコピーします。
-- sqs確認
DESC PIPE USER_PIPE;
イベント通知作成
バケットのプロパティ内のイベント通知からイベント通知を作成します。
イベントタイプにはすべてのオブジェクト作成イベントにチェックを入れます。
通知先にはSQSキューを選択し、コピーしたARNを張り付け保存します。
テストデータでの動作確認
customer_dataテーブルに入れるデータをCSVファイルとして作成し、S3バケットにアップロードします。
Snowpipeによって自動的にテーブルにロードされることを確認します。
サンプルCSVファイルの準備
customer_id,customer_name,email,registration_date,last_login
1,田中太郎,tanaka@example.com,2024-01-15,2024-06-15 10:30:00
2,佐藤花子,sato@example.com,2024-02-20,2024-06-14 15:45:00
3,鈴木次郎,suzuki@example.com,2024-03-10,2024-06-13 09:20:00
S3へのファイルアップロード
データロード結果の確認
-- データ確認
SELECT * FROM customer_data;
まとめ
初学者の方も、本記事の手順に従って段階的に進めることで、動作する環境を構築できると思います。
これを応用して色々と試す際は、snowpipeでは1度ロードされたファイルはS3に置きなおしても再ロードされないことにも注意しましょう。