はじめに
AWS Database Migration Service(以下、DMS)はデータソースに対する更新をキャプチャーし、更新差分データをターゲットに伝搬することができるAWSサービスです。ここではDMSのデータソースをDb2、ターゲットをS3にしてSnowflake上のテーブルへのロードを試します。
DMSからS3への連携はこちらの記事(AWS Database Migration Service(DMS)でDb2をS3に一括移行&(CDC的な)継続的レプリケーションする)で確認済みです。
ここではDMSから配置されたS3上のオブジェクトをSnowflakeに反映する部分にフォーカスしています。
前提
実施環境として、データソースとなるDb2はCloud Pak for Data上にデプロイされたものを使用しました。
Cloud Pak for Data:Version 4.7
Openshift:Version 4.12
Cloud:IBM Cloud
(外部公開用のPublic Load Balancer IPを使用して接続)
こちらを見ると2024年3月1日現在DMSがデータソースとして対応しているDb2のバージョンが確認できます。
- IBM DB 2 for Linux、UNIX、および Windows (Db2 LUW)
- バージョン 9.7、すべてのフィックスパック
- バージョン 10.1、すべてのフィックスパック
- バージョン 10.5、フィックスパック 5 を除くすべてのフィックスパック
- バージョン 11.1、すべてのフィックスパック
- バージョン 11.5 Mods (1~8)、フィックスパック 0 のみ
- IBM Db2 for z/OS
- バージョン 12
- Amazon RDS for IBM Db2 LUW
用意したDb2環境は上記の対応製品・バージョンに該当していないような気もしましたが、そこはあまりこだわらず試しています。
参考:DMSのAWS公式
実施手順
-
SnowflakeからS3を外部ステージとして使用できるようにする
Showflake公式の Amazon S3へのセキュアアクセスの構成 を参考に実施します。
ここでは以下の3つの方法について記載があります。オプション1:Amazon S3にアクセスするためのSnowflakeストレージ統合の構成 オプション2:Amazon S3にアクセスするための AWS IAM ロールの設定 --- 廃止 オプション3:Amazon S3にアクセスするための AWS IAM ユーザー認証情報の設定
オプション1が推奨とされています。
オプション2は廃止となっています。
今回は、セキュリティ面ではオプション1が良いと考えられますが、楽をとってオプション3で実施します。
(既存のS3アクセス権限のあるIAMユーザーの認証情報を使用する) -
S3のファイルをSnowflakeのテーブルに反映する
やってみた
- SnowflakeからS3を外部ステージとして使用できるようにする
-- データベース、スキーマの指定
USE SCHEMA MY_DATABASE.PUBLIC;
-- 外部ステージを定義(*****は認証情報をマスキング)
CREATE OR REPLACE STAGE my_S3_stage
URL='s3://yssk-mybacket/snowflake-test/'
CREDENTIALS=(AWS_KEY_ID='*****' AWS_SECRET_KEY='*****')
ENCRYPTION=(TYPE='AWS_SSE_KMS' KMS_KEY_ID = 'aws/key');
(指定したCREDENTIALSはS3へのアクセス権限があるIAMユーザーのもの)
Snowsightで外部ステージを確認
list @my_S3_stage;
SELECT t.$1, t.$2, t.$3 FROM @my_S3_stage t WHERE t.$1 NOT IN ('I','U','D');
SELECT t.$1, t.$2, t.$3, t.$4 FROM @my_S3_stage t WHERE t.$1 IN ('I','U','D');
- S3のファイルをSnowflakeのテーブルに反映する
COPY INTO MY_DATABASE.PUBLIC.MY_LOAD_TEST FROM
(SELECT t.$1, t.$2, t.$3 FROM @my_S3_stage/LOAD00000001.csv t);
INSERT INTO MY_DATABASE.PUBLIC.MY_LOAD_TEST (NO, NAME, TEXT)
SELECT t.$2, t.$3, t.$4 FROM @my_S3_stage t WHERE t.$1 = 'I';
MERGE INTO MY_DATABASE.PUBLIC.MY_LOAD_TEST a
USING (
SELECT
$2 AS NO,
$3 AS NAME,
$4 AS TEXT
FROM
@my_S3_stage t
WHERE
$1 = 'U'
) t
ON (
a.NO = t.NO
)
WHEN MATCHED THEN
UPDATE SET
a.NAME = t.NAME,
a.TEXT = t.TEXT;
MERGE INTO MY_DATABASE.PUBLIC.MY_LOAD_TEST a
USING (
SELECT
$2 AS NO,
$3 AS NAME,
$4 AS TEXT
FROM
@my_S3_stage t
WHERE
$1 = 'D'
) t
ON (
a.NO = t.NO
)
WHEN MATCHED THEN
DELETE;
まとめ
やりたいことは確認できました。
実運用ではタスクを設定して一定間隔でINSERT・UPDATE・DELETE分を随時反映させたり、Snowpipeによるストリーミング処理を実装してS3へのオブジェクト配置をトリガーに即ロードさせたりする必要があるのではないかと思います。
また、エラーハンドリングなども必要になります。
(本稿を作成中に発見した クラスメソッド様の記事:AWS DMSから出力されたファイルをSnowflakeにロードしてみた がSnowpipeを使用していて非常に参考になります)
取っ掛かりとして、参考になればと思います。