0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

みなさん、こんにちは!

Snowpipe Streaming は Snowpipe の補完を目的とした機能であり、Snowpipe で対応できないストリーミングデータの取り込みを行うことができます。
Snowpipe Streaming は、以下の2つの方法で利用できます。

  • Java SDKを利用したクライアントアプリケーションからのデータストリーミング
  • Kafkaコネクタ(Snowflake Connector for Kafka)を用いた Kafka からのデータストリーミング

以降では、Azure Event Hubs とKafkaコネクタを用いたデータストリーミングの詳細についてまとめます。
Java SDKを利用したクライアントアプリケーションからのデータストリーミングについては、こちらの記事をご参照ください。

※検証の際、操作はすべてACCOUNTADMINロールで行っています。

手順

以下の手順に従い、各種設定を行います。

1. 認証設定

キーペア作成

クライアントアプリケーションから Snowflake に対して通信を行うにあたり、認証用のRSAキーペアを作成する必要があります。
Linux 環境でターミナルから以下のコマンドを実行します。

qiita.rb
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

これにより、以下のような秘密鍵 rsa_key.p8 が生成されます。

qiita.rb
-----BEGIN PRIVATE KEY-----
MIIEvgIB...
-----END PRIVATE KEY-----

※rsa_key.p8 は第三者に見られることがないよう、安全な場所に保管します。

また、以下のコマンドで公開鍵を作成します。

qiita.rb
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

公開鍵の中身は以下のようになっています。

qiita.rb
-----BEGIN PUBLIC KEY-----
MIIBIjAN...
-----END PUBLIC KEY-----

公開鍵登録

作成した公開鍵を Snowflake ユーザに割り当てます。
Snowflake のワークシートで以下のようなSQLを実行します。

qiita.rb
ALTER USER example_user SET RSA_PUBLIC_KEY='MIIBIjAN...';

example_userRSA_PUBLIC_KEYの値はご自身のものを指定してください。

補足

サポートされているツールで生成されたキーペアでない場合、キー登録時に「Invalid Public key」のようなエラーが出ます。
Windows の場合は OpenSSH をダウンロードしてキーペアを作成するか、上記のように WSL 等の Linux 環境で生成したキーペアを使用してください。

2. Kafkaコネクタ設定

Azure Event Hubs と Kafka のセットアップについては、こちらの記事をご参照ください。
セットアップが完了したら、Kafkaコネクタの設定を行います。

Kafkaコネクタダウンロード

ターミナルで以下のコマンドを実行し、Kafkaコネクタと必要なjarをダウンロードします。

qiita.rb
$ cd kafka_2.13-3.9.0/libs/
$ curl https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/2.2.1/snowflake-kafka-connector-2.2.1.jar --output snowflake-kafka-connector-2.2.1.jar
$ curl https://repo1.maven.org/maven2/net/snowflake/snowflake-ingest-sdk/2.1.0/snowflake-ingest-sdk-2.1.0.jar --output snowflake-ingest-sdk-2.1.0.jar
$ curl https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.14.5/snowflake-jdbc-3.14.5.jar --output snowflake-jdbc-3.14.5.jar
$ curl https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar --output bc-fips-1.0.1.jar
$ curl https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar --output bcpkix-fips-1.0.3.jar

※各ライブラリのバージョンは継続的に更新されているためご注意ください。
 上記は検証時点で問題なく機能したバージョンの組み合わせとなっています。

設定ファイル作成

下記コマンドで kafka-cli ディレクトリに移動します。

qiita.rb
$ cd ~/azure-event-hubs-for-kafka/quickstart/kafka-cli/

Kafkaコネクタをスタンドアロンモードで起動するため、以下の内容を記載した connect-standalone.properties ファイルを作成します。

qiita.rb
bootstrap.servers=<Event Hubs namespace>.servicebus.windows.net:9093

# Snowflake value conversion
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
plugin.path=<kafka_home>/libs

以下の項目はご自身の環境のものを設定してください。
※括弧内のデータは今回の設定例

  • <Event Hubs namespace>:Event Hubs名前空間(eh-snowflake-je
  • <kafka_home>:Kafkaをインストールしたディレクトリのパス(/home/tani/kafka_2.13-3.9.0

また、以下の内容を記載した SF_connect.properties ファイルを作成します。

qiita.rb
name=kafka_snowpipe_streaming
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=<topic_name>
snowflake.topic2table.map=<topic_name>:<table_name>
buffer.count.records=10000
buffer.flush.time=5
buffer.size.bytes=20000000
snowflake.url.name=<account_id>.snowflakecomputing.com:443
snowflake.user.name=<user_name>
snowflake.private.key=<private_key>
snowflake.database.name=<db_name>
snowflake.schema.name=<schema_name>
snowflake.role.name=<role_name>
snowflake.ingestion.method=SNOWPIPE_STREAMING
snowflake.enable.schematization=false
value.converter.schemas.enable=false
jmx=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.tolerance=all

以下の項目はご自身の環境のものを指定してください。
※括弧内のデータは今回の設定例

  • <topic_name>:Kafkaトピック名(test-topic
  • <account_id>:アカウント識別子
  • <user_name>:ユーザ名(TANI
  • <private_key>:秘密鍵(MIIEvgIB...
  • <table_name>:対象テーブル名(snowpipe_streaming_kafka
  • <db_name>:対象データベース名(TEST
  • <schema_name>:対象スキーマ名(PUBLIC
  • <role_name>:ロール名(ACCOUNTADMIN

テーブルが存在しない場合、Kafkaコネクターが<table_name>で指定されたテーブルを自動的に作成するため、事前に作成しておく必要はありません。
<user_name>, <account_id>, <role_name>に必要な情報は、Snowflake メニュー最下部から「アカウント」→「アカウントの詳細を表示する」で確認可能です。
image.png
image.png

<private_key>には「1. 認証設定」で作成した秘密鍵を以下のように1行で記入します。

qiita.rb
snowflake.private.key=MIIEvgIB...

上記設定後の kafka-cli フォルダ内部は以下の状態です。
image.png

3. Kafkaコネクタ起動

起動前に忘れずに環境変数の設定をしておきます。(セッションをまたいで使用できるよう、.bashrc に追記しておくと便利です)

qiita.rb
$ export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
$ export KAFKA_INSTALL_HOME=[Kafka をインストールしたディレクトリ(:/home/tani/kafka_2.13-3.9.0)]

以下のコマンドでKafkaコネクタを起動します。

qiita.rb
$ $KAFKA_INSTALL_HOME/bin/connect-standalone.sh connect-standalone.properties SF_connect.properties

別のターミナルを開き、以下のように Producer を起動してメッセージを送信します。

qiita.rb
$ $KAFKA_INSTALL_HOME/bin/kafka-console-producer.sh --topic test-topic --broker-list eh-snowflake-je.servicebus.windows.net:9093 --producer.config client_common.properties

>aaa
>bbb
>ccc

image.png

上記の実行後に Snowflake を確認すると、テーブルが作成されデータが取り込まれていることが確認できます。
image.png

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?