はじめに
久しぶりにkafkaを起動しようと思ったら、v2.0.0が2018年7月30日にリリースされていたのでインストールして動かしてみることにしました。
前にインストールしたのはv1.1.0でしたが、v2.0.0でもインストール自体は特に変わりはないようです。
久しぶりにkafkaを動かしてみようと思ったのは、インメモリDBのVoltDBとの連携を試してみたかったからです。
VoltDBでは、kafkaに配信されたメッセージをインポートしたり、逆に配信(エクスポート)する機能があります。これを使うと、アプリケーションとVoltDB間の非同期処理をkafkaを経由して簡単に実現できます。今回の記事ではVoltDBとの連携は書いていませんが、今度試した結果を書いてみようかと思います。
kafkaは以下の3台構成で構築しています。
・192.168.10.121 kafkaserver1
・192.168.10.122 kafkaserver2
・192.168.10.123 kafkaserver3
※CentOS7.4でSELinuxは無効にしています。
各サーバにはzookeeperも稼働します。
zookeeperをクラスタ構成にする場合は最低3台必要です。今回はkafkaのサーバと同じ台数ですが、必ずしも同じ台数である必要はありません。(というか、kafkaのサーバに合わせて増やす必要はありません)
zookeeperのサーバが3台の場合、1台のサーバがダウンしても稼働を継続できます。5台にすれば2台のサーバがダウンしても稼働を継続できます。通常は3台か5台で十分かと思います。
kafka v2.0.0のインストール
kafkaのダウンロードページから、Scala 2.12版のモジュールをダウンロード。zookeeperも含まれています。
ダウンロードしたモジュールを/opt以下に展開。ついでにシンボリックリンクを作成。
tar xvzf /tmp/kafka_2.12-2.0.0.tgz -C /opt
ln -s /opt/kafka_2.12-2.0.0 /opt/kafka
なお、諸事情でOpenJDKはインストール済みなので、手順は省略。
各サーバのzookeeperの設定ファイルを修正する。
vi /opt/kafka/config/zookeeper.properties
dataDir=/var/lib/zookeeper
tickTime=2000
initLimit=5
syncLimit=2
server.1=192.168.10.121:2888:3888
server.2=192.168.10.122:2888:3888
server.3=192.168.10.123:2888:3888
[2018/9/22]以下の設定が抜けていたので追記
mkdir -p /var/lib/zookeeper
# サーバ1
echo "1" > /var/lib/zookeeper/myid
# サーバ2
echo "2" > /var/lib/zookeeper/myid
# サーバ3
echo "3" > /var/lib/zookeeper/myid
kafkaのデータを保存するディレクトリを作成する。
mkdir /opt/kafka-logs
各サーバのkafkaの設定ファイルを修正する。
vi /opt/kafka/config/server.properties
以下を修正
# サーバ1
broker.id=0 # サーバごとに変える
zookeeper.connect=192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181
log.dirs=/opt/kafka-logs
# サーバ2
broker.id=1 # サーバごとに変える
zookeeper.connect=192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181
log.dirs=/opt/kafka-logs
# サーバ3
broker.id=2 # サーバごとに変える
zookeeper.connect=192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181
log.dirs=/opt/kafka-logs
v1.0.0以前では「delete.topic.enable=true」でtopicを削除できるようにします。
プロダクション環境であれば削除できない方が良いでしょうが、開発環境なので利便性を重視してtopicを削除できるようにしています。
なお、v1.0.0からは「delete.topic.enable」がデフォルトでtrueであるため変更する必要はありません。
ディスクに余裕がない環境のため、データを保持する期間を変更。
デフォルトでは7日間(168時間)経過したデータが自動的に削除されますが、24時間に変更しています。
log.retention.hours=24
firewalldを有効にしている場合は以下も実行する。
2888と3888はZooKeeperをクラスタ構成にしている場合のみ必要です。
firewall-cmd --add-port=2181/tcp --permanent
firewall-cmd --add-port=2888/tcp --permanent
firewall-cmd --add-port=3888/tcp --permanent
firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --reload
kafkaを起動
zookeeperとkafkaの設定が完了したら、kafkaを起動します。
cd /opt/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # さきほど修正したファイルを使用
bin/kafka-server-start.sh -daemon config/server.properties # さきほど修正したファイルを使用
全てのサーバでkafkaを起動した後、zookeeperに接続してブローカーが登録されているか確認する。
cd /opt/kafka
bin/zookeeper-shell.sh 192.168.10.121:2181 # zookeeperに接続
Connecting to 192.168.10.121:2181
Welcome to ZooKeeper!
ls /brokers/ids # brokerを確認する。
[0, 1, 2] # 0, 1, 2の3台が登録されている。
トピックを作る。
kafka-topics.shを使用して、"test01"というトピックを作成します。
#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test01
Created topic "test01".
「kafka-console-consumer.sh」で、Topic(test01)を読み込むクライアントを起動しておきます。
「--from-beginning」オプションでTopicの最初から読み込む設定になります。
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.121:9092 --from-beginning --topic test01
次に「bin/kafka-console-producer.sh」でTopic(test01)にメッセージを配信します。
bin/kafka-console-producer.sh --broker-list 192.168.10.121:9092 --topic test01
>test02
すると、クライアント側(consumer)でメッセージを読み込み表示されます。
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.121:9092 --from-beginning --topic test01
test02
3台構成にしたのに1台でしか動かしていませんが、他の2台でもconsumerを起動すると、"test02"のメッセージを読み込むことができます。
3台ともで同じメッセージを受信するのは、consumer起動時にGroupIdを指定していないからで、
以下のように「group」オプションに同じ値をつけてconsumerを3台で起動すると、どれか1台でメッセージを処理します。
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.121:9092 --from-beginning --topic test01 --group testgroup
ついでに入れてみたtrifectaでは以下のように表示されます。
トピックtest01に"test02"というメッセージがあることが分かります。
trifectaが最新版でも使えるか心配でしたが、0.10や1.1.0のときと同様に動くようです。