7
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Confluent PlatformをインストールしてKafka Connectを試してみる

Last updated at Posted at 2019-09-22

はじめに

Confluent PlatformはConfluent社が提供するApache Kafkaを中心としたプラットフォームです。Apache Kafkaに加えて、Schema Registry、Rest Proxyや運用ツール等が同梱されています。

商用版(Enterprise)とCommunity版があり、各ライセンスのコンポーネントの違いは以下のとおりです。

20190329100752.png
Confluent Community License FAQから引用

商用版は30日以内まで使用でき、30日を過ぎると商用機能が利用できなくなります。
ただし、Kafkaブローカーが1ノードの場合は無期限で利用できるそうです。

今回は商用版をインストールし、Kafka Connect(こっちが本当にやりたいこと)を利用してOracleのデータをKafkaへ配信することを試してみます。

Confluent Platformのダウンロード

まず、以下のインストール手順の「downloads page」のリンク先から、Confluent Platformのモジュールをダウンロードします。

リンクを飛んでいくと、以下のページに遷移し、「ZIP archive」を選択してダウンロードします。

image.png

今回ダウンロードしたバージョンはv5.3.1で、ファイル名は「confluent-5.3.1-2.12.zip」でした。

Confluent Platformのインストール

Confluent Platformのインストールは以下のページを参考に実施します。
今回はKafka Connectを試してみたいだけなので、localモード(?)という開発用のスタンドアローンで動作するモードで構築します。

インストール前に以下の手順でOpenJDK8をインストールしておきます。

# yum install java-1.8.0-openjdk-devel

# java -version
openjdk version "1.8.0_222"
OpenJDK Runtime Environment (build 1.8.0_222-b10)
OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)

JAVA_HOMEを設定し、PATHにJavaのパスを追加しています。

# echo "export JAVA_HOME=$(readlink -e $(which java)|sed 's:/bin/java::')" > /etc/profile.d/java.sh
# echo "PATH=\$PATH:\$JAVA_HOME/bin" >> /etc/profile.d/java.sh
# source /etc/profile

次にconfluent-5.3.1-2.12.zipを/opt以下に展開します。

# cd /opt
# unzip /tmp/confluent-5.3.1-2.12.zip

Confluent CLIをインストール

スタンドアローンで動作させる際に使用するconfluent localコマンドを利用するため、Confluent CLIをインストールします。

# curl -L https://cnfl.io/cli | sh -s -- -b /opt/confluent-5.3.1/bin
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0   162    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 10113  100 10113    0     0   6247      0  0:00:01  0:00:01 --:--:-- 9875k
confluentinc/cli info checking S3 for latest tag
confluentinc/cli info found version: latest for latest/linux/amd64
confluentinc/cli info installed /opt/confluent-5.3.1/bin/confluent
confluentinc/cli info please ensure /opt/confluent-5.3.1/bin is in your PATH

kafka-connect-datagenのインストール(今回は不要)

Confluent Hubクライアントを利用して、kafka-connect-datagenをインストールします。
kafka-connect-datagenは、データ生成用のConnectorで、テスト用のデータを生成します。
サイトの手順に沿ってインストールしたのですが、今回はOracleをロード元(Sink)として利用するため使用しませんでした。

# ./bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
Running in a "--no-prompt" mode 
Implicit acceptance of the license below:  
Apache License 2.0 
https://www.apache.org/licenses/LICENSE-2.0 
Downloading component Kafka Connect Datagen 0.1.5, provided by Confluent, Inc. from Confluent Hub and installing into /opt/confluent-5.3.1/share/confluent-hub-components 
Adding installation directory to plugin path in the following files: 
  /opt/confluent-5.3.1/etc/kafka/connect-distributed.properties 
  /opt/confluent-5.3.1/etc/kafka/connect-standalone.properties 
  /opt/confluent-5.3.1/etc/schema-registry/connect-avro-distributed.properties 
  /opt/confluent-5.3.1/etc/schema-registry/connect-avro-standalone.properties 

Confluent binディレクトリをPATHに追加

Confluent binディレクトリのインストール場所をPATHに追加します。

# echo "export PATH=/opt/confluent-5.3.1/bin:$PATH" >> /etc/profile.d/confluent.sh
# source /etc/profile

Confluent Platformを起動

confluent local startコマンドを使用してConfluent Platformを起動します。
このコマンドにより、Kafkaだけではなく、Kafka Connect、KSQLなどの全てのConfluent Platformのコンポーネントが起動します。

$ confluent local start
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

Confluent Control Centerにアクセス

以下のURLでConfluentの運用管理画面へアクセスします。

kafkaのトピックを作成

コマンドラインからもできますが、Confluent Control Centerを使用してトピックを作成してみます。

クラスタ選択後、「Topics」を選択すると以下の画面に遷移します。

image.png

「Add a topic」をクリックすると以下の画面が表示され、トピック名等を入力して、「Create with defaults」ボタンを押せばトピック作成終了です。

image.png

トピックは作成しましたが、以降の手順では作成したトピックは結局使用していません。

Oralce JDBCドライバのインストール

Oralce JDBCドライバ(ojdbc8.jar)をOracleのサイトからダウンロードし、以下のように格納します。

  • /opt/confluent-5.3.1/share/java/kafka-connect-jdbc/ojdbc8.jar

次にKafka connectを再起動します。

# confluent local stop connect
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Stopping connect
connect is [DOWN]
# confluent local start connect
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
zookeeper is already running. Try restarting if needed
kafka is already running. Try restarting if needed
schema-registry is already running. Try restarting if needed
Starting connect
connect is [UP]

ロード元のテーブル作成

データロード元のOracleのテーブルを作成します。
以下のようにemtestテーブルを作成しています。
noカラムは連番のデータを格納し、Kafka Connectでincrementalにデータをロードするためのキーになります。

CREATE TABLE emtest
 (
 no NUMBER(3,0),
 name VARCHAR2(50),
 update_at TIMESTAMP(0)
 );

Kafka Connectorを作成

JdbcSourceConnectorを使用して、OracleからKafkaへデータをロードするように設定します。
JdbcSourceConnectorはJDBCでデータベースからデータをロードするためのConnectorで、(たぶん)Oracle以外にもPostgreSQL等で使用できます。

クラスタ選択後、「Connect」を選択します。

image.png

Connectorの選択画面に遷移するので、ここで「JdbcSourceConnector」を選択します。

image.png

Connectorの設定画面に遷移するので、Oracle向けの設定を実施していきます。

image.png

設定中の画面は省略しますが、最終的に以下の設定を実施しました。

{
  "name": "TestOracle2",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:oracle:thin:@192.168.10.232:1521:testdb",
  "connection.user": "hr",
  "connection.password": "**",
  "table.whitelist": "EMTEST",
  "mode": "incrementing",
  "incrementing.column.name": "NO",
  "validate.non.null": "false",
  "topic.prefix": "test."
}

modeは増分をロードするようにするため、incrementingを設定しています。
incrementing.column.nameでは、増分を判断するキーを指定します。
topic.prefixで、トピック名のプレフィックスを指定します。ここでは"test."を指定しており、トピック名はテーブル名を付与した、"test.EMTEST"が自動で作成されます。

kafkaにロードされたメッセージはデフォルトでAvroフォーマットになります。
Stringにするなら、Converterに「org.apache.kafka.connect.storage.StringConverter」を指定します。

ここでkafkaにパブリッシュされたメッセージを確認するために、kafka-avro-console-consumerコマンドを使用します。
Avroフォーマットでない場合は、kafka-console-consumerコマンドを使用します。

# ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test.EMTEST

なお、試している際は気づかなかったのですが、以下のコマンドでも同じように確認することができます。

$ confluent local consume test.EMTEST -- --value-format avro --from-beginning

ここでEMTESTテーブルにインサートすると、以下のようにkafkaに格納されたメッセージを取得しコンソールに表示されます。

# ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test.EMTEST

{"NO":{"bytes":"\u0011"},"NAME":{"string":"CCCC"},"UPDATE_AT":{"long":1569079207000}}
{"NO":{"bytes":"\u0012"},"NAME":{"string":"CCCC"},"UPDATE_AT":{"long":1569079234000}}

上ではコマンドでメッセージを取得して確認しましたが、管理画面からも以下のように確認することができます。こちらはメッセージを取得(consume)しているわけではなく、Kafka Brokerに格納されたメッセージを表示しているだけです。

image.png

ログを確認(参考)

Connectのログは以下のコマンドで確認できるようです。

confluent local log connect

Confluentを終了する

Confluentを終了するためには、confluent local stopコマンドを使用します。

$ confluent local stop
    The local commands are intended for a single-node development environment
    only, NOT for production usage. https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.2ACJxXld
Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]

「confluent local destroy」を使用すると環境全てを破棄するようです(使用していないのでよく分かりません)

参考

7
6
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?