This is a continuation of the previous Apache Kafka article. This time, let's try out a Kafka cluster configuration.
◼︎ The Kafka cluster configuration we'll try this time
The Kafka cluster was set up as follows.
- Build the cluster across two Mac OS X machines
- Run the Producer on a separate, independent server
- Run the Consumers on the same machines as the Kafka cluster
- Use the ZooKeeper instance on 192.168.100.201
- Have each Consumer receive the Producer's messages at the same time
Concretely, we recreate the topic "kafkaesque" and build the following Kafka cluster configuration:
- Enable replication when creating the topic
- Create only a single partition
$ kafka-topics.sh --describe --zookeeper 192.168.100.201:2181 --topic kafkaesque
Topic:kafkaesque PartitionCount:1 ReplicationFactor:2 Configs:
Topic: kafkaesque Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
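Here, "Leader: 1" means the broker with broker.id=1 currently serves all reads and writes for partition 0, "Replicas: 2,1" lists the brokers holding a copy of the partition, and "Isr: 1,2" is the set of in-sync replicas eligible to take over as leader.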
Note that the Kafka version used this time, 0.8.2.1, is a slightly older release.
$ brew list kafka
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-console-consumer.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-console-producer.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-consumer-offset-checker.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-consumer-perf-test.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-mirror-maker.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-preferred-replica-election.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-producer-perf-test.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-reassign-partitions.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-replay-log-producer.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-replica-verification.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-run-class.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-server-start.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-server-stop.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-simple-consumer-shell.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/kafka-topics.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/zookeeper-server-start.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/zookeeper-server-stop.sh
/usr/local/Cellar/kafka/0.8.2.1/bin/zookeeper-shell.sh
/usr/local/Cellar/kafka/0.8.2.1/homebrew.mxcl.kafka.plist
/usr/local/Cellar/kafka/0.8.2.1/libexec/bin/ (18 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/clients/ (390 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/contrib/ (49 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/core/ (3072 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/examples/ (18 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/logs/ (83 files)
/usr/local/Cellar/kafka/0.8.2.1/libexec/system_test/ (247 files)
◼︎ Building the Kafka cluster environment
- Delete the topic "kafkaesque" created last time
$ kafka-topics.sh --delete --zookeeper localhost:2181 --topic kafkaesque
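(In 0.8.2, --delete only marks a topic for deletion unless the brokers are running with delete.topic.enable=true, which is why that setting appears in the server.properties edits below.)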
- Edit the Kafka configuration file on each of the two machines
$ vi /usr/local/etc/kafka/server.properties
------------------
broker.id=1
port=9092
host.name=192.168.100.201
zookeeper.connect=192.168.100.201:2181
delete.topic.enable=true
$ vi /usr/local/etc/kafka/server.properties
------------------
broker.id=2
port=9092
host.name=192.168.100.202
zookeeper.connect=192.168.100.201:2181
delete.topic.enable=true
- Start Kafka on each machine
$ kafka-server-start.sh /usr/local/etc/kafka/server.properties
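Once both brokers are up, their registration can be sanity-checked with the bundled zookeeper-shell.sh (Kafka registers live brokers under the /brokers/ids znode, so both ids 1 and 2 should be listed):
$ zookeeper-shell.sh 192.168.100.201:2181 ls /brokers/ids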
- Create & verify the topic "kafkaesque"
$ kafka-topics.sh --create --zookeeper 192.168.100.201:2181 --replication-factor 2 --partitions 1 --topic kafkaesque
Created topic "kafkaesque".
$ kafka-topics.sh --describe --zookeeper 192.168.100.201:2181 --topic kafkaesque
Topic:kafkaesque PartitionCount:1 ReplicationFactor:2 Configs:
Topic: kafkaesque Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
- Deploy the Producer-side sample app (the previous kafka_producer.py, modified to handle failover)
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from kafka.common import FailedPayloadsError
from kafka.common import KafkaUnavailableError
from time import sleep
from datetime import datetime

producer = None
try:
    # List both brokers so the client can fail over between them
    kafka = KafkaClient("192.168.100.201:9092, 192.168.100.202:9092")
    producer = SimpleProducer(kafka)
except KafkaUnavailableError as e:
    print "KafkaUnavailableError: [%s]" % e

while 1:
    # "kafkaesque" is the name of our topic
    if producer is None:
        exit(1)
    else:
        try:
            nowtime = str(datetime.now().time())
            producer.send_messages("kafkaesque", "Metamorphosis! " + nowtime)
            print "Metamorphosis! " + nowtime
        except FailedPayloadsError as e:
            # A send failed (e.g. the leader broker went down):
            # rebuild the producer so the client reconnects and
            # re-resolves the partition leader on the next send
            print "FailedPayloadsError: [%s]" % e
            producer.client.close()
            producer = SimpleProducer(kafka)
        except KafkaUnavailableError as e:
            # No broker reachable at all: give up on the next loop pass
            print "KafkaUnavailableError: [%s]" % e
            producer.client.close()
            producer = None
    sleep(1)
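The failover handling here is intentionally simple: when a send raises FailedPayloadsError, the producer is torn down and rebuilt, presumably so that the client reconnects and picks up the newly elected leader from refreshed metadata on the next send.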
- Deploy the Consumer-side sample app (the same kafka_consumer.go sample app as last time)
- Consumer-side sample app configuration files (one per consumer)
# Consumer identity
client_id=go-consumer1
group_id=cs-rebalance-group1
num_consumers=1
topic=kafkaesque
log_level=warn
# Zookeeper connection settings
zookeeper_connect=192.168.100.201:2181
zookeeper_timeout=1s
...
# Consumer identity
client_id=go-consumer2
group_id=cs-rebalance-group2
num_consumers=1
topic=kafkaesque
log_level=warn
# Zookeeper connection settings
zookeeper_connect=192.168.100.201:2181
zookeeper_timeout=1s
...
◼︎ Running it on the Kafka cluster
This time, each Consumer was given a different group_id.
This should let each of them receive the data sent by the Producer in a broadcast-like fashion.
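For reference, here is a minimal sketch of those group semantics using the same old kafka-python library as the producer above (a hypothetical Python illustration, not the actual Go consumer; SimpleConsumer and its argument order are from the kafka-python 0.8-era API):
# Hypothetical sketch: a consumer alone in its own group sees the full stream.
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

kafka = KafkaClient("192.168.100.201:9092, 192.168.100.202:9092")

# Offsets are tracked per group, so a consumer that is alone in
# "cs-rebalance-group1" receives every "kafkaesque" message; a second
# consumer in a different group ("cs-rebalance-group2") independently
# receives the same messages.
consumer = SimpleConsumer(kafka, "cs-rebalance-group1", "kafkaesque")
for msg in consumer:
    print "msg.Value=[%s]" % msg.message.value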
- First, let's check how the Kafka cluster behaves when no failure occurs.
$ python kafka_producer.py
Metamorphosis! 20:29:17.478683
Metamorphosis! 20:29:18.483005
Metamorphosis! 20:29:19.487480
Metamorphosis! 20:29:20.491742
Metamorphosis! 20:29:21.496180
Metamorphosis! 20:29:22.500724
Metamorphosis! 20:29:23.505113
Metamorphosis! 20:29:24.509501
Metamorphosis! 20:29:25.513620
Metamorphosis! 20:29:26.517824
Metamorphosis! 20:29:27.522053
Metamorphosis! 20:29:28.526435
Metamorphosis! 20:29:29.530428
Metamorphosis! 20:29:30.534616
...
$ go run kafka_consumer.go
2016/01/10 20:28:32 Connected to 192.168.100.201:2181
2016/01/10 20:28:32 Authenticated: id=95184864121651345, timeout=4000
msg.Value=[Metamorphosis! 20:29:17.478683]
msg.Value=[Metamorphosis! 20:29:18.483005]
msg.Value=[Metamorphosis! 20:29:19.487480]
msg.Value=[Metamorphosis! 20:29:20.491742]
msg.Value=[Metamorphosis! 20:29:21.496180]
msg.Value=[Metamorphosis! 20:29:22.500724]
msg.Value=[Metamorphosis! 20:29:23.505113]
msg.Value=[Metamorphosis! 20:29:24.509501]
msg.Value=[Metamorphosis! 20:29:25.513620]
msg.Value=[Metamorphosis! 20:29:26.517824]
msg.Value=[Metamorphosis! 20:29:27.522053]
msg.Value=[Metamorphosis! 20:29:28.526435]
msg.Value=[Metamorphosis! 20:29:29.530428]
msg.Value=[Metamorphosis! 20:29:30.534616]
...
$ go run kafka_consumer.go
msg.Value=[Metamorphosis! 20:29:17.478683]
msg.Value=[Metamorphosis! 20:29:18.483005]
msg.Value=[Metamorphosis! 20:29:19.487480]
msg.Value=[Metamorphosis! 20:29:20.491742]
msg.Value=[Metamorphosis! 20:29:21.496180]
msg.Value=[Metamorphosis! 20:29:22.500724]
msg.Value=[Metamorphosis! 20:29:23.505113]
msg.Value=[Metamorphosis! 20:29:24.509501]
msg.Value=[Metamorphosis! 20:29:25.513620]
msg.Value=[Metamorphosis! 20:29:26.517824]
msg.Value=[Metamorphosis! 20:29:27.522053]
msg.Value=[Metamorphosis! 20:29:28.526435]
msg.Value=[Metamorphosis! 20:29:29.530428]
msg.Value=[Metamorphosis! 20:29:30.534616]
...
As the output shows, the timestamped data sent second by second from the Producer was displayed in real time by each of the Consumers.
- Next, let's check how the Kafka cluster behaves when a failure occurs.
In this Kafka cluster configuration, the broker with broker.id=1 has been elected leader.
$ kafka-topics.sh --describe --zookeeper 192.168.100.201:2181 --topic kafkaesque
Topic:kafkaesque PartitionCount:1 ReplicationFactor:2 Configs:
Topic: kafkaesque Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
While the Producer keeps sending timestamped data and each Consumer keeps displaying it, we forcibly stop the leader Kafka process with "Ctrl+C". If failover works, the cluster should keep going and each Consumer should continue displaying messages afterwards.
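(Behind the scenes, when the leader broker disappears, the Kafka controller elects a new leader for the partition from the remaining in-sync replicas, and clients discover it on their next metadata refresh.)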
$ python kafka_producer.py
Metamorphosis! 21:20:14.368620
Metamorphosis! 21:20:15.385962
Metamorphosis! 21:20:16.390473
Metamorphosis! 21:20:17.394782
Metamorphosis! 21:20:18.399187
Metamorphosis! 21:20:19.403492
Metamorphosis! 21:20:20.407589
Metamorphosis! 21:20:21.411884
Metamorphosis! 21:20:22.415718
Metamorphosis! 21:20:23.419357
Metamorphosis! 21:20:24.423614
No handlers could be found for logger "kafka.conn"
FailedPayloadsError: []
Metamorphosis! 21:20:26.430648
Metamorphosis! 21:20:27.452339
Metamorphosis! 21:20:28.458493
Metamorphosis! 21:20:29.464982
Metamorphosis! 21:20:30.470309
Metamorphosis! 21:20:31.476017
Metamorphosis! 21:20:32.481460
Metamorphosis! 21:20:33.488170
Metamorphosis! 21:20:34.493129
Metamorphosis! 21:20:35.498406
Metamorphosis! 21:20:36.503719
...
On the Producer side, a connection loss to the Kafka server occurred at 21:20:25, so no message was sent at that moment.
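Note that the 21:20:25 message itself was lost: the sample's FailedPayloadsError handler rebuilds the producer but does not retry the failed payload, so the message in flight at the moment of failure is dropped.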
$ go run kafka_consumer.go
2016/01/10 21:19:42 Connected to 192.168.100.201:2181
2016/01/10 21:19:42 Authenticated: id=95184864121651392, timeout=4000
2016-01-10/21:19:42 [WARN] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:19:42 [ERROR] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Failed to reflect partition ownership during rebalance
2016-01-10/21:19:47 [WARN] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:19:47 [ERROR] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Failed to reflect partition ownership during rebalance
msg.Value=[Metamorphosis! 21:20:14.368620]
msg.Value=[Metamorphosis! 21:20:15.385962]
msg.Value=[Metamorphosis! 21:20:16.390473]
msg.Value=[Metamorphosis! 21:20:17.394782]
msg.Value=[Metamorphosis! 21:20:18.399187]
msg.Value=[Metamorphosis! 21:20:19.403492]
msg.Value=[Metamorphosis! 21:20:20.407589]
msg.Value=[Metamorphosis! 21:20:21.411884]
msg.Value=[Metamorphosis! 21:20:22.415718]
2016-01-10/21:20:24 [WARN] [ConsumerFetcherRoutine-6707d3a7-f5c3-1dc7-d652-fa941fad8628-0] Got a fetch error for topic kafkaesque, partition 0: EOF
2016-01-10/21:20:24 [WARN] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:20:24 [ERROR] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Failed to reflect partition ownership during rebalance
2016-01-10/21:20:25 [WARN] [ConsumerFetcherRoutine-6707d3a7-f5c3-1dc7-d652-fa941fad8628-0] Got a fetch error for topic kafkaesque, partition 0: EOF
msg.Value=[Metamorphosis! 21:20:23.419357]
msg.Value=[Metamorphosis! 21:20:24.423614]
msg.Value=[Metamorphosis! 21:20:26.430648]
msg.Value=[Metamorphosis! 21:20:27.452339]
2016-01-10/21:20:29 [WARN] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:20:29 [ERROR] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Failed to reflect partition ownership during rebalance
msg.Value=[Metamorphosis! 21:20:28.458493]
msg.Value=[Metamorphosis! 21:20:29.464982]
msg.Value=[Metamorphosis! 21:20:30.470309]
msg.Value=[Metamorphosis! 21:20:31.476017]
msg.Value=[Metamorphosis! 21:20:32.481460]
2016-01-10/21:20:34 [WARN] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:20:34 [ERROR] [6707d3a7-f5c3-1dc7-d652-fa941fad8628] Failed to reflect partition ownership during rebalance
msg.Value=[Metamorphosis! 21:20:33.488170]
msg.Value=[Metamorphosis! 21:20:34.493129]
msg.Value=[Metamorphosis! 21:20:35.498406]
msg.Value=[Metamorphosis! 21:20:36.503719]
...
$ go run kafka_consumer.go
msg.Value=[Metamorphosis! 21:20:14.368620]
msg.Value=[Metamorphosis! 21:20:15.385962]
msg.Value=[Metamorphosis! 21:20:16.390473]
msg.Value=[Metamorphosis! 21:20:17.394782]
msg.Value=[Metamorphosis! 21:20:18.399187]
msg.Value=[Metamorphosis! 21:20:19.403492]
msg.Value=[Metamorphosis! 21:20:20.407589]
msg.Value=[Metamorphosis! 21:20:21.411884]
msg.Value=[Metamorphosis! 21:20:22.415718]
msg.Value=[Metamorphosis! 21:20:23.419357]
msg.Value=[Metamorphosis! 21:20:24.423614]
2016-01-10/21:20:24 [WARN] [ConsumerFetcherRoutine-d6e58d92-0576-94e5-a316-380d4474cc02-0] Got a fetch error for topic kafkaesque, partition 0: EOF
2016-01-10/21:20:25 [WARN] [d6e58d92-0576-94e5-a316-380d4474cc02] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:20:25 [ERROR] [d6e58d92-0576-94e5-a316-380d4474cc02] Failed to reflect partition ownership during rebalance
2016-01-10/21:20:25 [WARN] [ConsumerFetcherRoutine-d6e58d92-0576-94e5-a316-380d4474cc02-0] Got a fetch error for topic kafkaesque, partition 0: EOF
msg.Value=[Metamorphosis! 21:20:26.430648]
msg.Value=[Metamorphosis! 21:20:27.452339]
msg.Value=[Metamorphosis! 21:20:28.458493]
msg.Value=[Metamorphosis! 21:20:29.464982]
2016-01-10/21:20:30 [WARN] [d6e58d92-0576-94e5-a316-380d4474cc02] Consumer failed to reflect all partitions 0 of 1
2016-01-10/21:20:30 [ERROR] [d6e58d92-0576-94e5-a316-380d4474cc02] Failed to reflect partition ownership during rebalance
msg.Value=[Metamorphosis! 21:20:30.470309]
msg.Value=[Metamorphosis! 21:20:31.476017]
msg.Value=[Metamorphosis! 21:20:32.481460]
msg.Value=[Metamorphosis! 21:20:33.488170]
msg.Value=[Metamorphosis! 21:20:34.493129]
msg.Value=[Metamorphosis! 21:20:35.498406]
msg.Value=[Metamorphosis! 21:20:36.503719]
On the Consumer side, although "WARN" and "ERROR" lines appear here and there, every message the Producer sent was received without loss and displayed in real time by each of the Consumers.
$ kafka-topics.sh --describe --zookeeper 192.168.100.201:2181 --topic kafkaesque
Topic:kafkaesque PartitionCount:1 ReplicationFactor:2 Configs:
Topic: kafkaesque Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2
Finally, after the leader Kafka process was forcibly stopped with "Ctrl+C", we can confirm that the broker with broker.id=2 has been elected as the new leader.
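Note also that Isr has shrunk to just 2: broker.id=1 is no longer in sync. If broker 1 is restarted and catches up, it rejoins the ISR, but leadership stays where it is (here broker 2 happens to be the preferred replica, i.e. the first entry in "Replicas: 2,1"); the kafka-preferred-replica-election.sh script seen in the listing above can be used to move leadership back to preferred replicas when needed.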