Trying out an Apache Kafka cluster configuration

Posted at 2016-01-10

This is a continuation of my previous Apache Kafka article. This time, I take a look at a Kafka cluster configuration.

◼︎ The Kafka cluster configuration to try out this time

The Kafka cluster is set up as follows.

  • Build the cluster from two Mac OS X machines
  • Run the producer on its own, separate server
  • Run the consumers on the same hosts as the Kafka cluster
  • Use the ZooKeeper instance on the 192.168.100.201 side
  • Have each consumer receive the data sent by the producer at the same time

(Figure: Kafka cluster configuration diagram, kafka構成.jpeg)

Concretely, I recreate the topic "kafkaesque" and build the following Kafka cluster configuration.

  • Enable replication when creating the topic
  • Create only a single partition
$ kafka-topics.sh --describe --zookeeper 192.168.100.201:2181 --topic kafkaesque
Topic:kafkaesque    PartitionCount:1    ReplicationFactor:2 Configs:
    Topic: kafkaesque   Partition: 0    Leader: 1   Replicas: 2,1   Isr: 1,2
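
In this describe output, Leader is the broker id currently handling all reads and writes for the partition, Replicas lists the brokers that hold a copy of it, and Isr ("in-sync replicas") is the subset of those replicas currently caught up with the leader.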

Note that the Kafka software used this time is version 0.8.2.1, which is a little old.

$ brew list kafka
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-console-consumer.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-console-producer.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-consumer-offset-checker.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-consumer-perf-test.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-mirror-maker.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-preferred-replica-election.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-producer-perf-test.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-reassign-partitions.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-replay-log-producer.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-replica-verification.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-run-class.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-server-start.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-server-stop.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-simple-consumer-shell.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-topics.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/zookeeper-server-start.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/zookeeper-server-stop.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/zookeeper-shell.sh
/usr/local/Cellar/kafka/0.8.2.1/homebrew.mxcl.kafka.plist
/usr/local/Cellar/kafka/0.8.2.1/libexec/bin/ (18 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/clients/ (390 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/contrib/ (49 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/core/ (3072 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/examples/ (18 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/logs/ (83 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/system_test/ (247 files)

◼︎ Setting up the Kafka cluster environment

  • Delete the topic "kafkaesque" created last time
$ kafka-topics.sh --delete  --zookeeper localhost:2181  --topic kafkaesque
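Note that in Kafka 0.8.x this delete only takes effect when the brokers run with delete.topic.enable=true (set in the configuration below); otherwise the topic is merely marked for deletion.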
  • Edit the Kafka configuration files
Example parameters on the 192.168.100.201 side
$ vi /usr/local/etc/kafka/server.properties
------------------
broker.id=1
port=9092
host.name=192.168.100.201
zookeeper.connect=192.168.100.201:2181
delete.topic.enable=true
Example parameters on the 192.168.100.202 side
$ vi /usr/local/etc/kafka/server.properties
------------------
broker.id=2
port=9092
host.name=192.168.100.202
zookeeper.connect=192.168.100.201:2181
delete.topic.enable=true
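The key points: broker.id must be unique within the cluster (1 and 2 here), and both brokers set zookeeper.connect to the single ZooKeeper instance on 192.168.100.201, which is what makes them members of the same cluster.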
  • Start Kafka (on each host)
$ kafka-server-start.sh /usr/local/etc/kafka/server.properties
  • Create and verify the topic "kafkaesque"
$ kafka-topics.sh --create --zookeeper 192.168.100.201:2181 --replication-factor 2 --partitions 1 --topic kafkaesque
Created topic "kafkaesque".

$ kafka-topics.sh --describe --zookeeper 192.168.100.201:2181 --topic kafkaesque
Topic:kafkaesque    PartitionCount:1    ReplicationFactor:2 Configs:
    Topic: kafkaesque   Partition: 0    Leader: 1   Replicas: 2,1   Isr: 1,2
  • Deploy the producer-side sample app (kafka_producer.py from last time, modified to support failover)
kafka_producer.py
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from kafka.common import FailedPayloadsError
from kafka.common import KafkaUnavailableError
from time import sleep
from datetime import datetime

producer = None

try:
    # List both brokers so the client can reach the cluster even if one is down
    kafka = KafkaClient("192.168.100.201:9092, 192.168.100.202:9092")
    producer = SimpleProducer(kafka)
except KafkaUnavailableError as e:
    print "KafkaUnavailableError: [%s]" % e


while 1:
    # "kafkaesque" is the name of our topic
    if producer is None:
        exit(1)
    else:
        try:
            nowtime = str(datetime.now().time())
            producer.send_messages("kafkaesque", "Metamorphosis! " + nowtime)
            print "Metamorphosis! " + nowtime
        except FailedPayloadsError as e:
            # The send to the current leader failed: rebuild the producer so
            # the next send re-establishes connections to the cluster
            print "FailedPayloadsError: [%s]" % e
            producer.client.close()
            producer = SimpleProducer(kafka)
        except KafkaUnavailableError as e:
            # No broker is reachable at all: stop on the next iteration
            print "KafkaUnavailableError: [%s]" % e
            producer.client.close()
            producer = None
    sleep(1)
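
The failover handling lives in the two except blocks: on FailedPayloadsError (a send to the current leader failed), the producer is rebuilt on the existing client so that the next send reconnects against refreshed metadata; on KafkaUnavailableError (no broker reachable at all), the script gives up on the next loop iteration.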
  • Deploy the consumer-side sample app (the same kafka_consumer.go sample app as last time)

  • Consumer-side sample app configuration files

consumers.properties on the 192.168.100.201 side
#Consumer identity
client_id=go-consumer1
group_id=cs-rebalance-group1
num_consumers=1
topic=kafkaesque
log_level=warn

#Zookeeper connection settings
zookeeper_connect=192.168.100.201:2181
zookeeper_timeout=1s
...
consumers.properties on the 192.168.100.202 side
#Consumer identity
client_id=go-consumer2
group_id=cs-rebalance-group2
num_consumers=1
topic=kafkaesque
log_level=warn

#Zookeeper connection settings
zookeeper_connect=192.168.100.201:2181
zookeeper_timeout=1s
...

◼︎ Running it on the Kafka cluster

This time, each consumer is configured with a different group_id. Since Kafka tracks consumption offsets per consumer group, both consumers should receive the producer's stream in full, broadcast-style.
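
Since kafka_consumer.go comes from the previous article and is not reproduced here, below is a rough Python equivalent of that broadcast setup, written against the same kafka-python 0.9.x-era API as kafka_producer.py above. Treat it as a hedged sketch rather than the actual Go consumer: the file name consumer_sketch.py and the hard-coded group are for illustration only.

consumer_sketch.py
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# Point at both brokers, just like the producer
kafka = KafkaClient("192.168.100.201:9092, 192.168.100.202:9092")

# Each host would pass its own group here (cs-rebalance-group1 on .201,
# cs-rebalance-group2 on .202). Offsets are tracked per group, so two
# single-member groups each receive the full stream.
consumer = SimpleConsumer(kafka, "cs-rebalance-group1", "kafkaesque")

for msg in consumer:
    # msg is an OffsetAndMessage; the payload is msg.message.value
    print "msg.Value=[%s]" % msg.message.value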

  • First, I checked how the Kafka cluster behaves when no failure occurs.
Starting the producer-side sample app on 192.168.100.200
$ python kafka_producer.py 
Metamorphosis! 20:29:17.478683
Metamorphosis! 20:29:18.483005
Metamorphosis! 20:29:19.487480
Metamorphosis! 20:29:20.491742
Metamorphosis! 20:29:21.496180
Metamorphosis! 20:29:22.500724
Metamorphosis! 20:29:23.505113
Metamorphosis! 20:29:24.509501
Metamorphosis! 20:29:25.513620
Metamorphosis! 20:29:26.517824
Metamorphosis! 20:29:27.522053
Metamorphosis! 20:29:28.526435
Metamorphosis! 20:29:29.530428
Metamorphosis! 20:29:30.534616
...
Starting the consumer-side sample app on 192.168.100.201
$ go run kafka_consumer.go 
2016/01/10 20:28:32 Connected to 192.168.100.201:2181
2016/01/10 20:28:32 Authenticated: id=95184864121651345, timeout=4000
msg.Value=[Metamorphosis! 20:29:17.478683]
msg.Value=[Metamorphosis! 20:29:18.483005]
msg.Value=[Metamorphosis! 20:29:19.487480]
msg.Value=[Metamorphosis! 20:29:20.491742]
msg.Value=[Metamorphosis! 20:29:21.496180]
msg.Value=[Metamorphosis! 20:29:22.500724]
msg.Value=[Metamorphosis! 20:29:23.505113]
msg.Value=[Metamorphosis! 20:29:24.509501]
msg.Value=[Metamorphosis! 20:29:25.513620]
msg.Value=[Metamorphosis! 20:29:26.517824]
msg.Value=[Metamorphosis! 20:29:27.522053]
msg.Value=[Metamorphosis! 20:29:28.526435]
msg.Value=[Metamorphosis! 20:29:29.530428]
msg.Value=[Metamorphosis! 20:29:30.534616]
...
Starting the consumer-side sample app on 192.168.100.202
$ go run kafka_consumer.go 
msg.Value=[Metamorphosis! 20:29:17.478683]
msg.Value=[Metamorphosis! 20:29:18.483005]
msg.Value=[Metamorphosis! 20:29:19.487480]
msg.Value=[Metamorphosis! 20:29:20.491742]
msg.Value=[Metamorphosis! 20:29:21.496180]
msg.Value=[Metamorphosis! 20:29:22.500724]
msg.Value=[Metamorphosis! 20:29:23.505113]
msg.Value=[Metamorphosis! 20:29:24.509501]
msg.Value=[Metamorphosis! 20:29:25.513620]
msg.Value=[Metamorphosis! 20:29:26.517824]
msg.Value=[Metamorphosis! 20:29:27.522053]
msg.Value=[Metamorphosis! 20:29:28.526435]
msg.Value=[Metamorphosis! 20:29:29.530428]
msg.Value=[Metamorphosis! 20:29:30.534616]
...

I confirmed that the timestamped data continuously sent by the producer was displayed on each consumer as it arrived.

  • Next, I checked how the Kafka cluster behaves when a failure occurs.

In this Kafka cluster, the Kafka server with broker.id=1 is currently elected leader.

$ kafka-topics.sh --describe --zookeeper 192.168.100.201:2181 --topic kafkaesque
Topic:kafkaesque    PartitionCount:1    ReplicationFactor:2 Configs:
    Topic: kafkaesque   Partition: 0    Leader: 1   Replicas: 2,1   Isr: 1,2

While the timestamped data sent by the producer is being displayed on each consumer, I forcibly stop the leader Kafka process with Ctrl+C. If the cluster fails over properly, both consumers should keep displaying the data afterwards.
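
Under the hood, when the leader broker dies its ZooKeeper session expires, the controller elects a new leader for the partition from the remaining in-sync replicas, and the clients then refresh their metadata and reconnect to the new leader.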

Starting the producer-side sample app on 192.168.100.200
$ python kafka_producer.py 
Metamorphosis! 21:20:14.368620
Metamorphosis! 21:20:15.385962
Metamorphosis! 21:20:16.390473
Metamorphosis! 21:20:17.394782
Metamorphosis! 21:20:18.399187
Metamorphosis! 21:20:19.403492
Metamorphosis! 21:20:20.407589
Metamorphosis! 21:20:21.411884
Metamorphosis! 21:20:22.415718
Metamorphosis! 21:20:23.419357
Metamorphosis! 21:20:24.423614
No handlers could be found for logger "kafka.conn"
FailedPayloadsError: []
Metamorphosis! 21:20:26.430648
Metamorphosis! 21:20:27.452339
Metamorphosis! 21:20:28.458493
Metamorphosis! 21:20:29.464982
Metamorphosis! 21:20:30.470309
Metamorphosis! 21:20:31.476017
Metamorphosis! 21:20:32.481460
Metamorphosis! 21:20:33.488170
Metamorphosis! 21:20:34.493129
Metamorphosis! 21:20:35.498406
Metamorphosis! 21:20:36.503719
...

As the producer-side output shows, the connection to the Kafka server was lost at 21:20:25, so no data was sent from the producer for that second.
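
Note that the sample producer does not resend the payload that failed; it just rebuilds the producer and continues with the next message, so the 21:20:25 message is lost on the producer side. A more robust producer would retry the failed batch before moving on.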

Starting the consumer-side sample app on 192.168.100.201
$ go run kafka_consumer.go 
2016/01/10 21:19:42 Connected to 192.168.100.201:2181
2016/01/10 21:19:42 Authenticated: id=95184864121651392, timeout=4000
2016-01-10/21:19:42 [WARN] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:19:42 [ERROR] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Failed to reflect partition ownership during rebalance
2016-01-10/21:19:47 [WARN] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:19:47 [ERROR] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Failed to reflect partition ownership during rebalance
msg.Value=[Metamorphosis! 21:20:14.368620]
msg.Value=[Metamorphosis! 21:20:15.385962]
msg.Value=[Metamorphosis! 21:20:16.390473]
msg.Value=[Metamorphosis! 21:20:17.394782]
msg.Value=[Metamorphosis! 21:20:18.399187]
msg.Value=[Metamorphosis! 21:20:19.403492]
msg.Value=[Metamorphosis! 21:20:20.407589]
msg.Value=[Metamorphosis! 21:20:21.411884]
msg.Value=[Metamorphosis! 21:20:22.415718]
2016-01-10/21:20:24 [WARN] [ConsumerFetcherRoutine-6707d3a7-f5c3-1dc7-d652-fa941fad8628-0] Got a fetch error for topic kafkaesque, partition 0: EOF
2016-01-10/21:20:24 [WARN] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:20:24 [ERROR] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Failed to reflect partition ownership during rebalance
2016-01-10/21:20:25 [WARN] [ConsumerFetcherRoutine-6707d3a7-f5c3-1dc7-d652-fa941fad8628-0] Got a fetch error for topic kafkaesque, partition 0: EOF
msg.Value=[Metamorphosis! 21:20:23.419357]
msg.Value=[Metamorphosis! 21:20:24.423614]
msg.Value=[Metamorphosis! 21:20:26.430648]
msg.Value=[Metamorphosis! 21:20:27.452339]
2016-01-10/21:20:29 [WARN] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:20:29 [ERROR] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Failed to reflect partition ownership during rebalance
msg.Value=[Metamorphosis! 21:20:28.458493]
msg.Value=[Metamorphosis! 21:20:29.464982]
msg.Value=[Metamorphosis! 21:20:30.470309]
msg.Value=[Metamorphosis! 21:20:31.476017]
msg.Value=[Metamorphosis! 21:20:32.481460]
2016-01-10/21:20:34 [WARN] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:20:34 [ERROR] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Failed to reflect partition ownership during rebalance
msg.Value=[Metamorphosis! 21:20:33.488170]
msg.Value=[Metamorphosis! 21:20:34.493129]
msg.Value=[Metamorphosis! 21:20:35.498406]
msg.Value=[Metamorphosis! 21:20:36.503719]
...
Starting the consumer-side sample app on 192.168.100.202
$ go run kafka_consumer.go 
msg.Value=[Metamorphosis! 21:20:14.368620]
msg.Value=[Metamorphosis! 21:20:15.385962]
msg.Value=[Metamorphosis! 21:20:16.390473]
msg.Value=[Metamorphosis! 21:20:17.394782]
msg.Value=[Metamorphosis! 21:20:18.399187]
msg.Value=[Metamorphosis! 21:20:19.403492]
msg.Value=[Metamorphosis! 21:20:20.407589]
msg.Value=[Metamorphosis! 21:20:21.411884]
msg.Value=[Metamorphosis! 21:20:22.415718]
msg.Value=[Metamorphosis! 21:20:23.419357]
msg.Value=[Metamorphosis! 21:20:24.423614]
2016-01-10/21:20:24 [WARN] [ConsumerFetcherRoutine-d6e58d92-0576-94e5-a316-380d4474cc02-0] Got a fetch error for topic kafkaesque, partition 0: EOF
2016-01-10/21:20:25 [WARN] [d6e58d92-0576-94e5-a316-380d4474cc02] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:20:25 [ERROR] [d6e58d92-0576-94e5-a316-380d4474cc02] Failed to reflect partition ownership during rebalance
2016-01-10/21:20:25 [WARN] [ConsumerFetcherRoutine-d6e58d92-0576-94e5-a316-380d4474cc02-0] Got a fetch error for topic kafkaesque, partition 0: EOF
msg.Value=[Metamorphosis! 21:20:26.430648]
msg.Value=[Metamorphosis! 21:20:27.452339]
msg.Value=[Metamorphosis! 21:20:28.458493]
msg.Value=[Metamorphosis! 21:20:29.464982]
2016-01-10/21:20:30 [WARN] [d6e58d92-0576-94e5-a316-380d4474cc02] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:20:30 [ERROR] [d6e58d92-0576-94e5-a316-380d4474cc02] Failed to reflect partition ownership during rebalance
msg.Value=[Metamorphosis! 21:20:30.470309]
msg.Value=[Metamorphosis! 21:20:31.476017]
msg.Value=[Metamorphosis! 21:20:32.481460]
msg.Value=[Metamorphosis! 21:20:33.488170]
msg.Value=[Metamorphosis! 21:20:34.493129]
msg.Value=[Metamorphosis! 21:20:35.498406]
msg.Value=[Metamorphosis! 21:20:36.503719]

As the consumer-side output shows, "WARN" and "ERROR" messages appear here and there, but everything the producer actually sent was displayed on each consumer without loss (only the 21:20:25 message, which was never sent, is missing).

$ kafka-topics.sh --describe --zookeeper 192.168.100.201:2181 --topic kafkaesque
Topic:kafkaesque    PartitionCount:1    ReplicationFactor:2 Configs:
    Topic: kafkaesque   Partition: 0    Leader: 2   Replicas: 2,1   Isr: 2

Finally, I confirmed that, after the leader Kafka process was forcibly stopped with Ctrl+C, the Kafka server with broker.id=2 was elected the new leader.
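
Also note that Isr now lists only broker 2: broker 1 has dropped out of the in-sync replica set. If broker 1 is brought back up, it should catch up from the new leader and rejoin the ISR.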
