<やりたいこと>
Snowpipeを作成して、AWSのS3バケットに置かれたファイルのデータをSnowflakeのテーブルに自動でロードさせるようにします。
Snowpipeの作成手順を忘れないために今回の投稿に残しておこうと思います。
Snowpipeを使うと、以下①~③のフローにてS3バケット上のファイルのデータがSnowflakeのテーブルにロードされます。
① S3バケットのフォルダBにファイルが作成されたことが検知されます。
② S3バケットのイベント通知機能によりファイルの存在がAWSのSQSに通知されます。
③ 通知後、SnowflakeからCOPYコマンドが自動実行されます。
<前提条件>
1.SnowflakeのエディションはStandard以上であること。
2.AWSとSnowflakeが同じリージョンであること。
3.Snowflake上に外部ステージが作成されていること。
外部ステージの作成については、以前投稿した記事を参照いただけるようお願いします。
<作業手順>
① Snowpipeを作成します。
② S3バケットでSQSへの通知設定を有効化します。
① Snowpipeを作成します。
①-1 Snowpipeで使用する「外部ステージ」を確認します。
以下のSQLを実行して、Snowpipeで指定する「外部ステージ」の名称を取得し、メモ帳に控えます。
select stage_schema,
stage_name,
stage_url,
stage_type
from <外部ステージが保存されているデータベース>.information_schema.stages;
①-2 CREATE PIPEを実行してSnowpipeを作成します。
手順①-1で確認した「外部ステージ」を適用して以下のSQLを実行します。
create pipe <Snowpipeの名称> auto_ingest=true as
copy into <COPYコマンドでロード先となるテーブル>
from @<外部ステージの名称>;
①-3 Snowpipeで使う「SQS」の情報を取得します。
SQSのARNを取得し、S3バケットの「イベント通知」の設定で通知先として指定します。
取得したARNは、後述の手順で使うのでメモ帳に控えておきます。
show pipes;
② S3バケットでSQSへの通知設定を有効化します。
②-1 AWSのS3コンソールを開き、「外部ステージ」の参照先フォルダのS3 URIを取得します。
後続の手順の「イベント通知」の設定で、プレフィクスとして使うのでメモ帳に控えておきます。
以下は、バケット名を含めたパスですが、プレフィクスの部分のみを使います。
s3://muramatsu-snowflake/trial_db/trial_schema/
②-2 S3コンソールの「プロパティ」を開き、「イベント通知」を設定します。
<検証>
以下①~⑤の手順にてSnowpipeが正常に機能することを確認します。
① Snowpipeのロード先のテーブルにデータが存在しないこを確認します。
② Snowpipeのロード先のテーブル定義を確認します。
③ Snowpipeのロード先のテーブル定義に一致するテストデータを用意し、外部ステージの参照先S3バケットにファイルをアップロードします。
④ ファイルをアップロードしてから2~3分後にテーブルに対してSELECTを実行し、データがロードできたこと確認します。
⑤ 「INFORMATION_SCHEMA.LOAD_HISTORY」に対してSELECTを実行し、Snowpipeでのロードが成功したことを確認します。
① Snowpipeのロード先のテーブルにデータが存在しないこを確認します。
ロード先のテーブルにSELECTを実行してデータが存在しないことを確認します。
② Snowpipeのロード先のテーブル定義を確認します。
select get_ddl('table','Snowpipeでのロード先のテーブル名');
③ Snowpipeのロード先のテーブル定義に一致するテストデータを用意し、外部ステージの参照先S3バケットにファイルをアップロードします。
④ ファイルをアップロードしてから2~3分後にテーブルに対してSELECTを実行し、データがロードできたこと確認します。
⑤ 「INFORMATION_SCHEMA.LOAD_HISTORY」に対してSELECTを実行し、Snowpipeのロードが成功したことを確認します。
以下のSQLの実行結果で、「ROW_COUNT」と「ROW_PARSED」が一致することを確認します。
select * from <Snowpipeのロード先のテーブルが格納されるデータベース名>.information_schema.load_history;
おわりに
SnowpipeによりS3バケットに作成されたファイルが自動でSnowflakeのテーブルにロードできることを確認しました。実際の運用では、不正なファイルによりSnowpipeが失敗することも想定され、エラーハンドリングも必要と考えます。エラーハンドリングの方法も考えてみたいと思います。
ご案内
株式会社ジールでは、「ITリテラシーがない」「初期費用がかけられない」「親切・丁寧な支援がほしい」「ノーコード・ローコードがよい」「運用・保守の手間をかけられない」などのお客様の声を受けて、オールインワン型データ活用プラットフォーム「ZEUSCloud」を月額利用料にてご提供しております。
ご興味がある方は是非下記のリンクをご覧ください:
https://www.zdh.co.jp/products-services/cloud-data/zeuscloud/?utm_source=qiita&utm_medium=referral&utm_campaign=qiita_zeuscloud_content-area