2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Snowflake Openflowのセットアップ~DB連携まで

Posted at

みなさん、こんにちは!

Openflow は、任意のソースから任意のターゲットへデータを受け渡しできる機能です。
Apache NiFi をベースに構築されており、外部のETLツールを介さずエンドツーエンドでETL処理を実現することができます。

本記事では、Openflow のセットアップからDB連携までの手順を詳しくご紹介します。

※執筆時点でAWSのみ利用可能です。AWS以外のクラウドを選択した場合、メニューに「Openflow」が表示されず利用できないためご注意ください。

環境構築(Openflow)

事前準備

Snowflake のワークシートで以下のSQLを実行します。

qiita.rb
USE ROLE ACCOUNTADMIN;

-- イメージリポジトリの作成と権限付与
CREATE DATABASE IF NOT EXISTS OPENFLOW;
USE OPENFLOW;
CREATE SCHEMA IF NOT EXISTS OPENFLOW;
USE SCHEMA OPENFLOW;
CREATE IMAGE REPOSITORY IF NOT EXISTS OPENFLOW;
grant usage on database OPENFLOW to role public;
grant usage on schema OPENFLOW to role public;
grant read on image repository OPENFLOW.OPENFLOW.OPENFLOW to role public;

-- OPENFLOW_ADMIN_ROLEの作成と権限付与
CREATE ROLE IF NOT EXISTS OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW DATA PLANE INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW RUNTIME INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;

-- Openflowユーザのセカンダリロール設定
ALTER USER <user_name> SET DEFAULT_SECONDARY_ROLES = ('ALL');

<user_name>は自分のユーザ名に置き換えてください。

メニューの「データ」→「Openflow」を開き、「Openflowを起動」をクリックします。
image.png

「Invalid consent request」と出て開けなかった場合、ユーザのデフォルトロールがADMINロールになっているため、以下のSQLでデフォルトロールを変更します。

qiita.rb
ALTER USER <user_name> SET DEFAULT_ROLE = 'PUBLIC';

Openflow を起動すると、以下のような画面が開きます。
image.png

ネットワーク環境構築(Bring your own VPC の場合)

AWSで、デプロイメントを作成するためのネットワーク環境を構築します。

作成するリソースは以下の通りです。

リソース 説明
VPC Openflow 専用のネットワーク空間
Public Subnet x 2 異なる2つのAZにパブリックサブネットを配置する。EKSのLB用に以下のタグを付与する:kubernetes.io/role/elb = 1
Private Subnet x 2 異なる2つのAZにプライベートサブネットを配置する
インターネットゲートウェイ パブリックサブネットのインターネット通信用
NATゲートウェイ プライベートサブネットのインターネット通信用
ルートテーブル サブネット間の通信制御・Public Subnet -> インターネットゲートウェイ・Private Subnet -> NATゲートウェイ

※Managed VPC の場合も同様の設定になっていることを確認します。

実際の設定例:
image.png

デプロイメント作成

「Create a deployment」をクリックし、デプロイメントの設定を行っていきます。
image.png

VPC構成はマネージドVPCと既存VPCのいずれかを選ぶことができます。
自動作成のVPCを利用するため、「Managed VPC」を選択します。
デプロイメントのオーナーロールは OPENFLOWADMINROLE を選択しておきます。
image.png
設定が完了したら、「Create deployment」をクリックします。

デプロイメントが作成されると、以下の画面が表示されます。
「Download template」でテンプレートをダウンロードします。(以下の画面はDeployments一覧の「Installation details」からも確認可能)
image.png

Snowsightで以下のSQLコマンドを実行します。
ネットワークポリシーを使用して Snowflake へのアクセスを制御している場合、コメントアウト部分を外して実行してください。(NATGATEWAYPUBLIC_IP は設定したNATゲートウェイのIPアドレスに置き換えてください)

qiita.rb
/*
-- ネットワークポリシーの設定
USE ROLE ACCOUNTADMIN;
USE DATABASE {REPLACE_WITH_YOUR_DB_NAME};

CREATE NETWORK RULE allow_openflow_deployment
MODE = INGRESS
TYPE = IPV4
VALUE_LIST = ('{$NAT_GATEWAY_PUBLIC_IP}/32');

-- Run this command to find your currently active network policy and copy the value column
SHOW PARAMETERS LIKE 'NETWORK_POLICY' IN ACCOUNT;
-- Now add the new network rule to this policy
ALTER NETWORK POLICY {ENTER_YOUR_ACTIVE_NETWORK_POLICY_NAME} ADD ALLOWED_NETWORK_RULE_LIST = (allow_openflow_deployment);
*/

-- イベントテーブルの作成
USE ROLE accountadmin;
GRANT create event table on schema OPENFLOW.OPENFLOW to role $ROLE_OF_DEPLOYMENT_OWNER;

USE ROLE $ROLE_OF_DEPLOYMENT_OWNER;
CREATE event table if not exists openflow.openflow.openflow_events;

-- Find the Data Plane Integrations
SHOW openflow data plane integrations;

ALTER openflow data plane integration
$OPENFLOW_deployment_UUID
SET event_table = 'openflow.openflow.openflow_events';

ダウンロードしたテンプレートファイル(YAML)をもとに CloudFormation でスタックの作成を行います。
image.png

設定はすべてデフォルトのままで次へ進み、作成を行います。

スタックが「CREATE_COMPLETE」状態になった後、OpenflowAgentInstance のEC2が起動し、必要な残りのリソースの作成を行います。
Snowflake Image Repository からのイメージダウンロードやEKSクラスターのデプロイがEC2によって自動的に実行されます。

リソースの作成が完了し、Snowflake側との接続が確立するとデプロイメントの状態が「Active」になります。(処理が問題なく進んだ場合でも、リソース作成からActive状態になるまでは45分ほどかかります)
image.png

ランタイム作成

「Create a runtime」をクリックし、ランタイムの作成を行います。
image.png

ランタイム名を入力します。
その他は適切な値を設定し、「Create」をクリックします。
image.png

ランタイムの作成は約2~3分で完了します。作成が完了すると、ランタイムの状態が「Active」になります。
image.png

メニューから「View canvas」をクリックし、アクセスを許可すると Openflow canvas が開きます。
image.png
image.png
以上で Openflow の環境構築は完了です。

環境構築(データベース)

レプリケーション元となるデータベースの作成と設定を行います。

データベース作成

AWS RDS で PostgreSQL データベースを作成します。
Snowflake からアクセスできるよう、パブリックアクセスを有効にしておきます。

データベースが作成されたら、「パラメータグループ」で以下の変更を保存します。(今回の場合、カスタムパラメータグループを使用しています)
rds.logical_replication=1
image.png

「変更」→「DB パラメータグループ」で変更したパラメータグループを指定して保存し、RDSを再起動します。

レプリケーション設定

psqlまたはクライアントアプリケーションでレプリケーションの設定を行っていきます。

今回の検証では、A5:SQL Mk-2 を使用します。RDSへの接続は以下のように設定します。
image.png
image.png

接続できたら、以下のSQLコマンドを順に実行します。

qiita.rb
-- レプリケーションが有効になっていることを確認
SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
name setting
rds.logical_replication on
wal_level logical
qiita.rb
-- コネクタ用のユーザー作成と権限付与
CREATE USER openflow_user WITH PASSWORD '<任意のパスワード>';
GRANT rds_replication TO openflow_user;
qiita.rb
-- レプリケーション用テーブル作成
CREATE TABLE sample_data (
    id SERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO sample_data (content)
VALUES
    ('aaa'),
    ('bbb');
id content created_at
1 aaa 2025/06/26 17:46:32.468
2 bbb 2025/06/26 17:46:32.468
qiita.rb
-- publication の作成テーブル追加
CREATE PUBLICATION openflow_pub;
ALTER PUBLICATION openflow_pub ADD TABLE sample_data;

-- ユーザへの権限付与
GRANT USAGE ON SCHEMA public TO openflow_user;
GRANT SELECT ON TABLE sample_data TO openflow_user;

コネクタ設定

事前準備

Snowflake のワークシートで以下のSQLを実行します。

RSAキーペアの設定についてはこちらの記事の「1.認証設定」部分をご参照ください。

qiita.rb
-- レプリケーション用データベース作成
CREATE DATABASE openflow_replicated_data;

-- レプリケーションを行うOpenflowユーザの作成と権限付与
CREATE USER openflow_service_user TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
CREATE ROLE openflow_service_role;
GRANT ROLE openflow_service_role TO USER openflow_service_user;
GRANT USAGE ON DATABASE openflow_replicated_data TO ROLE openflow_service_role;
GRANT CREATE SCHEMA ON DATABASE openflow_replicated_data TO ROLE openflow_service_role;

-- Openflowウェアハウスの作成と権限付与
CREATE WAREHOUSE openflow_compute_wh
  WITH
    WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE;
GRANT USAGE, OPERATE ON WAREHOUSE openflow_compute_wh TO ROLE openflow_service_role;

-- RSA公開鍵の設定
ALTER USER openflow_service_user SET RSA_PUBLIC_KEY = '<RSA公開鍵>';

コネクタ定義設定

こちらを参考にコネクタ定義の設定を行います。

以上でDB連携までの手順は完了です!お疲れ様でした。

トラブルシューティング

デプロイメントの状態が「Inactive」のまま進まない

openflow-agent-{data-plane-key} という名前のEC2インスタンスに接続し、エラーがないかログを確認します。
image.png

ログ確認用コマンド:

journalctl -xe -f -n 100 -u docker
journalctl -u openflow-apply-infrastructure -f -n 500
journalctl -u openflow-sync-images -f -n 500
tail -500f /var/log/cloud-init-output.log

今回の場合、以下のようなエラーが繰り返し発生していました。

[ec2-user@ip-10-10-6-22 ~]$ journalctl -xe -f -n 100 -u docker
Jun 23 06:12:24 ip-10-10-6-22.ap-northeast-1.compute.internal dockerd[27020]: time="2025-06-23T06:12:24.488934848Z" level=error msg="Upload failed: name unknown: The repository with name 'snowflake-openflow/runtime-operator' does not exist in the registry with id 'xxxxxxxxxxxx'"

原因はテンプレートファイルの環境変数の指定でした。

exists_check=$(aws ecr describe-repositories --region $AWS_REGION --repository-names $1 2>&1)
if [ $? -ne 0 ]; then
    if echo ${!exists_check} | grep -q RepositoryNotFoundException; then
    aws ecr create-repository --region $AWS_REGION --repository-name $1 --tags $agent_tags
    else
    >&2 echo ${!exists_check}
    fi
fi

${!exists_check}は関節参照で、変数exists_checkを展開した結果が格納されます。
ECRリポジトリが存在しなかった場合、exists_checkには以下のようなメッセージが格納されます。

An error occurred (RepositoryNotFoundException) when calling the DescribeRepositories operation: The repository with name 'snowflake-openflow/openflow-data-plane-agent-aws' does not exist in the registry with id 'xxxxxxxxxxxx'

そのため${!exists_check}${An error occurred ...}となりますが、そのような変数はないためエラーとなり、以下のif文部分のコマンド実行結果が0(正常終了)になることはありません。

echo ${!exists_check} | grep -q RepositoryNotFoundException;

よって、対象のECRリポジトリが存在しない場合、本来実行されるべき以下のコマンドが実行されず、エラーのままとなります。

aws ecr create-repository --region $AWS_REGION --repository-name $1 --tags $agent_tags

以上の内容を踏まえて、環境変数の指定誤りを修正し、再デプロイします。

テンプレートファイルを開き、以下の部分(OpenflowAgentInstance以降)をコピー&ペーストで置き換えればOKです。

▶ クリックして展開
※本来のものから間接参照その他を修正しています。

課金について

便利な Openflow ですが、課金には注意が必要です。

Openflow の課金対象要素は以下の通りです。

種類 説明
サービス Openflow のアクティブなコネクタランタイムに対して発生する費用
基盤 AWS の場合、費用が発生する。・VPC:基盤となるネットワーク・EC2:デプロイメント、ランタイムで必要となるリソース・EKS:ランタイムで必要となるリソース・S3:ログの出力先
取り込み データの取り込みに対して発生する費用
テレメトリー イベントのログやメトリクスに対して発生する費用

特に注意が必要なのは基盤部分です。
image.png
画像:Understanding Openflow costs | Snowflake Documentationより引用

公式ドキュメントにもあるように、ランタイムを作成した時点で以下の3種類のインスタンスが起動します。

  • エージェントインスタンス(コンテナイメージの最新化、インフラ構築)
  • 管理コンテナ(Openflowアプリケーション全体の管理)
  • ランタイムコンテナ(レプリケーション処理実行)

各インスタンスはデフォルトで常時稼働状態であるため、使用しない時間帯は停止するなどして、課金の発生を抑える必要があります。
image.png

例:PostgreSQL (RDS) の100GBのデータを1か月間継続ロードした場合

  • サービス:0.0225 * 1 * 24 * 30 * $4.30 = $69.66
  • 基盤
    ・VPC:NATゲートウェイの合計データ処理量が200GBと仮定して、$0.062 * 24 * 30 + 200 * $0.062 = $57.04

 ・EC2:
 ・t3.medium:$0.0136 * 24 * 30 = $9.792
 ・m7i.xlarge:$0.2604 * 24 * 30 = $187.488
 ・m7i.2xlarge:$0.5208 * 24 * 30 = $374.976
 ・EKS:クラスター1つに対し、$0.10 * 24 * 30 = $72
 ・S3:ログの最大出力量を1GBとして、$0.025 * 1 = $0.025

  • 取り込み:Snowpipe Streaming で合計データ処理量200GBと仮定して、0.0037 * 200 * $4.30 = $3.182
  • テレメトリー:ログの最大出力量を1GBとして、0.02 * 1 * $4.30 = $0.086

上記を合計すると、$774.249 ≒ 108,395円(1ドル140円)

クレジットの詳細:CreditConsumptionTable.pdf

計算例でもわかる通り、EC2部分の料金が大半を占めています。
最小の構成でもそこそこ大きなインスタンスが立ってしまうので、注意する必要があります。

さいごに

Openflow の使い方について、詳しくご紹介しました。

Openflow は多くのコネクタが提供されており、様々なデータソースを一元的に集約して Snowflake に取り込むことができる便利なサービスですが、デフォルトである程度パワフルな構成になっているため、想像以上に課金が発生する点については注意が必要です。

特定のデータソースの連携が目的であれば、こちらの記事でもご紹介しているように Snowpipe Streaming といったサービスを使う方法もあります。
ご利用のユースケースに応じて、最適な方法を選択するようにしてください。

参考

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?