概要
Fivetran の Managed Data Lakes Service の Apache Iceberg テーブルを Snowflake に登録する手順を紹介します。下記のドキュメントにて手順が記載されていますので、合わせて確認してください。
本記事は下記記事の一部です。
出所:フルマネージドなデータ連携:データ統合の自動化を実現する Fivetran の全貌 #fivetran - Qiita
事前準備
下記の記事を参考に Fivetran の Managed Data Lakes Service によりデータ同期を実施します。
手順
Fivetran にて Snowflake で実行するコードを取得
Destinations
-> 作成した Destination -> Catalog integration
-> Snowflake
の値をコピーします。
上記の SQL には外部 Volume を作成する SQL と 外部カタログを作成する SQL が含まれています。
出所:Fivetran Iceberg REST Catalog | Integration and documentation
出所:Fivetran Iceberg REST Catalog | Integration and documentation
Snowflake にて 外部 Volume を作成
外部 Volume を作成します。
CREATE OR REPLACE EXTERNAL VOLUME fivetran_volume_jokester_waterfall
STORAGE_LOCATIONS =
(
(
NAME = 'jokester_waterfall-default'
STORAGE_PROVIDER = 'AZURE'
STORAGE_BASE_URL = 'azure://fivetranmdls01.blob.core.windows.net/adls-container'
AZURE_TENANT_ID = '095ca098-XXXX-XXXX-XXXXXXX'
)
);
AZURE_CONSENT_URL の URL を取得します。
DESC EXTERNAL VOLUME fivetran_volume_jokester_waterfall;
SELECT
PARSE_JSON($4):AZURE_MULTI_TENANT_APP_NAME::string AS AZURE_MULTI_TENANT_APP_NAME,
PARSE_JSON($4):AZURE_CONSENT_URL::string AS AZURE_CONSENT_URL
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE $2 = 'STORAGE_LOCATION_1';
Azure にログインしているブラウザにて、先ほど取得した URL にアクセスして、承認
を選択します。
追加されたサービスプリンシパルにストレージ BLOB データ共同作成者権限
を付与します。
外部 Volume の検証が正常終了することを確認します。
SELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('fivetran_volume_jokester_waterfall');
カタログ統合を作成
{fivetran_delivered_schema_name}
の値を書き換えた上で、カタログ統合を作成します。
CREATE OR REPLACE CATALOG INTEGRATION fivetran_catalog_jokester_waterfall
CATALOG_SOURCE = POLARIS
TABLE_FORMAT = ICEBERG
CATALOG_NAMESPACE = '{fivetran_delivered_schema_name}'
REST_CONFIG = (
CATALOG_URI = 'https://polaris.fivetran.com/api/catalog'
WAREHOUSE = 'jokester_waterfall'
)
REST_AUTHENTICATION = (
TYPE = OAUTH
OAUTH_TOKEN_URI = 'https://polaris.fivetran.com/api/catalog/v1/oauth/tokens'
OAUTH_CLIENT_ID = 'ffb13f98be9c2bc3'
OAUTH_CLIENT_SECRET = '88c7ed5096727d8eb977a822ddf1adac'
OAUTH_ALLOWED_SCOPES = ('PRINCIPAL_ROLE:ALL')
)
ENABLED = TRUE,
REFRESH_INTERVAL_SECONDS = 300;
{fivetran_delivered_schema_name}
の値は、 Connections から取得できるようです。今回は非データベースのコネクターであるため、fivetran_metadata_jokester_waterfall
を指定しています。
Apache Iceberg テーブルを作成
データベースを作成します。
CREATE DATABASE IF NOT EXISTS FIVETRAN_CATALOG;
Apache Iceberg テーブルを作成します。
USE DATABASE FIVETRAN_CATALOG;
CREATE OR REPLACE ICEBERG TABLE DESTINATION
CATALOG = 'FIVETRAN_CATALOG_JOKESTER_WATERFALL'
EXTERNAL_VOLUME = 'FIVETRAN_VOLUME_JOKESTER_WATERFALL'
CATALOG_TABLE_NAME = 'destination'
AUTO_REFRESH = True
;
CATALOG_TABLE_NAME
に指定する名称は、 Connector のSchema
タブで確認できます。
テーブルにクエリを発行できることを確認します。
SELECT * FROM DESTINATION;