3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apacke kafka v2.0.0がリリースされていたのでインストールしてみる

Last updated at Posted at 2018-08-30

はじめに

久しぶりにkafkaを起動しようと思ったら、v2.0.0が2018年7月30日にリリースされていたのでインストールして動かしてみることにしました。
前にインストールしたのはv1.1.0でしたが、v2.0.0でもインストール自体は特に変わりはないようです。

久しぶりにkafkaを動かしてみようと思ったのは、インメモリDBのVoltDBとの連携を試してみたかったからです。
VoltDBでは、kafkaに配信されたメッセージをインポートしたり、逆に配信(エクスポート)する機能があります。これを使うと、アプリケーションとVoltDB間の非同期処理をkafkaを経由して簡単に実現できます。今回の記事ではVoltDBとの連携は書いていませんが、今度試した結果を書いてみようかと思います。

kafkaは以下の3台構成で構築しています。

・192.168.10.121 kafkaserver1
・192.168.10.122 kafkaserver2
・192.168.10.123 kafkaserver3
※CentOS7.4でSELinuxは無効にしています。

各サーバにはzookeeperも稼働します。
zookeeperをクラスタ構成にする場合は最低3台必要です。今回はkafkaのサーバと同じ台数ですが、必ずしも同じ台数である必要はありません。(というか、kafkaのサーバに合わせて増やす必要はありません)
zookeeperのサーバが3台の場合、1台のサーバがダウンしても稼働を継続できます。5台にすれば2台のサーバがダウンしても稼働を継続できます。通常は3台か5台で十分かと思います。

kafka v2.0.0のインストール

kafkaのダウンロードページから、Scala 2.12版のモジュールをダウンロード。zookeeperも含まれています。

image.png

ダウンロードしたモジュールを/opt以下に展開。ついでにシンボリックリンクを作成。

tar xvzf /tmp/kafka_2.12-2.0.0.tgz -C /opt
ln -s /opt/kafka_2.12-2.0.0 /opt/kafka

なお、諸事情でOpenJDKはインストール済みなので、手順は省略。

各サーバのzookeeperの設定ファイルを修正する。

vi /opt/kafka/config/zookeeper.properties
zookeeper.properties
dataDir=/var/lib/zookeeper
tickTime=2000
initLimit=5
syncLimit=2
server.1=192.168.10.121:2888:3888
server.2=192.168.10.122:2888:3888
server.3=192.168.10.123:2888:3888

[2018/9/22]以下の設定が抜けていたので追記

mkdir -p /var/lib/zookeeper
# サーバ1
echo "1" > /var/lib/zookeeper/myid
# サーバ2
echo "2" > /var/lib/zookeeper/myid
# サーバ3
echo "3" > /var/lib/zookeeper/myid

kafkaのデータを保存するディレクトリを作成する。

mkdir /opt/kafka-logs

各サーバのkafkaの設定ファイルを修正する。

vi /opt/kafka/config/server.properties

以下を修正

server.properties
# サーバ1
broker.id=0 # サーバごとに変える
zookeeper.connect=192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181
log.dirs=/opt/kafka-logs

# サーバ2
broker.id=1 # サーバごとに変える
zookeeper.connect=192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181
log.dirs=/opt/kafka-logs

# サーバ3
broker.id=2 # サーバごとに変える
zookeeper.connect=192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181
log.dirs=/opt/kafka-logs

v1.0.0以前では「delete.topic.enable=true」でtopicを削除できるようにします。
プロダクション環境であれば削除できない方が良いでしょうが、開発環境なので利便性を重視してtopicを削除できるようにしています。
なお、v1.0.0からは「delete.topic.enable」がデフォルトでtrueであるため変更する必要はありません。

ディスクに余裕がない環境のため、データを保持する期間を変更。
デフォルトでは7日間(168時間)経過したデータが自動的に削除されますが、24時間に変更しています。

log.retention.hours=24

firewalldを有効にしている場合は以下も実行する。
2888と3888はZooKeeperをクラスタ構成にしている場合のみ必要です。

firewall-cmd --add-port=2181/tcp --permanent
firewall-cmd --add-port=2888/tcp --permanent
firewall-cmd --add-port=3888/tcp --permanent
firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --reload

kafkaを起動

zookeeperとkafkaの設定が完了したら、kafkaを起動します。

cd /opt/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # さきほど修正したファイルを使用
bin/kafka-server-start.sh -daemon config/server.properties # さきほど修正したファイルを使用

全てのサーバでkafkaを起動した後、zookeeperに接続してブローカーが登録されているか確認する。

cd /opt/kafka
bin/zookeeper-shell.sh 192.168.10.121:2181 # zookeeperに接続
Connecting to 192.168.10.121:2181
Welcome to ZooKeeper!

ls /brokers/ids # brokerを確認する。
[0, 1, 2] # 0, 1, 2の3台が登録されている。

トピックを作る。

kafka-topics.shを使用して、"test01"というトピックを作成します。

#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test01
Created topic "test01".

「kafka-console-consumer.sh」で、Topic(test01)を読み込むクライアントを起動しておきます。
「--from-beginning」オプションでTopicの最初から読み込む設定になります。

bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.121:9092 --from-beginning --topic test01

次に「bin/kafka-console-producer.sh」でTopic(test01)にメッセージを配信します。

bin/kafka-console-producer.sh --broker-list 192.168.10.121:9092 --topic test01
>test02

すると、クライアント側(consumer)でメッセージを読み込み表示されます。

bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.121:9092 --from-beginning --topic test01
test02

3台構成にしたのに1台でしか動かしていませんが、他の2台でもconsumerを起動すると、"test02"のメッセージを読み込むことができます。
3台ともで同じメッセージを受信するのは、consumer起動時にGroupIdを指定していないからで、

以下のように「group」オプションに同じ値をつけてconsumerを3台で起動すると、どれか1台でメッセージを処理します。

bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.121:9092 --from-beginning --topic test01 --group testgroup

ついでに入れてみたtrifectaでは以下のように表示されます。
トピックtest01に"test02"というメッセージがあることが分かります。

image.png

trifectaが最新版でも使えるか心配でしたが、0.10や1.1.0のときと同様に動くようです。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?