Confluent Platform Notes - (2) Simple Message Send/Receive Test

Posted at 2021-08-17

Introduction

These are notes from trying out Confluent Platform.
This time I'll exchange messages in a simple way using the command-line tools.

Related articles

Confluent Platform Notes - (1) Environment Setup
Confluent Platform Notes - (2) Simple Message Send/Receive Test
Confluent Platform Notes - (3) Simple Schema Registry Test

References

Kafka Commands Primer
Apache Kafka terminology and commands
Apache Kafka overview and architecture

Kafka commands

They are provided under /opt/confluent-6.2.0/bin/.
(These also appear to be shell scripts that ultimately run Java programs.)
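
To avoid typing the full path every time, the directory can be put on PATH. This is a minimal sketch assuming the install location above; the variable name `CONFLUENT_HOME` is just a convenient choice for this example.

```shell
# Assumed install location, taken from the path mentioned above.
CONFLUENT_HOME="/opt/confluent-6.2.0"

# Prepend the bin directory so kafka-topics and friends resolve without ./ or a full path.
export PATH="${CONFLUENT_HOME}/bin:${PATH}"

# Show the first PATH entry to confirm the change.
echo "${PATH%%:*}"
```

The examples in the rest of this article still run the commands as ./kafka-... from inside the bin directory, which works equally well.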

kafka-topics

A command for creating Topics on a Broker and checking Topic attributes.

kafka-topics --help
[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics --help
This tool helps to create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions,
                                           replica assignment, and/or
                                           configuration for the topic.
--at-min-isr-partitions                  if set when describing topics, only
                                           show partitions whose isr count is
                                           equal to the configured minimum. Not
                                           supported with the --zookeeper
                                           option.
--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect
  connect to>                              to. In case of providing this, a
                                           direct Zookeeper connection won't be
                                           required.
--command-config <String: command        Property file containing configs to be
  config property file>                    passed to Admin Client. This is used
                                           only with --bootstrap-server option
                                           for describing and altering broker
                                           configs.
--config <String: name=value>            A topic configuration override for the
                                           topic being created or altered. The
                                           following is a list of valid
                                           configurations:
                                                cleanup.policy
                                                compression.type
                                                delete.retention.ms
                                                file.delete.delay.ms
                                                flush.messages
                                                flush.ms
                                                follower.replication.throttled.
                                           replicas
                                                index.interval.bytes
                                                leader.replication.throttled.replicas
                                                max.compaction.lag.ms
                                                max.message.bytes
                                                message.downconversion.enable
                                                message.format.version
                                                message.timestamp.difference.max.ms
                                                message.timestamp.type
                                                min.cleanable.dirty.ratio
                                                min.compaction.lag.ms
                                                min.insync.replicas
                                                preallocate
                                                retention.bytes
                                                retention.ms
                                                segment.bytes
                                                segment.index.bytes
                                                segment.jitter.ms
                                                segment.ms
                                                unclean.leader.election.enable
                                         See the Kafka documentation for full
                                           details on the topic configs. It is
                                           supported only in combination with --
                                           create if --bootstrap-server option
                                           is used (the kafka-configs CLI
                                           supports altering topic configs with
                                           a --bootstrap-server option).
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see
                                           the list of configurations under the
                                           --config option). Not supported with
                                           the --bootstrap-server option.
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment
--exclude-internal                       exclude internal topics when running
                                           list or describe command. The
                                           internal topics will be listed by
                                           default
--force                                  Suppress console prompts
--help                                   Print usage information.
--if-exists                              if set when altering or deleting or
                                           describing topics, the action will
                                           only execute if the topic exists.
--if-not-exists                          if set when creating topics, the
                                           action will only execute if the
                                           topic does not already exist.
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:
                                           If partitions are increased for a
                                           topic that has a key, the partition
                                           logic or ordering of the messages
                                           will be affected). If not supplied
                                           for create, defaults to the cluster
                                           default.
--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being
                                           created. If not supplied, defaults
                                           to the cluster default.
--topic <String: topic>                  The topic to create, alter, describe
                                           or delete. It also accepts a regular
                                           expression, except for --create
                                           option. Put topic name in double
                                           quotes and use the '\' prefix to
                                           escape regular expression symbols; e.
                                           g. "test\.topic".
--topics-with-overrides                  if set when describing topics, only
                                           show topics that have overridden
                                           configs
--unavailable-partitions                 if set when describing topics, only
                                           show partitions whose leader is not
                                           available
--under-min-isr-partitions               if set when describing topics, only
                                           show partitions whose isr count is
                                           less than the configured minimum.
                                           Not supported with the --zookeeper
                                           option.
--under-replicated-partitions            if set when describing topics, only
                                           show under replicated partitions
--version                                Display Kafka version.
--zookeeper <String: hosts>              DEPRECATED, The connection string for
                                           the zookeeper connection in the form
                                           host:port. Multiple hosts can be
                                           given to allow fail-over.

kafka-console-producer

A command that acts as a Producer. It lets you send messages interactively from the shell.

kafka-console-producer --help
[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-producer --help
This tool helps to read data from standard input and publish it to Kafka.
Option                                   Description
------                                   -----------
--batch-size <Integer: size>             Number of messages to send in a single
                                           batch if they are not being sent
                                           synchronously. (default: 200)
--bootstrap-server <String: server to    REQUIRED unless --broker-list
  connect to>                              (deprecated) is specified. The server
                                           (s) to connect to. The broker list
                                           string in the form HOST1:PORT1,HOST2:
                                           PORT2.
--broker-list <String: broker-list>      DEPRECATED, use --bootstrap-server
                                           instead; ignored if --bootstrap-
                                           server is specified.  The broker
                                           list string in the form HOST1:PORT1,
                                           HOST2:PORT2.
--compression-codec [String:             The compression codec: either 'none',
  compression-codec]                       'gzip', 'snappy', 'lz4', or 'zstd'.
                                           If specified without value, then it
                                           defaults to 'gzip'
--help                                   Print usage information.
--line-reader <String: reader_class>     The class name of the class to use for
                                           reading lines from standard in. By
                                           default each line is read as a
                                           separate message. (default: kafka.
                                           tools.
                                           ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on       The max time that the producer will
  send>                                    block for during a send request
                                           (default: 60000)
--max-memory-bytes <Long: total memory   The total memory used by the producer
  in bytes>                                to buffer records waiting to be sent
                                           to the server. (default: 33554432)
--max-partition-memory-bytes <Long:      The buffer size allocated for a
  memory in bytes per partition>           partition. When records are received
                                           which are smaller than this size the
                                           producer will attempt to
                                           optimistically group them together
                                           until this size is reached.
                                           (default: 16384)
--message-send-max-retries <Integer>     Brokers can fail receiving the message
                                           for multiple reasons, and being
                                           unavailable transiently is just one
                                           of them. This property specifies the
                                           number of retries before the
                                           producer give up and drop this
                                           message. (default: 3)
--metadata-expiry-ms <Long: metadata     The period of time in milliseconds
  expiration interval>                     after which we force a refresh of
                                           metadata even if we haven't seen any
                                           leadership changes. (default: 300000)
--producer-property <String:             A mechanism to pass user-defined
  producer_prop>                           properties in the form key=value to
                                           the producer.
--producer.config <String: config file>  Producer config properties file. Note
                                           that [producer-property] takes
                                           precedence over this config.
--property <String: prop>                A mechanism to pass user-defined
                                           properties in the form key=value to
                                           the message reader. This allows
                                           custom configuration for a user-
                                           defined message reader. Default
                                           properties include:
                                                parse.key=true|false
                                                key.separator=<key.separator>
                                                ignore.error=true|false
--request-required-acks <String:         The required acks of the producer
  request required acks>                   requests (default: 1)
--request-timeout-ms <Integer: request   The ack timeout of the producer
  timeout ms>                              requests. Value must be non-negative
                                           and non-zero (default: 1500)
--retry-backoff-ms <Integer>             Before each retry, the producer
                                           refreshes the metadata of relevant
                                           topics. Since leader election takes
                                           a bit of time, this property
                                           specifies the amount of time that
                                           the producer waits before refreshing
                                           the metadata. (default: 100)
--socket-buffer-size <Integer: size>     The size of the tcp RECV size.
                                           (default: 102400)
--sync                                   If set message send requests to the
                                           brokers are synchronously, one at a
                                           time as they arrive.
--timeout <Integer: timeout_ms>          If set and the producer is running in
                                           asynchronous mode, this gives the
                                           maximum amount of time a message
                                           will queue awaiting sufficient batch
                                           size. The value is given in ms.
                                           (default: 1000)
--topic <String: topic>                  REQUIRED: The topic id to produce
                                           messages to.
--version                                Display Kafka version.

kafka-console-consumer

A command that acts as a Consumer. It lets you receive messages from the shell.

kafka-console-consumer --help
[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-consumer --help
This tool helps to read data from Kafka topics and outputs it to standard output.
Option                                   Description
------                                   -----------
--bootstrap-server <String: server to    REQUIRED: The server(s) to connect to.
  connect to>
--consumer-property <String:             A mechanism to pass user-defined
  consumer_prop>                           properties in the form key=value to
                                           the consumer.
--consumer.config <String: config file>  Consumer config properties file. Note
                                           that [consumer-property] takes
                                           precedence over this config.
--enable-systest-events                  Log lifecycle events of the consumer
                                           in addition to logging consumed
                                           messages. (This is specific for
                                           system tests.)
--formatter <String: class>              The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--from-beginning                         If the consumer does not already have
                                           an established offset to consume
                                           from, start with the earliest
                                           message present in the log rather
                                           than the latest message.
--group <String: consumer group id>      The consumer group id of the consumer.
--help                                   Print usage information.
--isolation-level <String>               Set to read_committed in order to
                                           filter out transactional messages
                                           which are not committed. Set to
                                           read_uncommitted to read all
                                           messages. (default: read_uncommitted)
--key-deserializer <String:
  deserializer for key>
--max-messages <Integer: num_messages>   The maximum number of messages to
                                           consume before exiting. If not set,
                                           consumption is continual.
--offset <String: consume offset>        The offset id to consume from (a non-
                                           negative number), or 'earliest'
                                           which means from beginning, or
                                           'latest' which means from end
                                           (default: latest)
--partition <Integer: partition>         The partition to consume from.
                                           Consumption starts from the end of
                                           the partition unless '--offset' is
                                           specified.
--property <String: prop>                The properties to initialize the
                                           message formatter. Default
                                           properties include:
                                          print.timestamp=true|false
                                          print.key=true|false
                                          print.offset=true|false
                                          print.partition=true|false
                                          print.headers=true|false
                                          print.value=true|false
                                          key.separator=<key.separator>
                                          line.separator=<line.separator>
                                          headers.separator=<line.separator>
                                          null.literal=<null.literal>
                                          key.deserializer=<key.deserializer>
                                          value.deserializer=<value.
                                           deserializer>
                                          header.deserializer=<header.
                                           deserializer>
                                         Users can also pass in customized
                                           properties for their formatter; more
                                           specifically, users can pass in
                                           properties keyed with 'key.
                                           deserializer.', 'value.
                                           deserializer.' and 'headers.
                                           deserializer.' prefixes to configure
                                           their deserializers.
--skip-message-on-error                  If there is an error when processing a
                                           message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>       If specified, exit if no message is
                                           available for consumption for the
                                           specified interval.
--topic <String: topic>                  The topic id to consume on.
--value-deserializer <String:
  deserializer for values>
--version                                Display Kafka version.
--whitelist <String: whitelist>          Regular expression specifying
                                           whitelist of topics to include for
                                           consumption.

kafka-delete-records

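Unlike MQ, Kafka messages apparently remain in the Topic even after a Consumer has received them. This came as quite a shock.
When a Consumer receives a message, only that consumer's read position (offset) advances; the message itself is not deleted at that point. As a result, multiple Consumers can read the same message any number of times.
So when do messages actually get deleted? Looking into it, I found the following description.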

Top 5 Things Every Apache Kafka Developer Should Know - Tip #4: Master the command line tools - Delete records

Kafka stores records for topics on disk and retains that data even once consumers have read it. However, records aren’t stored in one big file but are broken up into segments by partition where the offset order is continuous across segments for the same topic partition. Because servers do not have infinite amounts of storage, Kafka provides settings to control how much data is retained, based on time and size:

  • The time configuration controlling data retention is log.retention.hours, which defaults to 168 hours (one week)
  • The size configuration log.retention.bytes controls how large segments can grow before they are eligible for deletion

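Deletion timing is controlled by time and size, and by default messages are kept for a full 7 days.
The article linked above describes how to delete messages manually: you create a JSON file specifying which messages to delete and pass it as an argument to the kafka-delete-records command. What a hassle! If you want to delete all messages, deleting and recreating the Topic itself may be quicker.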
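
Retention can also be set per topic: retention.ms appears in the --config list of the kafka-topics help above, and the same help text notes that the kafka-configs CLI can alter topic configs. The sketch below converts the 168-hour default into milliseconds and only prints a kafka-configs command rather than running it, since running it needs a live broker. The topic name cool-topic and the address localhost:9092 are example values.

```shell
# Default retention quoted above: 168 hours (one week).
retention_hours=168
retention_ms=$((retention_hours * 60 * 60 * 1000))
echo "retention.ms=${retention_ms}"

# Print (do not run) a kafka-configs call that would set this value on one topic.
# "cool-topic" and "localhost:9092" are example values.
echo "kafka-configs --bootstrap-server localhost:9092 --alter" \
     "--entity-type topics --entity-name cool-topic" \
     "--add-config retention.ms=${retention_ms}"
```

One week works out to 604800000 ms; a smaller value would make old segments eligible for deletion sooner.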

kafka-delete-records --help
[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-delete-records --help
This tool helps to delete records of the given partitions down to the specified offset.
Option                                 Description
------                                 -----------
--bootstrap-server <String: server(s)  REQUIRED: The server to connect to.
  to use for bootstrapping>
--command-config <String: command      A property file containing configs to
  config property file path>             be passed to Admin Client.
--help                                 Print usage information.
--offset-json-file <String: Offset     REQUIRED: The JSON file with offset
  json file path>                        per partition. The format to use is:
                                       {"partitions":
                                         [{"topic": "foo", "partition": 1,
                                         "offset": 1}],
                                        "version":1
                                       }
--version                              Display Kafka version.

offset-json-file example

offsets.json
{
   "partitions": [
                  {"topic": "cool-topic", "partition": 0, "offset": -1}
                 ],
                 "version":1
}

Specifying -1 as the offset apparently deletes all messages.
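
Putting the pieces together, the JSON file can be generated from the shell and handed to kafka-delete-records. This is a sketch under assumptions: the topic, partition, and broker address are example values, and the tool is only run if it exists at the install path used in this article.

```shell
# Example values; adjust to the topic and partition you want to clear.
topic="cool-topic"
partition=0
offset=-1   # per the note above, -1 appears to delete all messages

# Write the JSON file in the format shown in the --help output.
cat > offsets.json <<EOF
{
  "partitions": [
    {"topic": "${topic}", "partition": ${partition}, "offset": ${offset}}
  ],
  "version": 1
}
EOF
cat offsets.json

# Run the tool only if it is installed at the assumed location.
KAFKA_BIN="/opt/confluent-6.2.0/bin"
if [ -x "${KAFKA_BIN}/kafka-delete-records" ]; then
  "${KAFKA_BIN}/kafka-delete-records" \
    --bootstrap-server localhost:9092 \
    --offset-json-file offsets.json
fi
```

A topic with several partitions needs one entry per partition in the "partitions" array.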

Examples

I'll try a few commands, following the article below.
Kafka Commands Primer

Overview (image.png)

Creating a Topic

Create a Topic named cool-topic.

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics --create --topic cool-topic --bootstrap-server localhost:9092
Created topic cool-topic.

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
_schemas
cool-topic

Check the details of cool-topic.

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics --describe --topic cool-topic --bootstrap-server localhost:9092
Topic: cool-topic       TopicId: Z8vfn8oqQW6LwUl9N_myAA PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: cool-topic       Partition: 0    Leader: 0       Replicas: 0     Isr: 0
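
The describe output shows one partition and a replication factor of 1, because neither was specified at creation and the cluster defaults applied. As a sketch, both can be set explicitly with the --partitions and --replication-factor options listed in the help. The topic name cool-topic-3p is hypothetical, and the commands only run if the binary exists at the path used in this article.

```shell
KAFKA_BIN="/opt/confluent-6.2.0/bin"
topic="cool-topic-3p"    # hypothetical topic name for this example
partitions=3
replication_factor=1     # a single-broker setup cannot go higher than 1

echo "Creating ${topic} with ${partitions} partitions"
if [ -x "${KAFKA_BIN}/kafka-topics" ]; then
  # --if-not-exists makes the command safe to re-run.
  "${KAFKA_BIN}/kafka-topics" --create --if-not-exists \
    --topic "${topic}" \
    --partitions "${partitions}" \
    --replication-factor "${replication_factor}" \
    --bootstrap-server localhost:9092

  # Confirm the result.
  "${KAFKA_BIN}/kafka-topics" --describe --topic "${topic}" \
    --bootstrap-server localhost:9092
fi
```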

Producer / Sending messages

Running the kafka-console-producer command displays a prompt (>), where you type the messages you want to send. Each time you press Enter, that line of text is sent as one message.

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-producer --topic cool-topic --bootstrap-server localhost:9092
>hello
>first
>second
>third
>end
>^C

Finally, exit with Ctrl+C.
Five messages have now been sent.
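
Since the tool reads from standard input (as its help text says), the same messages can also be sent without the interactive prompt. A minimal sketch, where the file name messages.txt is an arbitrary choice and the producer only runs if it exists at the path used in this article:

```shell
# Write the same five messages to a file, one message per line.
printf '%s\n' hello first second third end > messages.txt
wc -l < messages.txt

KAFKA_BIN="/opt/confluent-6.2.0/bin"
if [ -x "${KAFKA_BIN}/kafka-console-producer" ]; then
  # Redirecting stdin replaces the interactive prompt; the tool exits at end of file.
  "${KAFKA_BIN}/kafka-console-producer" \
    --topic cool-topic \
    --bootstrap-server localhost:9092 < messages.txt
fi
```

This form is handy in scripts because no Ctrl+C is needed to finish.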

Consumer / Receiving messages

Receive the messages in cool-topic.

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic cool-topic
hello
first
second
third
end
^CProcessed a total of 5 messages

Finally, exit with Ctrl+C.
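
The consumer can also be made to stop on its own using the --max-messages and --timeout-ms options from the help output. The sketch below additionally prints each message's offset via the print.offset formatter property listed there. The values 5 and 10000 are example choices, and the command only runs if the binary exists at the path used in this article.

```shell
KAFKA_BIN="/opt/confluent-6.2.0/bin"
max_messages=5      # stop after this many messages
timeout_ms=10000    # or give up if nothing arrives for 10 seconds

echo "Reading up to ${max_messages} messages from cool-topic"
if [ -x "${KAFKA_BIN}/kafka-console-consumer" ]; then
  "${KAFKA_BIN}/kafka-console-consumer" \
    --bootstrap-server localhost:9092 \
    --topic cool-topic \
    --from-beginning \
    --max-messages "${max_messages}" \
    --timeout-ms "${timeout_ms}" \
    --property print.offset=true
fi
```

Running it twice should return the same messages both times, which illustrates that reading does not remove them from the Topic.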

Deleting a Topic

Delete cool-topic.

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics --delete --topic cool-topic --bootstrap-server localhost:9092

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
_schemas