LoginSignup
0
0

Kafka to Snowflake 101 : KafkaからSnowflake Sink Connectorを使ってクイックにデータ連携する

Posted at

はじめに

KafkaからSnowflakeにデータを連携する方法を公式サイトの情報を見ながら試してみます。Snowflakeはカラムナ(列指向)かつマイクロパーティションによる分割保存を基本としているため、JDBCコネクターを使用した連携とは異なる考慮事項があるのではないかと思います。難しいことはあとで考えるとして、まずはクイックにSnowflakeコネクターを使用してKafkaからSnowflakeにデータを連携してみます。

前提

  • Kafka:Confluent 7.2.1
  • Snowflake:Sink Connector 2.2.1
  • java:11.0.14
  • OS:Red Hat Enterprise Linux 8.9

やること

image.png

やってみた

  • Kafkaコネクターを使用するロールの作成
    • ロール: KAFKA_CONNECTOR_ROLE_1
    • ユーザー: YSSKJCK
    • データベース: MY_DATABASE
    • スキーマ: PUBLIC
    • テーブル: MY_KAFKA_TABLE
Snowflake Sink Connectorで使用するカスタムロールを作成する
-- Use a role that can create and manage roles and privileges.
USE ROLE SECURITYADMIN;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE KAFKA_CONNECTOR_ROLE_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE MY_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA MY_DATABASE.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE_1;
GRANT CREATE TABLE ON SCHEMA MY_DATABASE.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE_1;
GRANT CREATE STAGE ON SCHEMA MY_DATABASE.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE_1;
GRANT CREATE PIPE ON SCHEMA MY_DATABASE.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE_1;

-- Grant the custom role to an existing user.
GRANT ROLE KAFKA_CONNECTOR_ROLE_1 TO USER YSSKJCK;

-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER YSSKJCK SET DEFAULT_ROLE = KAFKA_CONNECTOR_ROLE_1;
  • Snowflake Sink ConnectorをConfluent Hubからインストール
confluent-hub install snowflakeinc/snowflake-kafka-connector:2.2.1

image.png

  • Kafka Connectを再起動
confluent local services connect stop
confluent local services connect start

image.png

  • Snowflake Sink Connectorがコネクタープラグインとして有効になったことを確認
curl localhost:8083/connector-plugins | jq

image.png

kafka-avro-console-producerでProduce(対話モード)
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic sample-topic \
  --property parse.key=true \
  --property key.schema='{"type":"record",
                          "name":"myrecord",
                          "fields":[{"name":"col1","type":"string"}]}' \
  --property key.separator=" " \
  --property value.schema='{"type":"record",
                            "name":"myrecord",
                            "fields":[{"name":"col1","type":"string"},
                                      {"name":"col2","type":"string"},
                                      {"name":"col3","type":"string"}]}'
メッセージを入力
{"col1":"a1"} {"col1":"a1","col2":"a","col3":"test1"}
{"col1":"a2"} {"col1":"a2","col2":"aa","col3":"test2"}
{"col1":"a3"} {"col1":"a3","col2":"aaa","col3":"test3"}

image.png

openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

image.png
image.png
image.png
image.png
image.png
image.png

ALTER USER YSSKJCK SET RSA_PUBLIC_KEY='MIIBIjANBg......fy7V6b9wSYZj+e82wIDAQAB';

image.png

  • Snowflake Sink Connectorを作成する(トピック「sample-topic」にアタッチ)
curl -sS -X PUT http://localhost:8083/connectors/sample-topic-snowsink/config \
-H "Content-Type: application/json" \
--data '
{
  "name":"sample-topic-snowsink",
  "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
  "tasks.max":"1",
  "topics":"sample-topic",
  "snowflake.topic2table.map":"sample-topic:MY_KAFKA_TABLE_1",
  "buffer.count.records":"10000",
  "buffer.flush.time":"60",
  "buffer.size.bytes":"5000000",
  "snowflake.url.name":"https://...........ap-.....-..aws.snowflakecomputing.com:443",
  "snowflake.user.name":"YSSKJCK",
  "snowflake.private.key":"MIIFLTBXBgkqhkiG......6oh1iBzrdDBr+qIg4AE",
  "snowflake.private.key.passphrase":"s.....n",
  "snowflake.database.name":"MY_DATABASE",
  "snowflake.schema.name":"PUBLIC",
  "key.converter":"io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url":"http://localhost:8081",
  "value.converter":"io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url":"http://localhost:8081"
}' | jq
# コネクターのステータス取得
curl localhost:8083/connectors/sample-topic-snowsink/status | jq

image.png

  • Snowflake側を確認する
    image.png
    テーブルはできている。
    image.png
    できたテーブルはVARIANT型のRECORD_METADATAとRECORD_CONTENTのカラムを持っている。
    image.png
    image.png
    image.png
    VARIANT型のRECORD_METADATAにはトピック名、タイムスタンプ、オフセットに加えてメッセージのキー部が格納されている。RECORD_CONTENTにはメッセージのバリュー部が格納されている。

image.png
一時的にSnowpipeに使用されると思しきステージが作られている。

image.png
Snowpipeのパイプ定義も作られている。

  • さらにメッセージをトピックにProduceしてみる
メッセージを入力
{"col1":"z1"} {"col1":"z1","col2":"zzz","col3":"20240404-1:16:30"}

image.png

  • Snowflake側を確認する(追加メッセージが格納されているか)
    image.png
    すぐにSnowflakeのテーブルにメッセージが連携された。

  • Snowflake Sink Connectorを削除する

curl -X DELETE localhost:8083/connectors/sample-topic-snowsink

image.png

-- ステージのドロップ
DROP STAGE SNOWFLAKE_KAFKA_CONNECTOR_SAMPLE_TOPIC_SNOWSINK_1263312905_STAGE_MY_KAFKA_TABLE_1;
-- パイプのドロップ
DROP PIPE SNOWFLAKE_KAFKA_CONNECTOR_SAMPLE_TOPIC_SNOWSINK_1263312905_PIPE_MY_KAFKA_TABLE_1_0;

image.png
image.png

おわりに

SnowflakeコネクターはSnowflakeの内部ステージ(=ステージはSnowflakeがクエリーできるファイルオブジェクトの置き場のようなもの)に一時ファイルの形で作成し、Snowpipeというストリーム処理によって順次対象テーブルにバルクロードするような仕掛けのようです。実運用を見据えるともうすこし踏み込んだ確認が必要そうですが、当初の目的である「Kafka to Snowflake をクイックに」は実現できたので今回はここまでとします。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0