はじめに
KafkaからSnowflakeにデータを連携する方法を公式サイトの情報を見ながら試してみます。Snowflakeはカラムナ(列指向)かつマイクロパーティションによる分割保存を基本としているため、JDBCコネクターを使用した連携とは異なる考慮事項があるのではないかと思います。難しいことはあとで考えるとして、まずはクイックにSnowflakeコネクターを使用してKafkaからSnowflakeにデータを連携してみます。
- 参考:Snowflake公式サイト
前提
- Kafka:Confluent 7.2.1
- Snowflake:Sink Connector 2.2.1
- java:11.0.14
- OS:Red Hat Enterprise Linux 8.9
やること
やってみた
- Kafkaコネクターを使用するロールの作成
- ロール: KAFKA_CONNECTOR_ROLE_1
- ユーザー: YSSKJCK
- データベース: MY_DATABASE
- スキーマ: PUBLIC
- テーブル: MY_KAFKA_TABLE
Snowflake Sink Connectorで使用するカスタムロールを作成する
-- Use a role that can create and manage roles and privileges.
USE ROLE SECURITYADMIN;
-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE KAFKA_CONNECTOR_ROLE_1;
-- Grant privileges on the database.
GRANT USAGE ON DATABASE MY_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE_1;
-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA MY_DATABASE.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE_1;
GRANT CREATE TABLE ON SCHEMA MY_DATABASE.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE_1;
GRANT CREATE STAGE ON SCHEMA MY_DATABASE.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE_1;
GRANT CREATE PIPE ON SCHEMA MY_DATABASE.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE_1;
-- Grant the custom role to an existing user.
GRANT ROLE KAFKA_CONNECTOR_ROLE_1 TO USER YSSKJCK;
-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER YSSKJCK SET DEFAULT_ROLE = KAFKA_CONNECTOR_ROLE_1;
- Snowflake Sink ConnectorをConfluent Hubからインストール
confluent-hub install snowflakeinc/snowflake-kafka-connector:2.2.1
- Kafka Connectを再起動
confluent local services connect stop
confluent local services connect start
- Snowflake Sink Connectorがコネクタープラグインとして有効になったことを確認
curl localhost:8083/connector-plugins | jq
- トピック「sample-topic」にAvro形式でメッセージをProduceしておく
参考:Avro形式のメッセージをいますぐKafkaに送るためのサンプルコマンド
kafka-avro-console-producerでProduce(対話モード)
kafka-avro-console-producer \
--broker-list localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic sample-topic \
--property parse.key=true \
--property key.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"col1","type":"string"}]}' \
--property key.separator=" " \
--property value.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"col1","type":"string"},
{"name":"col2","type":"string"},
{"name":"col3","type":"string"}]}'
メッセージを入力
{"col1":"a1"} {"col1":"a1","col2":"a","col3":"test1"}
{"col1":"a2"} {"col1":"a2","col2":"aa","col3":"test2"}
{"col1":"a3"} {"col1":"a3","col2":"aaa","col3":"test3"}
- 秘密鍵と公開鍵の作成 (参考:キーペア認証の使用およびキーローテーション)
- 秘密鍵の暗号化バージョンを生成
- 秘密鍵を参照して公開鍵を生成
- ALTER USER を使用して、Snowflakeユーザーに公開キーを割り当て
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
ALTER USER YSSKJCK SET RSA_PUBLIC_KEY='MIIBIjANBg......fy7V6b9wSYZj+e82wIDAQAB';
- Snowflake Sink Connectorを作成する(トピック「sample-topic」にアタッチ)
curl -sS -X PUT http://localhost:8083/connectors/sample-topic-snowsink/config \
-H "Content-Type: application/json" \
--data '
{
"name":"sample-topic-snowsink",
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"1",
"topics":"sample-topic",
"snowflake.topic2table.map":"sample-topic:MY_KAFKA_TABLE_1",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"https://...........ap-.....-..aws.snowflakecomputing.com:443",
"snowflake.user.name":"YSSKJCK",
"snowflake.private.key":"MIIFLTBXBgkqhkiG......6oh1iBzrdDBr+qIg4AE",
"snowflake.private.key.passphrase":"s.....n",
"snowflake.database.name":"MY_DATABASE",
"snowflake.schema.name":"PUBLIC",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081"
}' | jq
# コネクターのステータス取得
curl localhost:8083/connectors/sample-topic-snowsink/status | jq
- Snowflake側を確認する
テーブルはできている。
できたテーブルはVARIANT型のRECORD_METADATAとRECORD_CONTENTのカラムを持っている。
VARIANT型のRECORD_METADATAにはトピック名、タイムスタンプ、オフセットに加えてメッセージのキー部が格納されている。RECORD_CONTENTにはメッセージのバリュー部が格納されている。
一時的にSnowpipeに使用されると思しきステージが作られている。
- さらにメッセージをトピックにProduceしてみる
メッセージを入力
{"col1":"z1"} {"col1":"z1","col2":"zzz","col3":"20240404-1:16:30"}
curl -X DELETE localhost:8083/connectors/sample-topic-snowsink
-
Snowflake側を確認する(ステージやパイプが残っているか)
残っているし実行中のまま。課金されたら嫌なので削除する。 -
Snowflake側の後片付け
参考:Kafkaコネクタで使用されるSnowflakeオブジェクトの削除
-- ステージのドロップ
DROP STAGE SNOWFLAKE_KAFKA_CONNECTOR_SAMPLE_TOPIC_SNOWSINK_1263312905_STAGE_MY_KAFKA_TABLE_1;
-- パイプのドロップ
DROP PIPE SNOWFLAKE_KAFKA_CONNECTOR_SAMPLE_TOPIC_SNOWSINK_1263312905_PIPE_MY_KAFKA_TABLE_1_0;
おわりに
SnowflakeコネクターはSnowflakeの内部ステージ(=ステージはSnowflakeがクエリーできるファイルオブジェクトの置き場のようなもの)に一時ファイルの形で作成し、Snowpipeというストリーム処理によって順次対象テーブルにバルクロードするような仕掛けのようです。実運用を見据えるともうすこし踏み込んだ確認が必要そうですが、当初の目的である「Kafka to Snowflake をクイックに」は実現できたので今回はここまでとします。