はじめに
Confluent PlatformはConfluent社が提供するApache Kafkaを中心としたプラットフォームです。Apache Kafkaに加えて、Schema Registry、Rest Proxyや運用ツール等が同梱されています。
商用版(Enterprise)とCommunity版があり、各ライセンスのコンポーネントの違いは以下のとおりです。
Confluent Community License FAQから引用
商用版は30日以内まで使用でき、30日を過ぎると商用機能が利用できなくなります。
ただし、Kafkaブローカーが1ノードの場合は無期限で利用できるそうです。
今回は商用版をインストールし、Kafka Connect(こっちが本当にやりたいこと)を利用してOracleのデータをKafkaへ配信することを試してみます。
Confluent Platformのダウンロード
まず、以下のインストール手順の「downloads page」のリンク先から、Confluent Platformのモジュールをダウンロードします。
リンクを飛んでいくと、以下のページに遷移し、「ZIP archive」を選択してダウンロードします。
今回ダウンロードしたバージョンはv5.3.1で、ファイル名は「confluent-5.3.1-2.12.zip」でした。
Confluent Platformのインストール
Confluent Platformのインストールは以下のページを参考に実施します。
今回はKafka Connectを試してみたいだけなので、localモード(?)という開発用のスタンドアローンで動作するモードで構築します。
インストール前に以下の手順でOpenJDK8をインストールしておきます。
# yum install java-1.8.0-openjdk-devel
# java -version
openjdk version "1.8.0_222"
OpenJDK Runtime Environment (build 1.8.0_222-b10)
OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)
JAVA_HOMEを設定し、PATHにJavaのパスを追加しています。
# echo "export JAVA_HOME=$(readlink -e $(which java)|sed 's:/bin/java::')" > /etc/profile.d/java.sh
# echo "PATH=\$PATH:\$JAVA_HOME/bin" >> /etc/profile.d/java.sh
# source /etc/profile
次にconfluent-5.3.1-2.12.zipを/opt以下に展開します。
# cd /opt
# unzip /tmp/confluent-5.3.1-2.12.zip
Confluent CLIをインストール
スタンドアローンで動作させる際に使用するconfluent localコマンドを利用するため、Confluent CLIをインストールします。
# curl -L https://cnfl.io/cli | sh -s -- -b /opt/confluent-5.3.1/bin
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
0 162 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
100 10113 100 10113 0 0 6247 0 0:00:01 0:00:01 --:--:-- 9875k
confluentinc/cli info checking S3 for latest tag
confluentinc/cli info found version: latest for latest/linux/amd64
confluentinc/cli info installed /opt/confluent-5.3.1/bin/confluent
confluentinc/cli info please ensure /opt/confluent-5.3.1/bin is in your PATH
kafka-connect-datagenのインストール(今回は不要)
Confluent Hubクライアントを利用して、kafka-connect-datagenをインストールします。
kafka-connect-datagenは、データ生成用のConnectorで、テスト用のデータを生成します。
サイトの手順に沿ってインストールしたのですが、今回はOracleをロード元(Sink)として利用するため使用しませんでした。
# ./bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Apache License 2.0
https://www.apache.org/licenses/LICENSE-2.0
Downloading component Kafka Connect Datagen 0.1.5, provided by Confluent, Inc. from Confluent Hub and installing into /opt/confluent-5.3.1/share/confluent-hub-components
Adding installation directory to plugin path in the following files:
/opt/confluent-5.3.1/etc/kafka/connect-distributed.properties
/opt/confluent-5.3.1/etc/kafka/connect-standalone.properties
/opt/confluent-5.3.1/etc/schema-registry/connect-avro-distributed.properties
/opt/confluent-5.3.1/etc/schema-registry/connect-avro-standalone.properties
Confluent binディレクトリをPATHに追加
Confluent binディレクトリのインストール場所をPATHに追加します。
# echo "export PATH=/opt/confluent-5.3.1/bin:$PATH" >> /etc/profile.d/confluent.sh
# source /etc/profile
Confluent Platformを起動
confluent local startコマンドを使用してConfluent Platformを起動します。
このコマンドにより、Kafkaだけではなく、Kafka Connect、KSQLなどの全てのConfluent Platformのコンポーネントが起動します。
$ confluent local start
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
Confluent Control Centerにアクセス
以下のURLでConfluentの運用管理画面へアクセスします。
kafkaのトピックを作成
コマンドラインからもできますが、Confluent Control Centerを使用してトピックを作成してみます。
クラスタ選択後、「Topics」を選択すると以下の画面に遷移します。
「Add a topic」をクリックすると以下の画面が表示され、トピック名等を入力して、「Create with defaults」ボタンを押せばトピック作成終了です。
トピックは作成しましたが、以降の手順では作成したトピックは結局使用していません。
Oralce JDBCドライバのインストール
Oralce JDBCドライバ(ojdbc8.jar)をOracleのサイトからダウンロードし、以下のように格納します。
- /opt/confluent-5.3.1/share/java/kafka-connect-jdbc/ojdbc8.jar
次にKafka connectを再起動します。
# confluent local stop connect
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Stopping connect
connect is [DOWN]
# confluent local start connect
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
schema-registry is already running. Try restarting if needed
Starting connect
connect is [UP]
ロード元のテーブル作成
データロード元のOracleのテーブルを作成します。
以下のようにemtestテーブルを作成しています。
noカラムは連番のデータを格納し、Kafka Connectでincrementalにデータをロードするためのキーになります。
CREATE TABLE emtest
(
no NUMBER(3,0),
name VARCHAR2(50),
update_at TIMESTAMP(0)
);
Kafka Connectorを作成
JdbcSourceConnectorを使用して、OracleからKafkaへデータをロードするように設定します。
JdbcSourceConnectorはJDBCでデータベースからデータをロードするためのConnectorで、(たぶん)Oracle以外にもPostgreSQL等で使用できます。
クラスタ選択後、「Connect」を選択します。
Connectorの選択画面に遷移するので、ここで「JdbcSourceConnector」を選択します。
Connectorの設定画面に遷移するので、Oracle向けの設定を実施していきます。
設定中の画面は省略しますが、最終的に以下の設定を実施しました。
{
"name": "TestOracle2",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@192.168.10.232:1521:testdb",
"connection.user": "hr",
"connection.password": "**",
"table.whitelist": "EMTEST",
"mode": "incrementing",
"incrementing.column.name": "NO",
"validate.non.null": "false",
"topic.prefix": "test."
}
modeは増分をロードするようにするため、incrementingを設定しています。
incrementing.column.nameでは、増分を判断するキーを指定します。
topic.prefixで、トピック名のプレフィックスを指定します。ここでは"test."を指定しており、トピック名はテーブル名を付与した、"test.EMTEST"が自動で作成されます。
kafkaにロードされたメッセージはデフォルトでAvroフォーマットになります。
Stringにするなら、Converterに「org.apache.kafka.connect.storage.StringConverter」を指定します。
ここでkafkaにパブリッシュされたメッセージを確認するために、kafka-avro-console-consumerコマンドを使用します。
Avroフォーマットでない場合は、kafka-console-consumerコマンドを使用します。
# ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test.EMTEST
なお、試している際は気づかなかったのですが、以下のコマンドでも同じように確認することができます。
$ confluent local consume test.EMTEST -- --value-format avro --from-beginning
ここでEMTESTテーブルにインサートすると、以下のようにkafkaに格納されたメッセージを取得しコンソールに表示されます。
# ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test.EMTEST
{"NO":{"bytes":"\u0011"},"NAME":{"string":"CCCC"},"UPDATE_AT":{"long":1569079207000}}
{"NO":{"bytes":"\u0012"},"NAME":{"string":"CCCC"},"UPDATE_AT":{"long":1569079234000}}
上ではコマンドでメッセージを取得して確認しましたが、管理画面からも以下のように確認することができます。こちらはメッセージを取得(consume)しているわけではなく、Kafka Brokerに格納されたメッセージを表示しているだけです。
ログを確認(参考)
Connectのログは以下のコマンドで確認できるようです。
confluent local log connect
Confluentを終了する
Confluentを終了するためには、confluent local stopコマンドを使用します。
$ confluent local stop
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
「confluent local destroy」を使用すると環境全てを破棄するようです(使用していないのでよく分かりません)