今回の課題
以前、下記の記事にて、S3からSnowflakeへのデータのロードを手動で行えるように実装したので、
今回はSnowpipe機能を使用し、ファイルがエクスポートされたと同時に自動的にSnowflakeにロードされる機能を実装したい。
実装する手順
前提として、外部ステージを作成しておく必要があります。
こちらの記事では、上記の記事で作成した外部ステージであるexternal_stage
を使用する。
また、上記の記事でも記載されているが、ここでの外部ステージはS3のことである。
公式ドキュメントに記載がある通り、以下の流れで処理が走るように実装していく。
- データファイルはステージにロードされます。
- S3イベント通知は、SQS キューを介してSnowpipeにファイルをロードする準備ができたことを通知します。Snowpipeはファイルをキューにコピーします。
- Snowflakeが提供する仮想ウェアハウスは、指定されたパイプで定義されたパラメーターに基づいて、キューに入れられたファイルからターゲットテーブルにデータをロードします。
-- https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-s3#option-1-creating-a-new-s3-event-notification-to-automate-snowpipe
1)Snowpipeを作成する
外部ステージのexternal_stage
に対して、以下のようなクエリを実行する。
以下のクエリがどういった処理をするかというと、
外部ステージであるS3のバケット内の/rails_test_app_test.purchase_log/1/
ディレクトリにparquet形式のファイルがエクスポートされると、
自動でCOPY INTO
が走り、purchase_table
に外部ステージのデータがロードされる
といったイメージ。
create pipe snowpipe_practice auto_ingest = true as
copy into DEMO_DB.PUBLIC.purchase_table
from
(
SELECT
$1:dt::varchar
, $1:order_id::varchar
, $1:user_id::varchar
, $1:purchase_amount::varchar
FROM
@EXTERNAL_TABLE
(file_format => 'parquet', pattern => '.*/rails_test_app_test.purchase_log/1/.*.parquet')
overwrite
)
;
ちなみに、Snowpipe作成のためのフォーマットは、以下のようになっている。
create pipe [Snowpipeの名称] auto_ingest=true as
copy into [ロード先となるテーブル]
from @[外部ステージの名称]
overwrite
;
※overwrite
をつけないと、データが上書きではなく挿入される形で追加される。(同じデータが重複してしまう形になるので注意する。
2)SQSの設定を行う
2)-1.「SQS」の情報を取得する
下記のクエリを実行して、
SQSのARNを取得し、次の行程にて、S3バケットの「イベント通知」の設定で通知先として指定する。
※SQSのARNは、以下のクエリで実行して抽出したデータのnotification_channel
の値のこと
show pipes;
2)-2.S3バケットでSQSへの通知設定を有効化する
S3コンソールの「プロパティ」をクリックし、
「イベント通知を作成」をクリックし、イベント通知を設定する。
細かい設定はこちらの記事を参考に進めた。
2)-1
で取得したSQSのARNは、このイベント通知の設定で、送信先として登録する。
3)S3にファイルをアップロードする
3)-1.S3バケットに対する権限のポリシーを作成する
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::[バケット名]",
"Condition": {
"StringLike": {
"s3:prefix": [
"*"
]
}
}
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:GetObjectVersion",
"s3:DeleteObject",
"s3:DeleteObjectVersion"
],
"Resource": [
"arn:aws:s3:::[バケット名]",
"arn:aws:s3:::[バケット名]/*"
]
},
{
"Sid": "AllowUseOfTheKey",
"Effect": "Allow",
"Action": [
"kms:Encrypt",
"kms:Decrypt",
"kms:ReEncrypt*",
"kms:GenerateDataKey*",
"kms:DescribeKey"
],
"Resource": "[Lambdaを使ってRDSからS3にデータをロードするときに使用したKMSのARN]"
}
]
}
※ここでポリシーにKMS周りの権限を付与していなくてつまずいた
自分は、LambdaでStartExportTaskを使用して、
RDSからS3にスナップショットをロードするようにしていた。
StartExportTaskを使用してロードする場合、KMSキーで暗号化することが必須である。
Snowpipeを実装しているのにも関わらず、S3のデータが暗号化されていることが原因で、S3にスナップショットがエクスポートされても、Snowflakeにデータが自動でロードされないという不具合につまずいてしまった。
こちらの記事で統合ステージに許可しているS3バケットに対する権限のロールのポリシーにKMSの権限を持たせることで解決することができた。
※過去に、LambdaでRDSのスナップショットをS3にロードできるようにした記事は以下。
3)-2.RDSのスナップショットをS3にロードしてみる
最後に、Lambdaを実行して、RDSのスナップショットをS3にロードした。
その後、以下のクエリを実行したところ、Snowpipeで自動的にデータがロードされていることを確認することができた。
select * from purchase_log;
まとめ
以上で、Snowpipe機能を使用して、S3にデータがエクスポートされたと同時に、
Snowflakeのテーブルにも自動でデータがロードされる機能を実装することができた。
おまけ
SQSとS3バケットの「イベント通知」を初めて使ったので、
どういう機能なのかきちんと調べてみた。
SQS(Simple Queue Service)とは
フルマネージドのメッセージキューイングサービス。
メッセージキューイングサービスは、システム間で送信するデータを一時的に溜め込む場所を設けて、そこから順次送受信していく仕組み。
今回の記事では、データ送信側アプリケーション(プロデューサーと言う)であるS3がSQSにメッセージを送り、
データ受信側アプリケーション(コンシューマーと言う)であるSnowpipeがSQSからメッセージを取得していると言う動きをしている。
ちなみに、SQSで用意されているキュー方式には標準キュー
とFIFOキュー
の2つ種類があるようだった。
標準キューとは
メッセージを順不同で送るキュー方式で、メッセージを2回以上送ってしまう可能性がある。
「1回だけ」「順序通りに」メッセージを送る用途には向いていない。
しかし、高速なキューイング処理が可能。
FIFOキューとは
先入れ先出しキューとも呼ばれていて、メッセージを「1回だけ」「順序通りに」送信するキュー方式。
受け取ったメッセージを受け取った順に送信するよう制御されている。
S3バケットの「イベント通知」
S3バケットで特定のイベントが発生したときに、AWSの他のサービスに通知することができる。
S3では、AWS SNS
, AWS SQS
, AWS Lambda
の3つのサービスにイベントを通知することができる。
今回の記事では、S3バケットにファイルがPUTされたというイベントが発生して、
AWS SQS
にS3でイベントが発生したことを通知したと言うイメージ。
参考記事