みなさん、こんにちは!
Openflow は、任意のソースから任意のターゲットへデータを受け渡しできる機能です。
Apache NiFi をベースに構築されており、外部のETLツールを介さずエンドツーエンドでETL処理を実現することができます。
本記事では、Openflow のセットアップからDB連携までの手順を詳しくご紹介します。
※執筆時点でAWSのみ利用可能です。AWS以外のクラウドを選択した場合、メニューに「Openflow」が表示されず利用できないためご注意ください。
環境構築(Openflow)
事前準備
Snowflake のワークシートで以下のSQLを実行します。
USE ROLE ACCOUNTADMIN;
-- イメージリポジトリの作成と権限付与
CREATE DATABASE IF NOT EXISTS OPENFLOW;
USE OPENFLOW;
CREATE SCHEMA IF NOT EXISTS OPENFLOW;
USE SCHEMA OPENFLOW;
CREATE IMAGE REPOSITORY IF NOT EXISTS OPENFLOW;
grant usage on database OPENFLOW to role public;
grant usage on schema OPENFLOW to role public;
grant read on image repository OPENFLOW.OPENFLOW.OPENFLOW to role public;
-- OPENFLOW_ADMIN_ROLEの作成と権限付与
CREATE ROLE IF NOT EXISTS OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW DATA PLANE INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW RUNTIME INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;
-- Openflowユーザのセカンダリロール設定
ALTER USER <user_name> SET DEFAULT_SECONDARY_ROLES = ('ALL');
※<user_name>
は自分のユーザ名に置き換えてください。
メニューの「データ」→「Openflow」を開き、「Openflowを起動」をクリックします。
「Invalid consent request」と出て開けなかった場合、ユーザのデフォルトロールがADMINロールになっているため、以下のSQLでデフォルトロールを変更します。
ALTER USER <user_name> SET DEFAULT_ROLE = 'PUBLIC';
Openflow を起動すると、以下のような画面が開きます。
ネットワーク環境構築(Bring your own VPC の場合)
AWSで、デプロイメントを作成するためのネットワーク環境を構築します。
作成するリソースは以下の通りです。
リソース | 説明 |
---|---|
VPC | Openflow 専用のネットワーク空間 |
Public Subnet x 2 | 異なる2つのAZにパブリックサブネットを配置する。EKSのLB用に以下のタグを付与する:kubernetes.io/role/elb = 1
|
Private Subnet x 2 | 異なる2つのAZにプライベートサブネットを配置する |
インターネットゲートウェイ | パブリックサブネットのインターネット通信用 |
NATゲートウェイ | プライベートサブネットのインターネット通信用 |
ルートテーブル | サブネット間の通信制御・Public Subnet -> インターネットゲートウェイ・Private Subnet -> NATゲートウェイ |
※Managed VPC の場合も同様の設定になっていることを確認します。
デプロイメント作成
「Create a deployment」をクリックし、デプロイメントの設定を行っていきます。
VPC構成はマネージドVPCと既存VPCのいずれかを選ぶことができます。
自動作成のVPCを利用するため、「Managed VPC」を選択します。
デプロイメントのオーナーロールは OPENFLOWADMINROLE を選択しておきます。
設定が完了したら、「Create deployment」をクリックします。
デプロイメントが作成されると、以下の画面が表示されます。
「Download template」でテンプレートをダウンロードします。(以下の画面はDeployments一覧の「Installation details」からも確認可能)
Snowsightで以下のSQLコマンドを実行します。
ネットワークポリシーを使用して Snowflake へのアクセスを制御している場合、コメントアウト部分を外して実行してください。(NATGATEWAYPUBLIC_IP は設定したNATゲートウェイのIPアドレスに置き換えてください)
/*
-- ネットワークポリシーの設定
USE ROLE ACCOUNTADMIN;
USE DATABASE {REPLACE_WITH_YOUR_DB_NAME};
CREATE NETWORK RULE allow_openflow_deployment
MODE = INGRESS
TYPE = IPV4
VALUE_LIST = ('{$NAT_GATEWAY_PUBLIC_IP}/32');
-- Run this command to find your currently active network policy and copy the value column
SHOW PARAMETERS LIKE 'NETWORK_POLICY' IN ACCOUNT;
-- Now add the new network rule to this policy
ALTER NETWORK POLICY {ENTER_YOUR_ACTIVE_NETWORK_POLICY_NAME} ADD ALLOWED_NETWORK_RULE_LIST = (allow_openflow_deployment);
*/
-- イベントテーブルの作成
USE ROLE accountadmin;
GRANT create event table on schema OPENFLOW.OPENFLOW to role $ROLE_OF_DEPLOYMENT_OWNER;
USE ROLE $ROLE_OF_DEPLOYMENT_OWNER;
CREATE event table if not exists openflow.openflow.openflow_events;
-- Find the Data Plane Integrations
SHOW openflow data plane integrations;
ALTER openflow data plane integration
$OPENFLOW_deployment_UUID
SET event_table = 'openflow.openflow.openflow_events';
ダウンロードしたテンプレートファイル(YAML)をもとに CloudFormation でスタックの作成を行います。
設定はすべてデフォルトのままで次へ進み、作成を行います。
スタックが「CREATE_COMPLETE」状態になった後、OpenflowAgentInstance のEC2が起動し、必要な残りのリソースの作成を行います。
Snowflake Image Repository からのイメージダウンロードやEKSクラスターのデプロイがEC2によって自動的に実行されます。
リソースの作成が完了し、Snowflake側との接続が確立するとデプロイメントの状態が「Active」になります。(処理が問題なく進んだ場合でも、リソース作成からActive状態になるまでは45分ほどかかります)
ランタイム作成
「Create a runtime」をクリックし、ランタイムの作成を行います。
ランタイム名を入力します。
その他は適切な値を設定し、「Create」をクリックします。
ランタイムの作成は約2~3分で完了します。作成が完了すると、ランタイムの状態が「Active」になります。
メニューから「View canvas」をクリックし、アクセスを許可すると Openflow canvas が開きます。
以上で Openflow の環境構築は完了です。
環境構築(データベース)
レプリケーション元となるデータベースの作成と設定を行います。
データベース作成
AWS RDS で PostgreSQL データベースを作成します。
Snowflake からアクセスできるよう、パブリックアクセスを有効にしておきます。
データベースが作成されたら、「パラメータグループ」で以下の変更を保存します。(今回の場合、カスタムパラメータグループを使用しています)
rds.logical_replication
=1
「変更」→「DB パラメータグループ」で変更したパラメータグループを指定して保存し、RDSを再起動します。
レプリケーション設定
psqlまたはクライアントアプリケーションでレプリケーションの設定を行っていきます。
今回の検証では、A5:SQL Mk-2 を使用します。RDSへの接続は以下のように設定します。
接続できたら、以下のSQLコマンドを順に実行します。
-- レプリケーションが有効になっていることを確認
SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
name | setting |
---|---|
rds.logical_replication | on |
wal_level | logical |
-- コネクタ用のユーザー作成と権限付与
CREATE USER openflow_user WITH PASSWORD '<任意のパスワード>';
GRANT rds_replication TO openflow_user;
-- レプリケーション用テーブル作成
CREATE TABLE sample_data (
id SERIAL PRIMARY KEY,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO sample_data (content)
VALUES
('aaa'),
('bbb');
id | content | created_at |
---|---|---|
1 | aaa | 2025/06/26 17:46:32.468 |
2 | bbb | 2025/06/26 17:46:32.468 |
-- publication の作成、テーブル追加
CREATE PUBLICATION openflow_pub;
ALTER PUBLICATION openflow_pub ADD TABLE sample_data;
-- ユーザへの権限付与
GRANT USAGE ON SCHEMA public TO openflow_user;
GRANT SELECT ON TABLE sample_data TO openflow_user;
コネクタ設定
事前準備
Snowflake のワークシートで以下のSQLを実行します。
RSAキーペアの設定についてはこちらの記事の「1.認証設定」部分をご参照ください。
-- レプリケーション用データベース作成
CREATE DATABASE openflow_replicated_data;
-- レプリケーションを行うOpenflowユーザの作成と権限付与
CREATE USER openflow_service_user TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
CREATE ROLE openflow_service_role;
GRANT ROLE openflow_service_role TO USER openflow_service_user;
GRANT USAGE ON DATABASE openflow_replicated_data TO ROLE openflow_service_role;
GRANT CREATE SCHEMA ON DATABASE openflow_replicated_data TO ROLE openflow_service_role;
-- Openflowウェアハウスの作成と権限付与
CREATE WAREHOUSE openflow_compute_wh
WITH
WAREHOUSE_SIZE = 'MEDIUM'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE;
GRANT USAGE, OPERATE ON WAREHOUSE openflow_compute_wh TO ROLE openflow_service_role;
-- RSA公開鍵の設定
ALTER USER openflow_service_user SET RSA_PUBLIC_KEY = '<RSA公開鍵>';
コネクタ定義設定
こちらを参考にコネクタ定義の設定を行います。
以上でDB連携までの手順は完了です!お疲れ様でした。
トラブルシューティング
デプロイメントの状態が「Inactive」のまま進まない
openflow-agent-{data-plane-key} という名前のEC2インスタンスに接続し、エラーがないかログを確認します。
ログ確認用コマンド:
journalctl -xe -f -n 100 -u docker
journalctl -u openflow-apply-infrastructure -f -n 500
journalctl -u openflow-sync-images -f -n 500
tail -500f /var/log/cloud-init-output.log
今回の場合、以下のようなエラーが繰り返し発生していました。
[ec2-user@ip-10-10-6-22 ~]$ journalctl -xe -f -n 100 -u docker
Jun 23 06:12:24 ip-10-10-6-22.ap-northeast-1.compute.internal dockerd[27020]: time="2025-06-23T06:12:24.488934848Z" level=error msg="Upload failed: name unknown: The repository with name 'snowflake-openflow/runtime-operator' does not exist in the registry with id 'xxxxxxxxxxxx'"
原因はテンプレートファイルの環境変数の指定でした。
exists_check=$(aws ecr describe-repositories --region $AWS_REGION --repository-names $1 2>&1)
if [ $? -ne 0 ]; then
if echo ${!exists_check} | grep -q RepositoryNotFoundException; then
aws ecr create-repository --region $AWS_REGION --repository-name $1 --tags $agent_tags
else
>&2 echo ${!exists_check}
fi
fi
${!exists_check}
は関節参照で、変数exists_check
を展開した結果が格納されます。
ECRリポジトリが存在しなかった場合、exists_check
には以下のようなメッセージが格納されます。
An error occurred (RepositoryNotFoundException) when calling the DescribeRepositories operation: The repository with name 'snowflake-openflow/openflow-data-plane-agent-aws' does not exist in the registry with id 'xxxxxxxxxxxx'
そのため${!exists_check}
は${An error occurred ...}
となりますが、そのような変数はないためエラーとなり、以下のif文部分のコマンド実行結果が0(正常終了)になることはありません。
echo ${!exists_check} | grep -q RepositoryNotFoundException;
よって、対象のECRリポジトリが存在しない場合、本来実行されるべき以下のコマンドが実行されず、エラーのままとなります。
aws ecr create-repository --region $AWS_REGION --repository-name $1 --tags $agent_tags
以上の内容を踏まえて、環境変数の指定誤りを修正し、再デプロイします。
テンプレートファイルを開き、以下の部分(OpenflowAgentInstance以降)をコピー&ペーストで置き換えればOKです。
▶ クリックして展開
※本来のものから間接参照その他を修正しています。
課金について
便利な Openflow ですが、課金には注意が必要です。
Openflow の課金対象要素は以下の通りです。
種類 | 説明 |
---|---|
サービス | Openflow のアクティブなコネクタランタイムに対して発生する費用 |
基盤 | AWS の場合、費用が発生する。・VPC:基盤となるネットワーク・EC2:デプロイメント、ランタイムで必要となるリソース・EKS:ランタイムで必要となるリソース・S3:ログの出力先 |
取り込み | データの取り込みに対して発生する費用 |
テレメトリー | イベントのログやメトリクスに対して発生する費用 |
特に注意が必要なのは基盤部分です。
画像:Understanding Openflow costs | Snowflake Documentationより引用
公式ドキュメントにもあるように、ランタイムを作成した時点で以下の3種類のインスタンスが起動します。
- エージェントインスタンス(コンテナイメージの最新化、インフラ構築)
- 管理コンテナ(Openflowアプリケーション全体の管理)
- ランタイムコンテナ(レプリケーション処理実行)
各インスタンスはデフォルトで常時稼働状態であるため、使用しない時間帯は停止するなどして、課金の発生を抑える必要があります。
例:PostgreSQL (RDS) の100GBのデータを1か月間継続ロードした場合
- サービス:0.0225 * 1 * 24 * 30 * $4.30 = $69.66
- 基盤
・VPC:NATゲートウェイの合計データ処理量が200GBと仮定して、$0.062 * 24 * 30 + 200 * $0.062 = $57.04
・EC2:
・t3.medium:$0.0136 * 24 * 30 = $9.792
・m7i.xlarge:$0.2604 * 24 * 30 = $187.488
・m7i.2xlarge:$0.5208 * 24 * 30 = $374.976
・EKS:クラスター1つに対し、$0.10 * 24 * 30 = $72
・S3:ログの最大出力量を1GBとして、$0.025 * 1 = $0.025
- 取り込み:Snowpipe Streaming で合計データ処理量200GBと仮定して、0.0037 * 200 * $4.30 = $3.182
- テレメトリー:ログの最大出力量を1GBとして、0.02 * 1 * $4.30 = $0.086
上記を合計すると、$774.249 ≒ 108,395円(1ドル140円)
クレジットの詳細:CreditConsumptionTable.pdf
計算例でもわかる通り、EC2部分の料金が大半を占めています。
最小の構成でもそこそこ大きなインスタンスが立ってしまうので、注意する必要があります。
さいごに
Openflow の使い方について、詳しくご紹介しました。
Openflow は多くのコネクタが提供されており、様々なデータソースを一元的に集約して Snowflake に取り込むことができる便利なサービスですが、デフォルトである程度パワフルな構成になっているため、想像以上に課金が発生する点については注意が必要です。
特定のデータソースの連携が目的であれば、こちらの記事でもご紹介しているように Snowpipe Streaming といったサービスを使う方法もあります。
ご利用のユースケースに応じて、最適な方法を選択するようにしてください。