5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【データ基盤構築/Snowflake】SnowflakeのSnowpipeを使って、S3にファイルをエクスポートすると同時にSnowflakeのテーブルにも自動でロードされる仕組みを作る

Last updated at Posted at 2023-04-18

今回の課題

以前、下記の記事にて、S3からSnowflakeへのデータのロードを手動で行えるように実装したので、
今回はSnowpipe機能を使用し、ファイルがエクスポートされたと同時に自動的にSnowflakeにロードされる機能を実装したい。

実装する手順

前提として、外部ステージを作成しておく必要があります。
こちらの記事では、上記の記事で作成した外部ステージであるexternal_stageを使用する。
また、上記の記事でも記載されているが、ここでの外部ステージはS3のことである。

公式ドキュメントに記載がある通り、以下の流れで処理が走るように実装していく。

  1. データファイルはステージにロードされます。
  2. S3イベント通知は、SQS キューを介してSnowpipeにファイルをロードする準備ができたことを通知します。Snowpipeはファイルをキューにコピーします。
  3. Snowflakeが提供する仮想ウェアハウスは、指定されたパイプで定義されたパラメーターに基づいて、キューに入れられたファイルからターゲットテーブルにデータをロードします。
    -- https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-s3#option-1-creating-a-new-s3-event-notification-to-automate-snowpipe

1)Snowpipeを作成する

外部ステージのexternal_stageに対して、以下のようなクエリを実行する。

以下のクエリがどういった処理をするかというと、

外部ステージであるS3のバケット内の/rails_test_app_test.purchase_log/1/ディレクトリにparquet形式のファイルがエクスポートされると、
自動でCOPY INTOが走り、purchase_tableに外部ステージのデータがロードされる

といったイメージ。

create pipe snowpipe_practice auto_ingest = true as
  copy into DEMO_DB.PUBLIC.purchase_table
  from 
      (
      SELECT 
        $1:dt::varchar
        , $1:order_id::varchar
        , $1:user_id::varchar
        , $1:purchase_amount::varchar
      FROM
          @EXTERNAL_TABLE
          (file_format => 'parquet', pattern => '.*/rails_test_app_test.purchase_log/1/.*.parquet')
        overwrite
        )
;

ちなみに、Snowpipe作成のためのフォーマットは、以下のようになっている。

create pipe [Snowpipeの名称] auto_ingest=true as 
  copy into [ロード先となるテーブル]
  from @[外部ステージの名称]
  overwrite
;

overwriteをつけないと、データが上書きではなく挿入される形で追加される。(同じデータが重複してしまう形になるので注意する。

2)SQSの設定を行う

2)-1.「SQS」の情報を取得する

下記のクエリを実行して、
SQSのARNを取得し、次の行程にて、S3バケットの「イベント通知」の設定で通知先として指定する。
※SQSのARNは、以下のクエリで実行して抽出したデータのnotification_channelの値のこと

show pipes;

image.png

2)-2.S3バケットでSQSへの通知設定を有効化する

S3コンソールの「プロパティ」をクリックし、
image.png
「イベント通知を作成」をクリックし、イベント通知を設定する。
a7efbd3d94acd1c5511da699b18e6847.png

細かい設定はこちらの記事を参考に進めた。
2)-1で取得したSQSのARNは、このイベント通知の設定で、送信先として登録する。

3)S3にファイルをアップロードする

3)-1.S3バケットに対する権限のポリシーを作成する

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::[バケット名]",
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "*"
                    ]
                }
            }
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:GetObjectVersion",
                "s3:DeleteObject",
                "s3:DeleteObjectVersion"
            ],
            "Resource": [
                "arn:aws:s3:::[バケット名]",
                "arn:aws:s3:::[バケット名]/*"
            ]
        },
        {
            "Sid": "AllowUseOfTheKey",
            "Effect": "Allow",
            "Action": [
                "kms:Encrypt",
                "kms:Decrypt",
                "kms:ReEncrypt*",
                "kms:GenerateDataKey*",
                "kms:DescribeKey"
            ],
            "Resource": "[Lambdaを使ってRDSからS3にデータをロードするときに使用したKMSのARN]"
        }
    ]
}

※ここでポリシーにKMS周りの権限を付与していなくてつまずいた

自分は、LambdaでStartExportTaskを使用して、
RDSからS3にスナップショットをロードするようにしていた。
StartExportTaskを使用してロードする場合、KMSキーで暗号化することが必須である。

Snowpipeを実装しているのにも関わらず、S3のデータが暗号化されていることが原因で、S3にスナップショットがエクスポートされても、Snowflakeにデータが自動でロードされないという不具合につまずいてしまった。
こちらの記事で統合ステージに許可しているS3バケットに対する権限のロールのポリシーにKMSの権限を持たせることで解決することができた。

※過去に、LambdaでRDSのスナップショットをS3にロードできるようにした記事は以下。

3)-2.RDSのスナップショットをS3にロードしてみる

最後に、Lambdaを実行して、RDSのスナップショットをS3にロードした。
その後、以下のクエリを実行したところ、Snowpipeで自動的にデータがロードされていることを確認することができた。

select * from purchase_log;

まとめ

以上で、Snowpipe機能を使用して、S3にデータがエクスポートされたと同時に、
Snowflakeのテーブルにも自動でデータがロードされる機能を実装することができた。

おまけ

SQSとS3バケットの「イベント通知」を初めて使ったので、
どういう機能なのかきちんと調べてみた。

SQS(Simple Queue Service)とは

フルマネージドのメッセージキューイングサービス。
メッセージキューイングサービスは、システム間で送信するデータを一時的に溜め込む場所を設けて、そこから順次送受信していく仕組み。

今回の記事では、データ送信側アプリケーション(プロデューサーと言う)であるS3がSQSにメッセージを送り、
データ受信側アプリケーション(コンシューマーと言う)であるSnowpipeがSQSからメッセージを取得していると言う動きをしている。

ちなみに、SQSで用意されているキュー方式には標準キューFIFOキューの2つ種類があるようだった。

標準キューとは

メッセージを順不同で送るキュー方式で、メッセージを2回以上送ってしまう可能性がある。
「1回だけ」「順序通りに」メッセージを送る用途には向いていない。
しかし、高速なキューイング処理が可能。

FIFOキューとは

先入れ先出しキューとも呼ばれていて、メッセージを「1回だけ」「順序通りに」送信するキュー方式。
受け取ったメッセージを受け取った順に送信するよう制御されている。

S3バケットの「イベント通知」

S3バケットで特定のイベントが発生したときに、AWSの他のサービスに通知することができる。
S3では、AWS SNS, AWS SQS, AWS Lambdaの3つのサービスにイベントを通知することができる。

今回の記事では、S3バケットにファイルがPUTされたというイベントが発生して、
AWS SQSにS3でイベントが発生したことを通知したと言うイメージ。

参考記事

5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?