Apache Kafkaについて勉強しているのでまとめておきます。
環境
zookeeper 3.4.14 (confluentinc/cp-zookeeper:5.3.2)
Kafka 2.3.1 (confluentinc/cp-kafka:5.3.2)
Dockerイメージを利用しています。
Kafkaの用語理解
ここでは各用語についてざっくりとまとめておきます。
図を作ってまとめようと思ったのですが、伊藤 雅博さんの記事がかなり分かりやすいので、詳細はこちらをご参照ください。
Apache Kafkaの概要とアーキテクチャ
名前 | 役割 |
---|---|
Kafka Cluster | Kafkaが実行されているサーバ(Broker)をグループ化したもの |
Broker | Kafkaの単一サーバ |
Zookeeper | Kafkaを管理するサーバ |
Producer | Kafkaへメッセージを送信するアプリケーション |
Consumer | Kafkaからメッセージを取得するアプリケーション |
Topic | メッセージを整理するためのカテゴリー |
Partition | Topic内のメッセージはパーティションという単位で分散させています |
Replica | 各Partitionは複数のBrokerに複製(Replica)されています |
Leader | 複製されているReplicaのうち唯一読み書きが許可されているReplica |
Consumer Group | 複数のConsumerを同一グループとして扱うためのもの。グループ化することで分散したConsumer間で同一メッセージを重複せずに読み込むことが可能です |
Offset | Partition単位でメッセージをどこまで読んだか管理するためのもの |
Topic操作
kafka-topics
コマンドを利用してトピックを操作します。
共通オプション
オプション名 | 役割 |
---|---|
zookeeper | Zookeeperを指定します |
topic | 操作対象のトピックを指定します |
partitions | メッセージを分割するPartition数を指定します |
replication-factor | PartitionをいくつのBrokerに複製するか指定します |
Topic作成
kafka-topics --create
がトピック作成コマンドになります。
トピック作成サンプル
test-topicというトピック名でpartition数3、Replica数3で保存する場合
$ kafka-topics --create --zookeeper zookeeper:2181 --topic test-topic --partitions 3 --replication-factor 3
Created topic test-topic.
Topicの確認
kafka-topics --describe
でトピックを確認します。
トピックの内容を確認するサンプル
$ kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test-topic Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test-topic Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: test-topic Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
こちらの出力結果はそれぞれ以下のようになります。
オプション名 | 役割 |
---|---|
PartitionCount | 対象のトピックがいくつのPartitionに分割されるかが記載されます |
ReplicationFactor | 対象のトピックがいくつReplicaされるかが記載されています。ここにはLeaderも含まれます |
Partition | Partition番号が記載されます |
Leader | どこのBrokerに保存されているかが記載されます |
Replicas | どこのBrokerに保存されているかが記載されます。今回はReplicationFactorが3つ指定されているため、3つのBrokerが記載されています。 |
Isr | In Sync Replica(Isr)の略で、対象Partitionが全て同期されている状態のBrokerが記載されます |
Brokerがダウンしている場合の状況
Kafka ClusterにBrokerが3つある場合のうち、1つのBrokerがダウンしている場合の状態もサンプルで載せておきます。
kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test-topic Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,1
Topic: test-topic Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3
Topic: test-topic Partition: 2 Leader: 1 Replicas: 2,1,3 Isr: 1,3
このようにもともとPartitionが1のもののLeaderは2のBrokerを利用していましたが、1のBrokerに切り替わりました。
また、Isrの状態も1と3のBrokerのみが記載されており、Broker2はIsrになっていないことが分かります。
Topicの一覧表示
kafka-topics --list
でトピックを確認します。
トピックの内容を確認するサンプル
$ kafka-topics --list --zookeeper zookeeper:2181
__confluent.support.metrics
__consumer_offsets
test-topic
test-topic1
Topicの削除
kafka-topics --delete
でトピックを削除します。
トピック削除サンプル
$ kafka-topics --delete --zookeeper zookeeper:2181 --topic test-topic
Topic test-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
$ kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Error while executing topic command : Topic 'test-topic' does not exist as expected
[2020-02-09 13:03:11,678] ERROR java.lang.IllegalArgumentException: Topic 'test-topic' does not exist as expected
at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:437)
at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:349)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
Topicの設定変更
kafka-topics --alter
でトピックの設定を変更します。
トピック変更サンプル
$ kafka-topics --alter --zookeeper zookeeper:2181 --topic test-topic --partitions 6
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
$ kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic PartitionCount:6 ReplicationFactor:3 Configs:
Topic: test-topic Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: test-topic Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: test-topic Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test-topic Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: test-topic Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: test-topic Partition: 5 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
ちなみにalertオプションではreplication-factor
は指定できません。
また、partitionsを減らすことはできません。
Producer操作
kafka-console-producer
でProducerとして操作することが可能です。
メッセージを送信する
kafka-console-producer --topic=test-topic --broker-list=kafka1:29092,kafka2:29093,kafka3:29094
>test1
>test2
>
このようにkafka-console-producer
を実行すると、入力プロンプト>
が表示されるため、送信したい文字を入力し実行すると送信されます。
broker-list
には、Kafka Clusterに存在するBrokerをカンマ区切りで指定します。
今回の場合は、Brokerは3台なので3つ指定しています。
Consumer操作
kafka-console-consumer
でConsumerとして操作することが可能です。
メッセージを受信する
$ kafka-console-consumer --bootstrap-server=kafka1:29092 --topic=test-topic
test1
test2
各種オプション
オプション名 | 役割 | 使い方サンプル |
---|---|---|
from-beginning | offsetの最初から読み込みます | kafka-console-consumer --bootstrap-server=kafka1:29092 --topic=test-topic --from-beginning |
group | Consumer-groupを指定します | kafka-console-consumer --bootstrap-server=kafka1:29092 --topic=test-topic --group consumer-group |
ConsumerGroup操作
kafka-consumer-groups
でConsumerGroupの操作をすることが可能です。
詳細を表示する
$ kafka-consumer-groups --describe --bootstrap-server kafka1:29092 --group consumer-group
Consumer group 'consumer-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
consumer-group test-topic 4 12 12 0 - - -
consumer-group test-topic 5 10 10 0 - - -
consumer-group test-topic 2 7 7 0 - - -
consumer-group test-topic 3 9 9 0 - - -
consumer-group test-topic 1 10 10 0 - - -
consumer-group test-topic 0 20 20 0 - - -
ConsumerGroupのoffsetをリセットする
--reset-offsets
を利用してリセットします。
kafka-consumer-groups --bootstrap-server kafka3:29094 --group consumer-group --reset-offsets --to-earliest --all-topics --execute
GROUP TOPIC PARTITION NEW-OFFSET
consumer-group test-topic 4 0
consumer-group test-topic 5 0
consumer-group test-topic 2 0
consumer-group test-topic 3 0
consumer-group test-topic 1 0
consumer-group test-topic 0 0
参考
参考文献
Apache Kafka Document
Apache Kafkaの概要とアーキテクチャ
docker-compose
今回利用したdocker-compose.ymlになります。
version: "3"
services:
zookeeper:
container_name: kafkajs-typescript-test-zookeeper
hostname: zookeeper
image: confluentinc/cp-zookeeper:5.3.2
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka1:
container_name: kafkajs-typescript-test-kafka1
hostname: kafka1
image: confluentinc/cp-kafka:5.3.2
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
kafka2:
container_name: kafkajs-typescript-test-kafka2
hostname: kafka2
image: confluentinc/cp-kafka:5.3.2
depends_on:
- zookeeper
ports:
- "29093:29093"
- "9093:9093"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://localhost:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
kafka3:
container_name: kafkajs-typescript-test-kafka3
hostname: kafka3
image: confluentinc/cp-kafka:5.3.2
depends_on:
- zookeeper
ports:
- "29094:29094"
- "9094:9094"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29094,PLAINTEXT_HOST://localhost:9094
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
参考までにですが、kafka-clusterが3つ未満の場合は以下のようなエラーが出力します。
[2020-03-21 07:43:52,872] ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
この場合は、KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
の設定をしてあげる必要があります。(数値はclusterの数)