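Introduction
This article describes how to configure logging for Apache Kafka client applications (Java).
It covers the logs emitted by the Kafka client library itself, not the logs written by your own application code. In Kafka, "log" can also mean the storage where Kafka keeps its data (records), but here it means the logs an application writes for debugging and tracing.
Prerequisites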
- Apache Kafka 3.9.0
- SLF4J 2.0.16
- SLF4J Reload4j Provider 2.0.16
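Apache Kafka ships sample logging configuration files in Log4j 1.x format (log4j.properties, tools-log4j.properties, and others).
So that a client application can use configuration files in the same format, this article uses SLF4J Reload4j Provider as the logging implementation.
As a supplement, the settings for using Log4j 2.x format configuration files (log4j2.xml, log4j2.properties, etc.) are described at the end.
VSCode is used as the development tool, and Maven is used for building and running.
The following sections describe the additional settings needed for log output, assuming a Kafka client application that already runs correctly.
pom.xml settings
To use SLF4J and SLF4J Reload4j Provider, add the following to the dependencies element of pom.xml (kafka-clients is omitted).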
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.16</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-reload4j</artifactId>
    <version>2.0.16</version>
</dependency>
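Log configuration file (log4j.properties) settings
The contents of log4j.properties are up to you.
Specify the log level and output destinations you want in Log4j 1.x format.
The following example outputs at DEBUG level to both standard output and a specified file.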
log4j.rootLogger=DEBUG, stdout, myAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stdout.Target=System.out
log4j.appender.myAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.myAppender.File=/path/to/myprogram_debug.log
log4j.appender.myAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.myAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
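To make the two patterns in the example above easier to read, here is a minimal sketch that imitates them with plain java.text.SimpleDateFormat. It is not the Reload4j implementation. It assumes that %d with no format specifier renders as yyyy-MM-dd HH:mm:ss,SSS and that DailyRollingFileAppender appends the DatePattern-formatted time to the File value when it rolls over. The class name Log4jPatternSketch and its methods are hypothetical names used only for illustration.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class Log4jPatternSketch {

    // Imitates the layout "[%d] %p %m (%c)%n" (without the trailing newline):
    // %d = timestamp, %p = level, %m = message, %c = logger name.
    static String formatLine(Date when, String level, String message, String logger, TimeZone tz) {
        SimpleDateFormat timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
        timestamp.setTimeZone(tz);
        return "[" + timestamp.format(when) + "] " + level + " " + message + " (" + logger + ")";
    }

    // Imitates DatePattern='.'yyyy-MM-dd-HH: the suffix changes once per hour,
    // so a rolled file is produced for each hour in which something was logged.
    static String rolledSuffix(Date when, TimeZone tz) {
        SimpleDateFormat suffix = new SimpleDateFormat("'.'yyyy-MM-dd-HH");
        suffix.setTimeZone(tz);
        return suffix.format(when);
    }

    public static void main(String[] args) {
        TimeZone tz = TimeZone.getDefault();
        Date now = new Date();
        // A line shaped like the console output produced by the stdout appender.
        System.out.println(formatLine(now, "INFO", "hello", "com.example.Demo", tz));
        // The name a rolled-over file would get for the current hour.
        System.out.println("/path/to/myprogram_debug.log" + rolledSuffix(now, tz));
    }
}
```

Changing the DatePattern to '.'yyyy-MM-dd would make the suffix change once per day instead of once per hour.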
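Placing the configuration file
By default, the log4j.properties placed in the following directory is used.
<project dir>/src/main/resources
Maven copies the contents of this directory to the build output directory, which is on the classpath at run time. More generally, the log4j.properties found at the root of a directory on the classpath is used.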
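If no log output appears, one thing worth checking is whether the file is actually visible on the classpath. The following is a minimal sketch using only the standard ClassLoader API. The class name ClasspathConfigCheck is a hypothetical helper, not part of Kafka, SLF4J, or Reload4j.

```java
import java.net.URL;

public class ClasspathConfigCheck {

    // Returns the location of a classpath resource, or null when it is not visible.
    static URL locate(String resourceName) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        return loader.getResource(resourceName);
    }

    // Builds a one-line report suitable for printing at application startup.
    static String describe(String resourceName) {
        URL url = locate(resourceName);
        return url == null
                ? resourceName + ": not found on the classpath"
                : resourceName + ": " + url;
    }

    public static void main(String[] args) {
        System.out.println(describe("log4j.properties"));
    }
}
```

Run from the Maven project, this would be expected to report a location under the build output directory when the file has been placed in src/main/resources.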
To use a configuration file at an arbitrary location, specify it with the following Java VM system property.
log4j.configuration
Example (when specified in launch.json)
{
    "type": "java",
    "name": "KafkaProducerApp",
    "request": "launch",
    "mainClass": "com.example.KafkaProducerApp",
    "projectName": "kafka",
    "vmArgs": [
        "-Dlog4j.configuration=file:/path/to/my-log4j.properties"
    ]
}
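The file: prefix in the example above matters. Log4j 1.x first tries to read the log4j.configuration value as a URL and, if that fails, treats it as a classpath resource name. The sketch below imitates only that decision with java.net.URL; it is a rough illustration under that assumption, not the library's code, and ConfigLocationSketch is a hypothetical name.

```java
import java.net.MalformedURLException;
import java.net.URL;

public class ConfigLocationSketch {

    // Classifies a -Dlog4j.configuration value the way the lookup is assumed to work:
    // a parsable URL is used as-is, anything else is looked up on the classpath.
    @SuppressWarnings("deprecation") // new URL(String) is deprecated on recent JDKs but still available
    static String classify(String value) {
        try {
            URL url = new URL(value);
            return "URL (" + url.getProtocol() + ")";
        } catch (MalformedURLException e) {
            return "classpath resource";
        }
    }

    public static void main(String[] args) {
        // With the file: prefix the value is a URL pointing at the file system.
        System.out.println(classify("file:/path/to/my-log4j.properties"));
        // Without a scheme the value is not a URL, so it would be searched on the classpath.
        System.out.println(classify("/path/to/my-log4j.properties"));
    }
}
```

In other words, a bare file system path without a scheme is not read as a file, which is a common reason for a custom configuration file being silently ignored.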
Running
With the settings above in place, running the Kafka client application writes logs to the specified destinations.
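DEBUG output from the client library is verbose, so it can help to split each line into its parts and filter by level or logger name. The following is a minimal sketch, assuming the layout [%d] %p %m (%c)%n from the configuration example. LogLineSketch is a hypothetical helper, and multi-line messages (such as a dump of configuration values) are deliberately not handled.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLineSketch {

    // Matches one line of the form: [timestamp] LEVEL message (logger.name)
    private static final Pattern LINE =
            Pattern.compile("^\\[([^\\]]+)\\] (\\w+) (.*) \\(([\\w.$]+)\\)$");

    // Returns {timestamp, level, message, logger}, or null when the line has another shape.
    static String[] parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2), m.group(3), m.group(4) };
    }

    public static void main(String[] args) {
        String[] parts = parse("[2025-02-03 20:11:06,180] INFO Kafka version: 3.9.0 "
                + "(org.apache.kafka.common.utils.AppInfoParser)");
        // Prints the level and the logger name of the sample line.
        System.out.println(parts[1] + " from " + parts[3]);
    }
}
```

For routine use, lowering the level for specific loggers in log4j.properties is usually simpler than filtering after the fact.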
Example of standard output when the Producer application is run at DEBUG level
[2025-02-03 20:11:05,871] INFO ProducerConfig values:
acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [kafka:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.gzip.level = -1
compression.lz4.level = 9
compression.type = none
compression.zstd.level = 3
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
enable.metrics.push = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.adaptive.partitioning.enable = true
partitioner.availability.timeout.ms = 0
partitioner.class = null
partitioner.ignore.keys = false
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.header.urlencode = false
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
(org.apache.kafka.clients.producer.ProducerConfig)
[2025-02-03 20:11:05,941] INFO initializing Kafka metrics collector (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,074] INFO [Producer clientId=producer-1] Instantiated an idempotent producer. (org.apache.kafka.clients.producer.KafkaProducer)
[2025-02-03 20:11:06,173] DEBUG [Producer clientId=producer-1] Starting Kafka producer I/O thread. (org.apache.kafka.clients.producer.internals.Sender)
[2025-02-03 20:11:06,180] DEBUG [Producer clientId=producer-1] Transition from state UNINITIALIZED to INITIALIZING (org.apache.kafka.clients.producer.internals.TransactionManager)
[2025-02-03 20:11:06,180] INFO Kafka version: 3.9.0 (org.apache.kafka.common.utils.AppInfoParser)
[2025-02-03 20:11:06,180] INFO Kafka commitId: 84caaa6e9da06435 (org.apache.kafka.common.utils.AppInfoParser)
[2025-02-03 20:11:06,180] INFO Kafka startTimeMs: 1738581066173 (org.apache.kafka.common.utils.AppInfoParser)
[2025-02-03 20:11:06,189] DEBUG [Producer clientId=producer-1] Kafka producer started (org.apache.kafka.clients.producer.KafkaProducer)
[2025-02-03 20:11:06,419] DEBUG [Producer clientId=producer-1] Enqueuing transactional request InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1) (org.apache.kafka.clients.producer.internals.TransactionManager)
[2025-02-03 20:11:06,420] DEBUG [Producer clientId=producer-1] Initialize connection to node kafka:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,423] DEBUG Resolved host kafka as 127.0.0.1 (org.apache.kafka.clients.ClientUtils)
[2025-02-03 20:11:06,423] DEBUG [Producer clientId=producer-1] Resolved host kafka to addresses [kafka/127.0.0.1] (org.apache.kafka.clients.ClusterConnectionStates)
[2025-02-03 20:11:06,423] DEBUG [Producer clientId=producer-1] Initiating connection to node kafka:9092 (id: -1 rack: null) using address kafka/127.0.0.1 (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,458] DEBUG [Producer clientId=producer-1] Created socket with SO_RCVBUF = 326640, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1 (org.apache.kafka.common.network.Selector)
[2025-02-03 20:11:06,461] DEBUG [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions. (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,461] DEBUG [Producer clientId=producer-1] Initiating API versions fetch from node -1. (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,482] DEBUG [Producer clientId=producer-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=producer-1, correlationId=0, headerVersion=2) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.9.0') (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,543] DEBUG [Producer clientId=producer-1] Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=producer-1, correlationId=0, headerVersion=2): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=11), ApiVersion(apiKey=1, minVersion=0, maxVersion=17), ApiVersion(apiKey=2, minVersion=0, maxVersion=9), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, minVersion=0, maxVersion=7), ApiVersion(apiKey=5, minVersion=0, maxVersion=4), ApiVersion(apiKey=6, minVersion=0, maxVersion=8), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=9), ApiVersion(apiKey=9, minVersion=0, maxVersion=9), ApiVersion(apiKey=10, minVersion=0, maxVersion=6), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=5), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=4), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=5), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=5), ApiVersion(apiKey=25, minVersion=0, maxVersion=4), ApiVersion(apiKey=26, minVersion=0, maxVersion=4), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=4), ApiVersion(apiKey=29, minVersion=0, maxVersion=3), ApiVersion(apiKey=30, minVersion=0, maxVersion=3), ApiVersion(apiKey=31, minVersion=0, maxVersion=3), ApiVersion(apiKey=32, minVersion=0, maxVersion=4), ApiVersion(apiKey=33, minVersion=0, 
maxVersion=2), ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=4), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=3), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=3), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=56, minVersion=0, maxVersion=3), ApiVersion(apiKey=57, minVersion=0, maxVersion=1), ApiVersion(apiKey=58, minVersion=0, maxVersion=0), ApiVersion(apiKey=60, minVersion=0, maxVersion=1), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=1), ApiVersion(apiKey=67, minVersion=0, maxVersion=0), ApiVersion(apiKey=68, minVersion=0, maxVersion=0), ApiVersion(apiKey=69, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=0, finalizedFeatures=[], zkMigrationReady=false) (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,631] DEBUG [Producer clientId=producer-1] Node -1 has finalized features epoch: 0, finalized features: [], supported features: [], ZK migration ready: false, API versions: (Produce(0): 0 to 11 [usable: 11], Fetch(1): 0 to 17 [usable: 17], ListOffsets(2): 0 to 9 [usable: 9], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): 0 to 7 [usable: 7], StopReplica(5): 0 to 4 [usable: 4], UpdateMetadata(6): 0 to 8 [usable: 8], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 9 [usable: 9], OffsetFetch(9): 0 to 9 [usable: 9], FindCoordinator(10): 0 to 6 [usable: 6], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 5 [usable: 5], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 4 [usable: 4], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 5 [usable: 5], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 5 [usable: 5], AddOffsetsToTxn(25): 0 to 4 [usable: 4], EndTxn(26): 0 to 4 [usable: 4], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 4 [usable: 4], DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 4 [usable: 4], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 to 3 [usable: 3], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 3 [usable: 3], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 
0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], DescribeQuorum(55): UNSUPPORTED, AlterPartition(56): 0 to 3 [usable: 3], UpdateFeatures(57): 0 to 1 [usable: 1], Envelope(58): 0 [usable: 0], DescribeCluster(60): 0 to 1 [usable: 1], DescribeProducers(61): 0 [usable: 0], UnregisterBroker(64): UNSUPPORTED, DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 to 1 [usable: 1], AllocateProducerIds(67): 0 [usable: 0], ConsumerGroupHeartbeat(68): 0 [usable: 0], ConsumerGroupDescribe(69): 0 [usable: 0], GetTelemetrySubscriptions(71): UNSUPPORTED, PushTelemetry(72): UNSUPPORTED, ListClientMetricsResources(74): UNSUPPORTED, DescribeTopicPartitions(75): UNSUPPORTED, ShareGroupHeartbeat(76): UNSUPPORTED, ShareGroupDescribe(77): UNSUPPORTED, ShareFetch(78): UNSUPPORTED, ShareAcknowledge(79): UNSUPPORTED, AddRaftVoter(80): UNSUPPORTED, RemoveRaftVoter(81): UNSUPPORTED, InitializeShareGroupState(83): UNSUPPORTED, ReadShareGroupState(84): UNSUPPORTED, WriteShareGroupState(85): UNSUPPORTED, DeleteShareGroupState(86): UNSUPPORTED, ReadShareGroupStateSummary(87): UNSUPPORTED). (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,635] DEBUG [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='topic01')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node kafka:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,637] DEBUG [Producer clientId=producer-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=producer-1, correlationId=1, headerVersion=2) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='topic01')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,638] DEBUG [Producer clientId=producer-1] Give up sending telemetry request since no node is available (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,638] DEBUG [Producer clientId=producer-1] Sending transactional request InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1) to node kafka:9092 (id: -1 rack: null) with correlation ID 2 (org.apache.kafka.clients.producer.internals.Sender)
[2025-02-03 20:11:06,638] DEBUG [Producer clientId=producer-1] Sending INIT_PRODUCER_ID request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=5, clientId=producer-1, correlationId=2, headerVersion=2) and timeout 30000 to node -1: InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1) (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,639] DEBUG [Producer clientId=producer-1] Give up sending telemetry request since no node is available (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,639] DEBUG Creating telemetry subscription request with client instance id AAAAAAAAAAAAAAAAAAAAAA (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,639] DEBUG Setting telemetry state from SUBSCRIPTION_NEEDED to SUBSCRIPTION_IN_PROGRESS (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,640] DEBUG [Producer clientId=producer-1] Version mismatch when attempting to send GetTelemetrySubscriptionsRequestData(clientInstanceId=AAAAAAAAAAAAAAAAAAAAAA) with correlation id 3 to -1 (org.apache.kafka.clients.NetworkClient)
org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support GET_TELEMETRY_SUBSCRIPTIONS
[2025-02-03 20:11:06,643] DEBUG The broker generated an error for the get telemetry network API request (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support GET_TELEMETRY_SUBSCRIPTIONS
[2025-02-03 20:11:06,643] DEBUG Updating intervalMs: 300000, lastRequestMs: 1738581066643 (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,644] DEBUG Setting telemetry state from SUBSCRIPTION_IN_PROGRESS to SUBSCRIPTION_NEEDED (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,653] DEBUG [Producer clientId=producer-1] Received METADATA response from node -1 for request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=producer-1, correlationId=1, headerVersion=2): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=0, host='localhost', port=9092, rack=null)], clusterId='pVv8q9HHSze4gVFmIZdlvA', controllerId=0, topics=[MetadataResponseTopic(errorCode=0, name='topic01', topicId=mia05J1KRQyTuVA0MeQqQA, isInternal=false, partitions=[MetadataResponsePartition(errorCode=0, partitionIndex=0, leaderId=0, leaderEpoch=0, replicaNodes=[0], isrNodes=[0], offlineReplicas=[])], topicAuthorizedOperations=-2147483648)], clusterAuthorizedOperations=-2147483648) (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,664] DEBUG [Producer clientId=producer-1] Setting the last seen epoch of partition topic01-0 to 0 since the last known epoch was undefined. (org.apache.kafka.clients.Metadata)
[2025-02-03 20:11:06,671] INFO [Producer clientId=producer-1] Cluster ID: pVv8q9HHSze4gVFmIZdlvA (org.apache.kafka.clients.Metadata)
[2025-02-03 20:11:06,674] DEBUG [Producer clientId=producer-1] Updated cluster metadata updateVersion 2 to MetadataSnapshot{clusterId='pVv8q9HHSze4gVFmIZdlvA', nodes={0=localhost:9092 (id: 0 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=topic01-0, leader=Optional[0], leaderEpoch=Optional[0], replicas=0, isr=0, offlineReplicas=)], controller=localhost:9092 (id: 0 rack: null)} (org.apache.kafka.clients.Metadata)
[2025-02-03 20:11:06,676] DEBUG [Producer clientId=producer-1] Received INIT_PRODUCER_ID response from node -1 for request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=5, clientId=producer-1, correlationId=2, headerVersion=2): InitProducerIdResponseData(throttleTimeMs=0, errorCode=0, producerId=2, producerEpoch=0) (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,676] INFO [Producer clientId=producer-1] ProducerId set to 2 with epoch 0 (org.apache.kafka.clients.producer.internals.TransactionManager)
[2025-02-03 20:11:06,676] DEBUG [Producer clientId=producer-1] Transition from state INITIALIZING to READY (org.apache.kafka.clients.producer.internals.TransactionManager)
[2025-02-03 20:11:06,689] DEBUG Resolved host localhost as 127.0.0.1 (org.apache.kafka.clients.ClientUtils)
[2025-02-03 20:11:06,689] DEBUG [Producer clientId=producer-1] Resolved host localhost to addresses [localhost/127.0.0.1] (org.apache.kafka.clients.ClusterConnectionStates)
[2025-02-03 20:11:06,689] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,698] DEBUG [Producer clientId=producer-1] Created socket with SO_RCVBUF = 326640, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node 0 (org.apache.kafka.common.network.Selector)
[2025-02-03 20:11:06,698] DEBUG [Producer clientId=producer-1] Completed connection to node 0. Fetching API versions. (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,698] DEBUG [Producer clientId=producer-1] Initiating API versions fetch from node 0. (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,699] DEBUG [Producer clientId=producer-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=producer-1, correlationId=4, headerVersion=2) and timeout 30000 to node 0: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.9.0') (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,704] DEBUG [Producer clientId=producer-1] Received API_VERSIONS response from node 0 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=producer-1, correlationId=4, headerVersion=2): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=11), ApiVersion(apiKey=1, minVersion=0, maxVersion=17), ApiVersion(apiKey=2, minVersion=0, maxVersion=9), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=4, minVersion=0, maxVersion=7), ApiVersion(apiKey=5, minVersion=0, maxVersion=4), ApiVersion(apiKey=6, minVersion=0, maxVersion=8), ApiVersion(apiKey=7, minVersion=0, maxVersion=3), ApiVersion(apiKey=8, minVersion=0, maxVersion=9), ApiVersion(apiKey=9, minVersion=0, maxVersion=9), ApiVersion(apiKey=10, minVersion=0, maxVersion=6), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=5), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=4), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=5), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=5), ApiVersion(apiKey=25, minVersion=0, maxVersion=4), ApiVersion(apiKey=26, minVersion=0, maxVersion=4), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=4), ApiVersion(apiKey=29, minVersion=0, maxVersion=3), ApiVersion(apiKey=30, minVersion=0, maxVersion=3), ApiVersion(apiKey=31, minVersion=0, maxVersion=3), ApiVersion(apiKey=32, minVersion=0, maxVersion=4), ApiVersion(apiKey=33, minVersion=0, 
maxVersion=2), ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=4), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=3), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=3), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=56, minVersion=0, maxVersion=3), ApiVersion(apiKey=57, minVersion=0, maxVersion=1), ApiVersion(apiKey=58, minVersion=0, maxVersion=0), ApiVersion(apiKey=60, minVersion=0, maxVersion=1), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=1), ApiVersion(apiKey=67, minVersion=0, maxVersion=0), ApiVersion(apiKey=68, minVersion=0, maxVersion=0), ApiVersion(apiKey=69, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=0, finalizedFeatures=[], zkMigrationReady=false) (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,705] DEBUG [Producer clientId=producer-1] Node 0 has finalized features epoch: 0, finalized features: [], supported features: [], ZK migration ready: false, API versions: (Produce(0): 0 to 11 [usable: 11], Fetch(1): 0 to 17 [usable: 17], ListOffsets(2): 0 to 9 [usable: 9], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): 0 to 7 [usable: 7], StopReplica(5): 0 to 4 [usable: 4], UpdateMetadata(6): 0 to 8 [usable: 8], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 9 [usable: 9], OffsetFetch(9): 0 to 9 [usable: 9], FindCoordinator(10): 0 to 6 [usable: 6], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 5 [usable: 5], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 4 [usable: 4], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 5 [usable: 5], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 5 [usable: 5], AddOffsetsToTxn(25): 0 to 4 [usable: 4], EndTxn(26): 0 to 4 [usable: 4], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 4 [usable: 4], DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 4 [usable: 4], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 to 3 [usable: 3], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 3 [usable: 3], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 
[usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], DescribeQuorum(55): UNSUPPORTED, AlterPartition(56): 0 to 3 [usable: 3], UpdateFeatures(57): 0 to 1 [usable: 1], Envelope(58): 0 [usable: 0], DescribeCluster(60): 0 to 1 [usable: 1], DescribeProducers(61): 0 [usable: 0], UnregisterBroker(64): UNSUPPORTED, DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 to 1 [usable: 1], AllocateProducerIds(67): 0 [usable: 0], ConsumerGroupHeartbeat(68): 0 [usable: 0], ConsumerGroupDescribe(69): 0 [usable: 0], GetTelemetrySubscriptions(71): UNSUPPORTED, PushTelemetry(72): UNSUPPORTED, ListClientMetricsResources(74): UNSUPPORTED, DescribeTopicPartitions(75): UNSUPPORTED, ShareGroupHeartbeat(76): UNSUPPORTED, ShareGroupDescribe(77): UNSUPPORTED, ShareFetch(78): UNSUPPORTED, ShareAcknowledge(79): UNSUPPORTED, AddRaftVoter(80): UNSUPPORTED, RemoveRaftVoter(81): UNSUPPORTED, InitializeShareGroupState(83): UNSUPPORTED, ReadShareGroupState(84): UNSUPPORTED, WriteShareGroupState(85): UNSUPPORTED, DeleteShareGroupState(86): UNSUPPORTED, ReadShareGroupStateSummary(87): UNSUPPORTED). (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,716] DEBUG [Producer clientId=producer-1] ProducerId of partition topic01-0 set to 2 with epoch 0. Reinitialize sequence at beginning. (org.apache.kafka.clients.producer.internals.TransactionManager)
[2025-02-03 20:11:06,717] DEBUG [Producer clientId=producer-1] Assigned producerId 2 and producerEpoch 0 to batch with base sequence 0 being sent to partition topic01-0 (org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2025-02-03 20:11:06,727] DEBUG [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=11, clientId=producer-1, correlationId=5, headerVersion=2) and timeout 30000 to node 0: {acks=-1,timeout=30000,partitionSizes=[topic01-0=80]} (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,754] DEBUG [Producer clientId=producer-1] Received PRODUCE response from node 0 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=11, clientId=producer-1, correlationId=5, headerVersion=2): ProduceResponseData(responses=[TopicProduceResponse(name='topic01', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=2, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null, currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1))])], throttleTimeMs=0, nodeEndpoints=[]) (org.apache.kafka.clients.NetworkClient)
[2025-02-03 20:11:06,758] DEBUG [Producer clientId=producer-1] ProducerId: 2; Set last ack'd sequence number for topic-partition topic01-0 to 0 (org.apache.kafka.clients.producer.internals.TransactionManager)
Sent message: key=key-1, value=value-1 to partition=0, offset=2
[2025-02-03 20:11:06,763] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2025-02-03 20:11:06,766] DEBUG Initiate close of ClientTelemetryReporter (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,766] DEBUG initiate close for client telemetry, check if terminal push required. (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,766] DEBUG Subscription not yet loaded, ignoring terminal push (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,766] DEBUG [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records. (org.apache.kafka.clients.producer.internals.Sender)
[2025-02-03 20:11:06,777] DEBUG removing kafka metric : MetricName [name=connection-count, group=producer-metrics, description=The current number of active connections., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,782] DEBUG removing kafka metric : MetricName [name=connection-close-total, group=producer-metrics, description=The total number of connections closed, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,783] DEBUG removing kafka metric : MetricName [name=connection-close-rate, group=producer-metrics, description=The number of connections closed per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,783] DEBUG removing kafka metric : MetricName [name=connection-creation-total, group=producer-metrics, description=The total number of new connections established, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,784] DEBUG removing kafka metric : MetricName [name=connection-creation-rate, group=producer-metrics, description=The number of new connections established per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,785] DEBUG removing kafka metric : MetricName [name=successful-authentication-total, group=producer-metrics, description=The total number of connections with successful authentication, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,785] DEBUG removing kafka metric : MetricName [name=successful-authentication-rate, group=producer-metrics, description=The number of connections with successful authentication per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,786] DEBUG removing kafka metric : MetricName [name=successful-reauthentication-total, group=producer-metrics, description=The total number of successful re-authentication of connections, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,786] DEBUG removing kafka metric : MetricName [name=successful-reauthentication-rate, group=producer-metrics, description=The number of successful re-authentication of connections per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,787] DEBUG removing kafka metric : MetricName [name=successful-authentication-no-reauth-total, group=producer-metrics, description=The total number of connections with successful authentication where the client does not support re-authentication, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,787] DEBUG removing kafka metric : MetricName [name=failed-authentication-total, group=producer-metrics, description=The total number of connections with failed authentication, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,787] DEBUG removing kafka metric : MetricName [name=failed-authentication-rate, group=producer-metrics, description=The number of connections with failed authentication per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,788] DEBUG removing kafka metric : MetricName [name=failed-reauthentication-total, group=producer-metrics, description=The total number of failed re-authentication of connections, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,788] DEBUG removing kafka metric : MetricName [name=failed-reauthentication-rate, group=producer-metrics, description=The number of failed re-authentication of connections per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,789] DEBUG removing kafka metric : MetricName [name=reauthentication-latency-max, group=producer-metrics, description=The max latency observed due to re-authentication, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,790] DEBUG removing kafka metric : MetricName [name=reauthentication-latency-avg, group=producer-metrics, description=The average latency observed due to re-authentication, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,790] DEBUG removing kafka metric : MetricName [name=network-io-total, group=producer-metrics, description=The total number of network operations (reads or writes) on all connections, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,790] DEBUG removing kafka metric : MetricName [name=network-io-rate, group=producer-metrics, description=The number of network operations (reads or writes) on all connections per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,790] DEBUG removing kafka metric : MetricName [name=outgoing-byte-total, group=producer-metrics, description=The total number of outgoing bytes sent to all servers, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,791] DEBUG removing kafka metric : MetricName [name=outgoing-byte-rate, group=producer-metrics, description=The number of outgoing bytes sent to all servers per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,793] DEBUG removing kafka metric : MetricName [name=incoming-byte-total, group=producer-metrics, description=The total number of bytes read off all sockets, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,794] DEBUG removing kafka metric : MetricName [name=incoming-byte-rate, group=producer-metrics, description=The number of bytes read off all sockets per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,794] DEBUG removing kafka metric : MetricName [name=request-total, group=producer-metrics, description=The total number of requests sent, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,794] DEBUG removing kafka metric : MetricName [name=request-rate, group=producer-metrics, description=The number of requests sent per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,794] DEBUG removing kafka metric : MetricName [name=request-size-avg, group=producer-metrics, description=The average size of requests sent., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,795] DEBUG removing kafka metric : MetricName [name=request-size-max, group=producer-metrics, description=The maximum size of any request sent., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,796] DEBUG removing kafka metric : MetricName [name=response-total, group=producer-metrics, description=The total number of responses received, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,797] DEBUG removing kafka metric : MetricName [name=response-rate, group=producer-metrics, description=The number of responses received per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,798] DEBUG removing kafka metric : MetricName [name=select-total, group=producer-metrics, description=The total number of times the I/O layer checked for new I/O to perform, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,798] DEBUG removing kafka metric : MetricName [name=select-rate, group=producer-metrics, description=The number of times the I/O layer checked for new I/O to perform per second, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,799] DEBUG removing kafka metric : MetricName [name=io-wait-time-ns-avg, group=producer-metrics, description=The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,799] DEBUG removing kafka metric : MetricName [name=io-waittime-total, group=producer-metrics, description=*Deprecated* The total time the I/O thread spent waiting, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,799] DEBUG removing kafka metric : MetricName [name=io-wait-ratio, group=producer-metrics, description=The fraction of time the I/O thread spent waiting, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,799] DEBUG removing kafka metric : MetricName [name=io-wait-time-ns-total, group=producer-metrics, description=The total time the I/O thread spent waiting, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,799] DEBUG removing kafka metric : MetricName [name=io-time-ns-avg, group=producer-metrics, description=The average length of time for I/O per select call in nanoseconds., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,799] DEBUG removing kafka metric : MetricName [name=iotime-total, group=producer-metrics, description=*Deprecated* The total time the I/O thread spent doing I/O, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,799] DEBUG removing kafka metric : MetricName [name=io-ratio, group=producer-metrics, description=The fraction of time the I/O thread spent doing I/O, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,800] DEBUG removing kafka metric : MetricName [name=io-time-ns-total, group=producer-metrics, description=The total time the I/O thread spent doing I/O, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,800] DEBUG removing kafka metric : MetricName [name=request-total, group=producer-node-metrics, description=The total number of requests sent, tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,800] DEBUG removing kafka metric : MetricName [name=request-rate, group=producer-node-metrics, description=The number of requests sent per second, tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,800] DEBUG removing kafka metric : MetricName [name=request-size-avg, group=producer-node-metrics, description=The average size of requests sent., tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,802] DEBUG removing kafka metric : MetricName [name=request-size-max, group=producer-node-metrics, description=The maximum size of any request sent., tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,802] DEBUG removing kafka metric : MetricName [name=outgoing-byte-total, group=producer-node-metrics, description=The total number of outgoing bytes, tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,802] DEBUG removing kafka metric : MetricName [name=outgoing-byte-rate, group=producer-node-metrics, description=The number of outgoing bytes per second, tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,802] DEBUG removing kafka metric : MetricName [name=response-total, group=producer-node-metrics, description=The total number of responses received, tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,802] DEBUG removing kafka metric : MetricName [name=response-rate, group=producer-node-metrics, description=The number of responses received per second, tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,803] DEBUG removing kafka metric : MetricName [name=incoming-byte-total, group=producer-node-metrics, description=The total number of incoming bytes, tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,803] DEBUG removing kafka metric : MetricName [name=incoming-byte-rate, group=producer-node-metrics, description=The number of incoming bytes per second, tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,803] DEBUG removing kafka metric : MetricName [name=request-latency-avg, group=producer-node-metrics, description=, tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,803] DEBUG removing kafka metric : MetricName [name=request-latency-max, group=producer-node-metrics, description=, tags={client-id=producer-1, node-id=node--1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,804] DEBUG removing kafka metric : MetricName [name=request-total, group=producer-node-metrics, description=The total number of requests sent, tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,804] DEBUG removing kafka metric : MetricName [name=request-rate, group=producer-node-metrics, description=The number of requests sent per second, tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,804] DEBUG removing kafka metric : MetricName [name=request-size-avg, group=producer-node-metrics, description=The average size of requests sent., tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,804] DEBUG removing kafka metric : MetricName [name=request-size-max, group=producer-node-metrics, description=The maximum size of any request sent., tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,805] DEBUG removing kafka metric : MetricName [name=outgoing-byte-total, group=producer-node-metrics, description=The total number of outgoing bytes, tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,805] DEBUG removing kafka metric : MetricName [name=outgoing-byte-rate, group=producer-node-metrics, description=The number of outgoing bytes per second, tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,805] DEBUG removing kafka metric : MetricName [name=response-total, group=producer-node-metrics, description=The total number of responses received, tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,805] DEBUG removing kafka metric : MetricName [name=response-rate, group=producer-node-metrics, description=The number of responses received per second, tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,805] DEBUG removing kafka metric : MetricName [name=incoming-byte-total, group=producer-node-metrics, description=The total number of incoming bytes, tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,805] DEBUG removing kafka metric : MetricName [name=incoming-byte-rate, group=producer-node-metrics, description=The number of incoming bytes per second, tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,806] DEBUG removing kafka metric : MetricName [name=request-latency-avg, group=producer-node-metrics, description=, tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,808] DEBUG removing kafka metric : MetricName [name=request-latency-max, group=producer-node-metrics, description=, tags={client-id=producer-1, node-id=node-0}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,809] DEBUG close telemetry sender for client telemetry reporter instance (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,809] DEBUG Setting telemetry state from SUBSCRIPTION_NEEDED to TERMINATED (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,809] DEBUG [Producer clientId=producer-1] Shutdown of Kafka producer I/O thread has completed. (org.apache.kafka.clients.producer.internals.Sender)
[2025-02-03 20:11:06,812] DEBUG removing kafka metric : MetricName [name=flush-time-ns-total, group=producer-metrics, description=Total time producer has spent in flush in nanoseconds., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,813] DEBUG removing kafka metric : MetricName [name=txn-init-time-ns-total, group=producer-metrics, description=Total time producer has spent in initTransactions in nanoseconds., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,814] DEBUG removing kafka metric : MetricName [name=txn-begin-time-ns-total, group=producer-metrics, description=Total time producer has spent in beginTransaction in nanoseconds., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,815] DEBUG removing kafka metric : MetricName [name=txn-send-offsets-time-ns-total, group=producer-metrics, description=Total time producer has spent in sendOffsetsToTransaction in nanoseconds., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,815] DEBUG removing kafka metric : MetricName [name=txn-commit-time-ns-total, group=producer-metrics, description=Total time producer has spent in commitTransaction in nanoseconds., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,816] DEBUG removing kafka metric : MetricName [name=txn-abort-time-ns-total, group=producer-metrics, description=Total time producer has spent in abortTransaction in nanoseconds., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,816] DEBUG removing kafka metric : MetricName [name=metadata-wait-time-ns-total, group=producer-metrics, description=Total time producer has spent waiting on topic metadata in nanoseconds., tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,816] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2025-02-03 20:11:06,817] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2025-02-03 20:11:06,818] INFO Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter (org.apache.kafka.common.metrics.Metrics)
[2025-02-03 20:11:06,818] DEBUG Stopping ClientTelemetryReporter (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,818] DEBUG close telemetry sender for client telemetry reporter instance (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,818] DEBUG Ignoring subsequent close (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,819] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2025-02-03 20:11:06,820] DEBUG Stopping ClientTelemetryReporter (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,822] DEBUG close telemetry sender for client telemetry reporter instance (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,822] DEBUG Ignoring subsequent close (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2025-02-03 20:11:06,826] DEBUG removing kafka metric : MetricName [name=version, group=app-info, description=Metric indicating version, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,828] DEBUG removing kafka metric : MetricName [name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,829] DEBUG removing kafka metric : MetricName [name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=producer-1}] (org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector)
[2025-02-03 20:11:06,829] INFO App info kafka.producer for producer-1 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2025-02-03 20:11:06,829] DEBUG [Producer clientId=producer-1] Kafka producer has been closed (org.apache.kafka.clients.producer.KafkaProducer)
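Most of the DEBUG lines in the output above come from a single logger, org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector. When deciding which loggers to quiet, it can help to count lines per logger in a captured log. The following is a minimal sketch that assumes the layout pattern [%d] %p %m (%c)%n used in this article. The class name LoggerTally and the regular expression are illustrative assumptions, not part of Kafka or Log4j.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LoggerTally {
    // Matches "[timestamp] LEVEL message (logger.name)" as produced by "[%d] %p %m (%c)%n".
    private static final Pattern LINE =
            Pattern.compile("^\\[(.+?)\\] (\\w+) (.*) \\(([\\w.$]+)\\)$");

    // Counts log lines per logger name; lines that do not match the pattern
    // (for example the application's own System.out output) are skipped.
    static Map<String, Integer> countByLogger(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (m.matches()) {
                counts.merge(m.group(4), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: java LoggerTally <log file>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        countByLogger(lines).forEach((logger, n) -> System.out.println(n + "\t" + logger));
    }
}
```

Loggers that rank high in this tally are candidates for a higher level (for example INFO) in the logging configuration, while the root logger stays at DEBUG.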
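Supplement: using a Log4j 2.x configuration file
To use a Log4j 2.x configuration file (log4j2.xml, log4j2.properties, etc.), use the Log4j SLF4J 2.0 API binding (log4j-slf4j2-impl) as the logging implementation, in place of the SLF4J Reload4j Provider.
pom.xml settings
Add the following to dependencies (kafka-clients is omitted).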
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.16</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>2.24.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.24.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.24.3</version>
</dependency>
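Configuration file
Prepare a configuration file that Log4j 2 can read, such as log4j2.xml or log4j2.properties.
The following two examples, log4j2.xml first and then log4j2.properties, each log at DEBUG level to standard output.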
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<!-- Console appender settings -->
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="[%d] %p %m (%c)%n"/>
</Console>
</Appenders>
<!-- Root logger settings -->
<Loggers>
<Root level="debug">
<AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
</Configuration>
# Root logger settings
rootLogger.level = debug
rootLogger.appenderRef.stdout.ref = STDOUT
# Console appender settings
appender.stdout.type = Console
appender.stdout.name = STDOUT
appender.stdout.target = SYSTEM_OUT
appender.stdout.layout.type = PatternLayout
appender.stdout.layout.pattern = [%d] %p %m (%c)%n
Placing the configuration file
By default, the configuration file placed in the following directory is used.
<project dir>/src/main/resources
More generally, Log4j 2 uses a configuration file found in a directory on the classpath.
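To check whether a configuration file is actually visible on the classpath at run time, a small lookup like the one below can help. It is a diagnostic sketch only and does not reproduce Log4j 2's own resolution logic, which also considers other file names and formats. The class name FindLog4j2Config is a hypothetical name chosen for illustration.

```java
import java.net.URL;
import java.util.LinkedHashMap;
import java.util.Map;

public class FindLog4j2Config {
    // Looks up each resource name on the classpath; the value is null when the
    // resource is not found.
    static Map<String, URL> locate(String... names) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = ClassLoader.getSystemClassLoader();
        }
        Map<String, URL> found = new LinkedHashMap<>();
        for (String name : names) {
            found.put(name, cl.getResource(name));
        }
        return found;
    }

    public static void main(String[] args) {
        locate("log4j2.xml", "log4j2.properties").forEach((name, url) ->
                System.out.println(name + " -> " + (url == null ? "not found" : url)));
    }
}
```

Running it from the same project (so that it shares the application's classpath) shows which file, if any, would be picked up from src/main/resources after the build.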
To specify an arbitrary configuration file, set the following system property.
log4j.configurationFile
Note that this differs from log4j.configuration, the property used with Log4j 1.x.
Example (specifying my-log4j2.xml in launch.json)
{
"type": "java",
"name": "KafkaProducerApp",
"request": "launch",
"mainClass": "com.example.KafkaProducerApp",
"projectName": "kafka",
"vmArgs": [
"-Dlog4j.configurationFile=file:/path/to/my-log4j2.xml"
]
}
With the settings above, you can control the client library's log output using a Log4j 2.x configuration file.
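As an alternative to passing -Dlog4j.configurationFile through vmArgs, the same system property can be set from code, provided this happens before the first logger is created (for example, at the very start of main, before a KafkaProducer is constructed). The sketch below only builds a file URI and sets the property. The class name Log4j2ConfigLocation and the helper useConfigFile are hypothetical, and the path is the placeholder used in this article.

```java
import java.nio.file.Paths;

public class Log4j2ConfigLocation {
    // Converts a file system path to a file: URI and stores it in the
    // log4j.configurationFile system property. Returns the URI string.
    static String useConfigFile(String path) {
        String uri = Paths.get(path).toAbsolutePath().toUri().toString();
        System.setProperty("log4j.configurationFile", uri);
        return uri;
    }

    public static void main(String[] args) {
        // Assumption: nothing has triggered logging initialization yet at this point.
        String uri = useConfigFile("/path/to/my-log4j2.xml");
        System.out.println("log4j.configurationFile=" + uri);
        // ... create the Kafka client and run the application from here on.
    }
}
```

If the main class declares a static logger field, that field is initialized before main runs, so the vmArgs approach is the safer choice in that case.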