41
36

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Apache Kafkaの用語とコマンド

Last updated at Posted at 2020-02-11

Apache Kafkaについて勉強しているのでまとめておきます。

環境

zookeeper 3.4.14 (confluentinc/cp-zookeeper:5.3.2)
Kafka 2.3.1 (confluentinc/cp-kafka:5.3.2)

Dockerイメージを利用しています。

Kafkaの用語理解

ここでは各用語についてざっくりとまとめておきます。

図を作ってまとめようと思ったのですが、伊藤 雅博さんの記事がかなり分かりやすいので、詳細はこちらをご参照ください。
Apache Kafkaの概要とアーキテクチャ

名前 役割
Kafka Cluster Kafkaが実行されているサーバ(Broker)をグループ化したもの
Broker Kafkaの単一サーバ
Zookeeper Kafkaを管理するサーバ
Producer Kafkaへメッセージを送信するアプリケーション
Consumer Kafkaからメッセージを取得するアプリケーション
Topic メッセージを整理するためのカテゴリー
Partition Topic内のメッセージはパーティションという単位で分散させています
Replica 各Partitionは複数のBrokerに複製(Replica)されています
Leader 複製されているReplicaのうち唯一読み書きが許可されているReplica
Consumer Group 複数のConsumerを同一グループとして扱うためのもの。グループ化することで分散したConsumer間で同一メッセージを重複せずに読み込むことが可能です
Offset Partition単位でメッセージをどこまで読んだか管理するためのもの

Topic操作

kafka-topicsコマンドを利用してトピックを操作します。

共通オプション

オプション名 役割
zookeeper Zookeeperを指定します
topic 操作対象のトピックを指定します
partitions メッセージを分割するPartition数を指定します
replication-factor PartitionをいくつのBrokerに複製するか指定します

Topic作成

kafka-topics --createがトピック作成コマンドになります。

トピック作成サンプル

test-topicというトピック名でpartition数3、Replica数3で保存する場合

サンプル
$ kafka-topics --create --zookeeper zookeeper:2181 --topic test-topic --partitions 3 --replication-factor 3
Created topic test-topic.

Topicの確認

kafka-topics --describeでトピックを確認します。

トピックの内容を確認するサンプル

サンプル
$  kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: test-topic	Partition: 0	Leader: 3	Replicas: 3,2,1	Isr: 3,2,1
	Topic: test-topic	Partition: 1	Leader: 1	Replicas: 1,3,2	Isr: 1,3,2
	Topic: test-topic	Partition: 2	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3

こちらの出力結果はそれぞれ以下のようになります。

オプション名 役割
PartitionCount 対象のトピックがいくつのPartitionに分割されるかが記載されます
ReplicationFactor 対象のトピックがいくつReplicaされるかが記載されています。ここにはLeaderも含まれます
Partition Partition番号が記載されます
Leader どこのBrokerに保存されているかが記載されます
Replicas どこのBrokerに保存されているかが記載されます。今回はReplicationFactorが3つ指定されているため、3つのBrokerが記載されています。
Isr In Sync Replica(Isr)の略で、対象Partitionが全て同期されている状態のBrokerが記載されます

Brokerがダウンしている場合の状況

Kafka ClusterにBrokerが3つある場合のうち、1つのBrokerがダウンしている場合の状態もサンプルで載せておきます。

2つめのBrokerがダウンしている場合
kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: test-topic	Partition: 0	Leader: 3	Replicas: 3,2,1	Isr: 3,1
	Topic: test-topic	Partition: 1	Leader: 1	Replicas: 1,3,2	Isr: 1,3
	Topic: test-topic	Partition: 2	Leader: 1	Replicas: 2,1,3	Isr: 1,3

このようにもともとPartitionが1のもののLeaderは2のBrokerを利用していましたが、1のBrokerに切り替わりました。
また、Isrの状態も1と3のBrokerのみが記載されており、Broker2はIsrになっていないことが分かります。

Topicの一覧表示

kafka-topics --listでトピックを確認します。

トピックの内容を確認するサンプル

$ kafka-topics --list --zookeeper zookeeper:2181
__confluent.support.metrics
__consumer_offsets
test-topic
test-topic1

Topicの削除

kafka-topics --deleteでトピックを削除します。

トピック削除サンプル

$ kafka-topics --delete --zookeeper zookeeper:2181 --topic test-topic
Topic test-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.


$ kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Error while executing topic command : Topic 'test-topic' does not exist as expected
[2020-02-09 13:03:11,678] ERROR java.lang.IllegalArgumentException: Topic 'test-topic' does not exist as expected
	at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:437)
	at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:349)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)

Topicの設定変更

kafka-topics --alterでトピックの設定を変更します。

トピック変更サンプル

Partitionをさらに分割する
$ kafka-topics --alter --zookeeper zookeeper:2181 --topic test-topic --partitions 6 
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!


$ kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic	PartitionCount:6	ReplicationFactor:3	Configs:
	Topic: test-topic	Partition: 0	Leader: 1	Replicas: 1,3,2	Isr: 1,3,2
	Topic: test-topic	Partition: 1	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3
	Topic: test-topic	Partition: 2	Leader: 3	Replicas: 3,2,1	Isr: 3,2,1
	Topic: test-topic	Partition: 3	Leader: 1	Replicas: 1,3,2	Isr: 1,3,2
	Topic: test-topic	Partition: 4	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3
	Topic: test-topic	Partition: 5	Leader: 3	Replicas: 3,2,1	Isr: 3,2,1

ちなみにalertオプションではreplication-factorは指定できません。
また、partitionsを減らすことはできません。

Producer操作

kafka-console-producerでProducerとして操作することが可能です。

メッセージを送信する

kafka-console-producer --topic=test-topic --broker-list=kafka1:29092,kafka2:29093,kafka3:29094
>test1
>test2
>

このようにkafka-console-producerを実行すると、入力プロンプト>が表示されるため、送信したい文字を入力し実行すると送信されます。

broker-listには、Kafka Clusterに存在するBrokerをカンマ区切りで指定します。
今回の場合は、Brokerは3台なので3つ指定しています。

Consumer操作

kafka-console-consumerでConsumerとして操作することが可能です。

メッセージを受信する

$ kafka-console-consumer --bootstrap-server=kafka1:29092 --topic=test-topic
test1
test2

各種オプション

オプション名 役割 使い方サンプル
from-beginning offsetの最初から読み込みます kafka-console-consumer --bootstrap-server=kafka1:29092 --topic=test-topic --from-beginning
group Consumer-groupを指定します kafka-console-consumer --bootstrap-server=kafka1:29092 --topic=test-topic --group consumer-group

ConsumerGroup操作

kafka-consumer-groupsでConsumerGroupの操作をすることが可能です。

詳細を表示する

$ kafka-consumer-groups --describe --bootstrap-server kafka1:29092 --group consumer-group
Consumer group 'consumer-group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
consumer-group  test-topic      4          12              12              0               -               -               -
consumer-group  test-topic      5          10              10              0               -               -               -
consumer-group  test-topic      2          7               7               0               -               -               -
consumer-group  test-topic      3          9               9               0               -               -               -
consumer-group  test-topic      1          10              10              0               -               -               -
consumer-group  test-topic      0          20              20              0               -               -               -

ConsumerGroupのoffsetをリセットする

--reset-offsetsを利用してリセットします。

kafka-consumer-groups --bootstrap-server kafka3:29094 --group consumer-group --reset-offsets --to-earliest --all-topics --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
consumer-group                 test-topic                     4          0              
consumer-group                 test-topic                     5          0              
consumer-group                 test-topic                     2          0              
consumer-group                 test-topic                     3          0              
consumer-group                 test-topic                     1          0              
consumer-group                 test-topic                     0          0  

参考

参考文献

Apache Kafka Document
Apache Kafkaの概要とアーキテクチャ

docker-compose

今回利用したdocker-compose.ymlになります。

docker-compose.yml
version: "3"

services:
  zookeeper:
    container_name: kafkajs-typescript-test-zookeeper
    hostname: zookeeper
    image: confluentinc/cp-zookeeper:5.3.2
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka1:
    container_name: kafkajs-typescript-test-kafka1
    hostname: kafka1
    image: confluentinc/cp-kafka:5.3.2
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  kafka2:
    container_name: kafkajs-typescript-test-kafka2
    hostname: kafka2
    image: confluentinc/cp-kafka:5.3.2
    depends_on:
      - zookeeper
    ports:
      - "29093:29093"
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://localhost:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  kafka3:
    container_name: kafkajs-typescript-test-kafka3
    hostname: kafka3
    image: confluentinc/cp-kafka:5.3.2
    depends_on:
      - zookeeper
    ports:
      - "29094:29094"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29094,PLAINTEXT_HOST://localhost:9094
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

参考までにですが、kafka-clusterが3つ未満の場合は以下のようなエラーが出力します。

[2020-03-21 07:43:52,872] ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

この場合は、KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1の設定をしてあげる必要があります。(数値はclusterの数)

41
36
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
41
36

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?