今回の課題
前回の記事で、RDSスナップショットをS3にparquet形式でエクスポートしたので、
そのデータをSnowflakeにロードしたい。
↓前回の記事
実装手順
S3のparquetファイルをSnowflakeにロードした手順を記載していく。
前回の続きなので、
1)「Snowflakeにエクスポートするデータを格納しているS3バケット」にアクセス可能なIAMポリシーを作成する
AWSのIAMコンソールにて、下記でポリシーを作成する。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::*",
"Condition": {
"StringLike": {
"s3:prefix": [
"*"
]
}
}
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:GetObjectVersion",
"s3:DeleteObject",
"s3:DeleteObjectVersion"
],
"Resource": [
"[アクセスしたいバケットのARN]",
"[アクセスしたいバケットのARN]/*"
]
}
]
}
2)IAMロールを作成して、そのロールに1)で作成したポリシーをアタッチする
下記の画像のように、
信頼されたエンティティタイプ
をAWSアカウント
に設定して、新しくロールを作成し、
1)で作成したポリシーをアタッチする。
※画像引用先:Snowflakeで外部ステージを作成する
3)Snowflakeにてストレージ統合を作成する
ストレージ統合とは、クラウドストレージとSnowflakeを接続するためのオブジェクト。
クラウドストレージにアクセスする際には、「ストレージ統合」を経由してアクセスする。
単一のストレージは、複数の外部ステージをサポートできる。
-- 外部ストレージ統合
CREATE or replace STORAGE INTEGRATION INT_S3_ACCESS
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = [2で作成したロールのARN]
STORAGE_ALLOWED_LOCATIONS = ('s3://[バケット名]')
;
こちらのクエリを実行することでストレージ統合が作成された。
この「ストレージ統合」を経由して、外部のクラウドストレージにアクセスすることができるようになった。
4)Snowflakeが使用するAWSの「IAMユーザー」と「外部ID」を取得する
Snowflakeのworksheetにて、下記のクエリを実行して抽出されたデータから、
STORAGE_AWS_IAM_USER_ARN
、STORAGE_AWS_EXTERNAL_ID
のproperty_valueの値を覚えておく。
desc integration [ストレージ統合の名称];
ここで確認したSTORAGE_AWS_IAM_USER_ARN
、STORAGE_AWS_EXTERNAL_ID
のproperty_valueの値を、AWSのIAMロール作成時に利用する。
5)2)で作成したロールの信頼ポリシーを編集する
そして次に、ロールの信頼ポリシーを編集
をクリックして、
3)にてdesc integration
で取得した、STORAGE_AWS_IAM_USER_ARN
とSTORAGE_AWS_EXTERNAL_ID
の値を下記に追記する。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "[desc integrationで取得したSTORAGE_AWS_IAM_USER_ARNの値]"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "[desc integrationで取得したSTORAGE_AWS_EXTERNAL_IDの値]"
}
}
}
]
}
6)ファイルフォーマットの作成
下記のクエリで、Snowflakeのテーブルにアクセスまたはロードするための、
ファイルの形式を事前に定義しておく。
今回はparquet形式のファイルをSowflakeにロードしたいので、
type = parquet
と定義している。
create or replace file format parquet_format
type = parquet
;
7)外部ステージを作成
下記のクエリで外部ステージを作成する。
これによって、Snowflakeから外部ステージのデータに対してクエリが実行できるようになった。
create or replace stage external_stage
url = 's3://[バケット名]'
storage_integration = INT_S3_ACCESS
file_format = parquet_format
;
下記のようなクエリで、この外部ステージのファイルに対してクエリを実行することができる。
FROM句に、@[外部ステージ名]/[S3のファイル名]/[S3のファイル名]
と記述することで、S3内のファイルにアクセスしてクエリでデータを抽出することができる。
select $1 from @EXTERNAL_TABLE/mysnapshot20230402043959/rails_test_app_test/rails_test_app_test.purchase_log/1/part-00000-cc307475-00bf-4e8f-a351-b358a1636058-c000.gz.parquet
(file_format => parquet_format)
そして、上記のクエリを実行することで、下記の画像のようにparquet形式のデータを抽出することができる。
※上記のクエリで呼び出せない場合は、rdsからS3にスナップショットをエクスポートする際にKMSで暗号化していることが原因かもしれないので、こちらを確認してください。
8)テーブルを作成する
S3のparquet形式のファイル(=RDSのスナップショット)を格納するためのテーブルを作成しておく。
-- 購入履歴のようなテーブルを今回はサンプルで用意した。
create table DEMO_DB.PUBLIC.purchase_log(
dt varchar(255)
, order_id integer
, user_id varchar(255)
, purchase_amount integer
)
;
9)8)で作成したテーブルにS3のデータを格納する
下記のクエリで、parquet形式のファイルをテーブルにきれいに収まるように調整して、
8)で作成したテーブルにデータを格納する。
copy into purchase_table
from
(
SELECT
$1:dt::varchar
, $1:order_id::varchar
, $1:user_id::varchar
, $1:purchase_amount::varchar
FROM
@EXTERNAL_TABLE/mysnapshot20230402043959/rails_test_app_test/rails_test_app_test.purchase_log/1/part-00000-cc307475-00bf-4e8f-a351-b358a1636058-c000.gz.parquet
(file_format => parquet_format)
)
;
下記のクエリでを実行すると、
SELECT
$1:dt::varchar
, $1:order_id::varchar
, $1:user_id::varchar
, $1:purchase_amount::varchar
FROM
@EXTERNAL_TAB/mysnapshot20230402043959/rails_test_app_test/rails_test_app_test.purchase_log/1/part-00000-cc307475-00bf-4e8f-a351-b358a1636058-c000.gz.parquet
(file_format => parquet_format)
;
下記の画像のように、parquet形式のデータをテーブルの形に整形することができる。
まとめ
以上で、S3からSnowflakeにデータをロードすることができた。
参考記事