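Introduction
These are notes on Confluent Platform, the commercial distribution of Apache Kafka. Since this is my first time using it, I am keeping a log that also organizes what I looked up along the way.
Although it is a commercial product, there is a Confluent Community License under which some features can be used free of charge, so I am trying it out within that free scope.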
Related articles
Confluent Platform Notes - (1) Environment Setup
Confluent Platform Notes - (2) Simple Message Send/Receive Test
Confluent Platform Notes - (3) Simple Schema Registry Test
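References
Overview and Architecture of Apache Kafka (Japanese article)
What Is Confluent Platform? (Japanese article)
Installing Confluent Platform (Community Edition) and Trying Kafka Connect (Japanese article)
How Apache Kafka Producers/Brokers/Consumers Work, with a List of Settings (Japanese article)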
Manual Install using ZIP and TAR Archives
About the Confluent Community License
Confluent Platform bundles several components, and the license appears to differ from component to component.
Confluent Community License FAQ
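The summary below is based on a table excerpted from that page.
Kafka and ZooKeeper, the core components of Confluent Platform, appear to be available under the Apache 2.0 License. Schema Registry and REST Proxy appear to be usable under the Community License.
Control Center and the cluster-related features appear to require the (paid) Enterprise License.
See also: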
Confluent Platform Licenses
Confluent Community License Version 1.0
Environment
RHEL V8.2
Confluent Community V6.2.0
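RHEL V8.2 runs as a guest OS on VirtualBox on Windows 10, and I install Confluent Platform there.
Kafka can be configured as a cluster distributed across multiple nodes, but since the goal here is a simple test environment, I configure only one broker on one node.
The prerequisites are listed here: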
Confluent System Requirements
Java is already installed.
[root@test12 ~]# java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)
Installing the Confluent Platform Community Components
There is an installation procedure that also works offline, so I follow it.
Manual Install using ZIP and TAR Archives
Download confluent-community-6.2.0.tar.gz (about 350 MB), which is linked from the guide above, copy it to the target machine, and extract it. Here I extract it under /opt/.
[root@test12 /Local_Inst_Image/Confluent]# ls -la
合計 348552
drwxrwx---. 1 root vboxsf 0 8月 13 18:00 .
drwxrwx---. 1 root vboxsf 16384 8月 13 17:59 ..
-rwxrwx---. 1 root vboxsf 356898902 8月 13 18:00 confluent-community-6.2.0.tar.gz
[root@test12 /Local_Inst_Image/Confluent]# tar xzf confluent-community-6.2.0.tar.gz -C /opt
The files were extracted under /opt/confluent-6.2.0.
Change the owner and group.
[root@test12 /opt]# chown -R root:root confluent-6.2.0
[root@test12 /opt]# ls -la confluent-6.2.0/
合計 8
drwxr-xr-x. 7 root root 77 6月 6 08:51 .
drwxr-xr-x. 7 root root 95 8月 13 18:05 ..
-rw-r--r--. 1 root root 871 6月 6 08:51 README
drwxr-xr-x. 3 root root 4096 6月 6 07:11 bin
drwxr-xr-x. 8 root root 116 6月 6 07:11 etc
drwxr-xr-x. 3 root root 21 6月 6 07:11 lib
drwxr-xr-x. 6 root root 71 6月 6 07:11 share
drwxr-xr-x. 2 root root 178 6月 6 08:51 src
Extracting the tar archive seems to be all that is needed. Easy.
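Because the installation is just an extracted directory, it can be handy to put its bin directory on PATH. The following is a minimal sketch, assuming the /opt/confluent-6.2.0 location used above. The variable name CONFLUENT_HOME is a convention I am assuming here, and nothing in these notes depends on it.

```shell
# Assumption: the archive was extracted under /opt as described above.
CONFLUENT_HOME=/opt/confluent-6.2.0
export CONFLUENT_HOME

# Append the bin directory so the scripts can be called without a full path.
PATH="$PATH:$CONFLUENT_HOME/bin"
export PATH

# Show where the start script resolves from (prints nothing if it is not installed).
command -v zookeeper-server-start || true
```

The rest of these notes keeps using full paths, so this step is optional.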
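Managing the Confluent Components
Three components are covered here: ZooKeeper, Kafka (Broker), and Schema Registry.
For now, I start all of them with the default configuration.
(A component called Control Center apparently provides a GUI management interface, which would make things easier to follow, but unfortunately it does not seem to be available under the Community License...)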
ZooKeeper
Configuration
Configure Confluent Platform - ZooKeeper
Check the properties file provided by default (/opt/confluent-6.2.0/etc/kafka/zookeeper.properties).
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
Something like a parameter reference can be found here...
Running ZooKeeper in Production - Configuration Options
For the complete list it points to the Apache site, but...
Apache - ZooKeeper Administrator's Guide
Settings such as admin.enableServer are not listed there. The link above leads to the V3.4.10 documentation, which seems to be outdated.
The documentation for a newer version does describe admin.enableServer.
Apache - ZooKeeper Administrator's Guide (3.5.9)
(Checking the ZooKeeper startup log later showed that the version was V3.5.9.)
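Start/Stop
- Start command:
bin/zookeeper-server-start -daemon <property_file>
- Stop command:
bin/zookeeper-server-stop
- Log:
logs/zookeeper.out
- Listen port: 2181
Note: I could not find anything like a command reference in the manual... The files above turned out to be shell scripts, and judging from their contents, adding the -daemon option starts the process in the background. (See the list of shell scripts at the end.)
Using the provided properties file as is, I start ZooKeeper with the following command.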
/opt/confluent-6.2.0/bin/zookeeper-server-start -daemon /opt/confluent-6.2.0/etc/kafka/zookeeper.properties
Messages are written to logs/zookeeper.out.
(The same messages are also written to server.log, but server.log seems to contain the Kafka messages described later as well.)
Startup log: logs/zookeeper.out
[2021-08-14 09:24:44,670] INFO Reading configuration from: /opt/confluent-6.2.0/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,695] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,695] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,697] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-08-14 09:24:44,697] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-08-14 09:24:44,697] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-08-14 09:24:44,697] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2021-08-14 09:24:44,700] INFO Log4j 1.2 jmx support found and enabled. (org.apache.zookeeper.jmx.ManagedUtil)
[2021-08-14 09:24:44,714] INFO Reading configuration from: /opt/confluent-6.2.0/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,714] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,714] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,715] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2021-08-14 09:24:44,717] INFO zookeeper.snapshot.trust.empty : false (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2021-08-14 09:24:44,734] INFO Server environment:zookeeper.version=3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 20:03 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,734] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,734] INFO Server environment:java.version=1.8.0_242 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,734] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,734] INFO Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-4.el8.x86_64/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,735] INFO Server environment:java.class.path=/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-buffer-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-mirror-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-scala_2.13-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-util-ajax-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.inject-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/audience-annotations-0.5.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-java8-compat_2.13-0.9.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-metadata-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.activation-api-1.2.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/slf4j-api-1.7.30.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-logging_2.13-3.9.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zookeeper-jute-3.5.9.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-codec-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-json-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-security-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-reflect-2.13.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-library-2.13.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-jaxrs-base-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-sources.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-handler-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.ws.rs-api-2.1.6.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zstd-jni-1.4.9-1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-util-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-server-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zookeeper-3.5.9.ja
r:/opt/confluent-6.2.0/bin/../share/java/kafka/javassist-3.27.0-GA.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-shell-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-client-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/commons-cli-1.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-core-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-servlet-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-runtime-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-collection-compat_2.13-2.3.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-jaxrs-json-provider-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.validation-api-2.0.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-scala_2.13-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.annotation-api-1.3.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-server-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-utils-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/lz4-java-1.7.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/maven-artifact-3.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-transforms-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/metrics-core-2.2.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-annotations-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-test-sources.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-mirror-client-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.xml.bind-api-2.3.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/reflections-0.9.12.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/argparse4j-0.7.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-native-epoll-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/activation-1
.1.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-raft-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-tools-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-http-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-javadoc.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-api-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-file-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-container-servlet-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-container-servlet-core-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-resolver-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jaxb-api-2.3.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-test-utils-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-log4j-appender-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-hk2-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/javax.servlet-api-3.1.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-datatype-jdk8-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/slf4j-log4j12-1.7.30.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/commons-lang3-3.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/plexus-utils-3.2.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/javax.ws.rs-api-2.1.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-common-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-paranamer-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-common-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-native-unix-common-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-test.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-examples-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kaf
ka/jetty-io-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/osgi-resource-locator-1.0.3.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/rocksdbjni-5.18.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/aopalliance-repackaged-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-dataformat-csv-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/snappy-java-1.1.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-databind-2.10.5.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-continuation-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-locator-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/confluent-log4j-1.2.17-cp2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-api-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jopt-simple-5.0.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/paranamer-2.8.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-jaxb-annotations-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-clients-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-servlets-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-basic-auth-extension-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jline-3.12.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-client-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/confluent-telemetry/* (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,737] INFO Server environment:java.library.path=/opt/ibm/cics/lib:/opt/ibm/cicssm/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,737] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,737] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,737] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:os.version=4.18.0-193.el8.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:user.dir=/opt/confluent-6.2.0/bin (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:os.memory.free=497MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:os.memory.max=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:os.memory.total=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,740] INFO minSessionTimeout set to 6000 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,740] INFO maxSessionTimeout set to 60000 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,740] INFO Created server with tickTime 3000 minSessionTimeout 6000 maxSessionTimeout 60000 datadir /tmp/zookeeper/version-2 snapdir /tmp/zookeeper/version-2 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,749] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
[2021-08-14 09:24:44,753] INFO Configuring NIO connection handler with 10s sessionless connection timeout, 1 selector thread(s), 8 worker threads, and 64 kB direct buffers. (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2021-08-14 09:24:44,757] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2021-08-14 09:24:44,800] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2021-08-14 09:24:44,803] INFO Reading snapshot /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileSnap)
[2021-08-14 09:24:44,810] INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2021-08-14 09:24:44,822] INFO PrepRequestProcessor (sid:0) started, reconfigEnabled=false (org.apache.zookeeper.server.PrepRequestProcessor)
[2021-08-14 09:24:44,828] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
It appears to have started. Port 2181 is now in the listening state.
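One way to confirm the listening state from a script is to filter the output of ss. The helper below is only a sketch: the function name is_listening is my own, and it assumes the usual `ss -ltn` layout in which the fourth column is "Local Address:Port".

```shell
# is_listening PORT
# Reads `ss -ltn` style output on stdin and succeeds if a socket
# is listening on PORT. Column 4 is assumed to be "Local Address:Port".
is_listening() {
  awk -v port="$1" '
    NR > 1 {
      n = split($4, parts, ":")      # the port is the last ":"-separated piece
      if (parts[n] == port) found = 1
    }
    END { exit(found ? 0 : 1) }
  '
}

# Usage on the test machine (requires the ss command):
#   ss -ltn | is_listening 2181 && echo "ZooKeeper is listening on 2181"
```

The same helper can be reused later for the Kafka broker port.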
The startup log contains the following message, so the ZooKeeper version appears to be 3.5.9.
Server environment:zookeeper.version=3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 20:03 GMT (org.apache.zookeeper.server.ZooKeeperServer)
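Rather than scanning the log by eye, the version can be pulled out with sed. This is a rough sketch that assumes the message format shown above. The function name zk_version and the log location under /opt/confluent-6.2.0 are my assumptions.

```shell
# zk_version
# Reads a log on stdin and prints the first ZooKeeper version number found.
# Assumes the "zookeeper.version=<version>-<commit hash>" format shown above.
zk_version() {
  sed -n 's/.*zookeeper\.version=\([0-9][0-9.]*\)-.*/\1/p' | head -n 1
}

# Usage (assumed log location):
#   zk_version < /opt/confluent-6.2.0/logs/zookeeper.out
```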
Kafka (Broker)
Configuration
Configure Confluent Platform - Kafka
The properties file shipped with the product is here:
/opt/confluent-6.2.0/etc/kafka/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
##################### Confluent Metrics Reporter #######################
# Confluent Control Center and Confluent Auto Data Balancer integration
#
# Uncomment the following lines to publish monitoring data for
# Confluent Control Center and Confluent Auto Data Balancer
# If you are using a dedicated metrics cluster, also adjust the settings
# to point to your metrics kakfa cluster.
#metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
#confluent.metrics.reporter.bootstrap.servers=localhost:9092
#
# Uncomment the following line if the metrics cluster has a single broker
#confluent.metrics.reporter.topic.replicas=1
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
The file is long because of all the comments, so extracting only the parameters gives the following.
[root@test12 /opt/confluent-6.2.0/etc/kafka]# cat server.properties | grep -v -e "^#" | grep -v -e "^\s*$"
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
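The same filter is useful for the other properties files as well, so it can be wrapped in a small function. This is a sketch of my own (the name effective_props is made up). It uses the POSIX class [[:space:]] instead of \s so that it does not depend on GNU grep extensions, and it also drops comment lines that are indented.

```shell
# effective_props FILE
# Prints only the active settings of a properties file by dropping
# comment lines and blank (or whitespace-only) lines.
effective_props() {
  grep -v -e '^[[:space:]]*#' -e '^[[:space:]]*$' "$1"
}

# Usage:
#   effective_props /opt/confluent-6.2.0/etc/kafka/server.properties
#   effective_props /opt/confluent-6.2.0/etc/kafka/zookeeper.properties
```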
The parameter reference is here:
Kafka Broker Configurations
Note: No listen-port settings are configured, but according to the reference, port seems to be used when listeners is not specified, and the default for port is 9092.
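To make that fallback concrete, here is a small sketch that reports which port a given properties file would lead to. It only mirrors the order described above (listeners, then port, then 9092) as I understood it from the reference for this version. The function name effective_port is hypothetical, and it is not how the broker itself resolves the setting.

```shell
# effective_port FILE
# Prints the listen port implied by a broker properties file, following
# the fallback order described above: listeners -> port -> 9092.
effective_port() {
  # 1. An uncommented "listeners=" line wins; use the port of its first entry.
  value=$(sed -n 's/^listeners[[:space:]]*=[[:space:]]*//p' "$1" | head -n 1)
  if [ -n "$value" ]; then
    first=${value%%,*}      # first listener, e.g. PLAINTEXT://:9092
    echo "${first##*:}"     # the text after the last ":" is the port
    return
  fi
  # 2. Otherwise use "port=" if present, and 3. fall back to 9092.
  value=$(sed -n 's/^port[[:space:]]*=[[:space:]]*//p' "$1" | head -n 1)
  echo "${value:-9092}"
}

# Usage:
#   effective_port /opt/confluent-6.2.0/etc/kafka/server.properties
```

With the shipped server.properties, where the listeners line is commented out, this should land on the 9092 fallback.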
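Note: The data held in a topic is physically stored as files on the OS. The destination is specified by the log.dirs parameter, which is set to /tmp/kafka-logs by default, as shown above. Files under /tmp/ are often subject to automatic cleanup, so to keep the data from being deleted automatically, either change this parameter to point to a different directory or change the /tmp cleanup settings.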
See also:
Stack Overflow - Which directory does apache kafka store the data in broker nodes
Kafka Broker Configurations - log.dirs
[CentOS] Why Files Under /tmp Disappear (Japanese article)
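If the first option (changing log.dirs) is chosen, one way is to generate a modified copy of the shipped file and leave the original untouched. The sketch below is only an illustration: the function name set_log_dirs, the directory /var/lib/kafka-logs, and the file name server-custom.properties are all hypothetical, and the directory is assumed not to contain "|" or "&" characters.

```shell
# set_log_dirs SRC DEST DIR
# Writes a copy of SRC to DEST in which log.dirs points to DIR.
# Assumption: DIR contains no "|" or "&" (they are special to this sed call).
set_log_dirs() {
  sed "s|^log\.dirs=.*|log.dirs=$3|" "$1" > "$2"
}

# Usage (hypothetical paths):
#   mkdir -p /var/lib/kafka-logs
#   set_log_dirs /opt/confluent-6.2.0/etc/kafka/server.properties \
#                /opt/confluent-6.2.0/etc/kafka/server-custom.properties \
#                /var/lib/kafka-logs
```

The broker would then be started with the generated file instead of the default server.properties.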
Start/Stop
- Start command:
bin/kafka-server-start -daemon <property_file>
- Stop command:
bin/kafka-server-stop
- Log:
logs/kafkaServer.out
- Listen port: 9092
Start the broker with the following command.
/opt/confluent-6.2.0/bin/kafka-server-start -daemon /opt/confluent-6.2.0/etc/kafka/server.properties
Startup log: logs/kafkaServer.out
[2021-08-14 11:17:16,090] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2021-08-14 11:17:16,584] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2021-08-14 11:17:16,712] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2021-08-14 11:17:16,716] INFO starting (kafka.server.KafkaServer)
[2021-08-14 11:17:16,717] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2021-08-14 11:17:16,747] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2021-08-14 11:17:16,752] INFO Client environment:zookeeper.version=3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 20:03 GMT (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,752] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,752] INFO Client environment:java.version=1.8.0_242 (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,752] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,752] INFO Client environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-4.el8.x86_64/jre (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,761] INFO Client environment:java.class.path=/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-buffer-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-mirror-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-scala_2.13-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-util-ajax-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.inject-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/audience-annotations-0.5.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-java8-compat_2.13-0.9.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-metadata-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.activation-api-1.2.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/slf4j-api-1.7.30.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-logging_2.13-3.9.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zookeeper-jute-3.5.9.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-codec-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-json-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-security-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-reflect-2.13.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-library-2.13.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-jaxrs-base-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-sources.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-handler-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.ws.rs-api-2.1.6.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zstd-jni-1.4.9-1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-util-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-server-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zookeeper-3.5.9.ja
r:/opt/confluent-6.2.0/bin/../share/java/kafka/javassist-3.27.0-GA.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-shell-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-client-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/commons-cli-1.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-core-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-servlet-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-runtime-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-collection-compat_2.13-2.3.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-jaxrs-json-provider-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.validation-api-2.0.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-scala_2.13-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.annotation-api-1.3.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-server-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-utils-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/lz4-java-1.7.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/maven-artifact-3.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-transforms-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/metrics-core-2.2.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-annotations-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-test-sources.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-mirror-client-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.xml.bind-api-2.3.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/reflections-0.9.12.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/argparse4j-0.7.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-native-epoll-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/activation-1
.1.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-raft-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-tools-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-http-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-javadoc.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-api-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-file-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-container-servlet-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-container-servlet-core-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-resolver-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jaxb-api-2.3.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-test-utils-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-log4j-appender-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-hk2-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/javax.servlet-api-3.1.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-datatype-jdk8-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/slf4j-log4j12-1.7.30.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/commons-lang3-3.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/plexus-utils-3.2.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/javax.ws.rs-api-2.1.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-common-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-paranamer-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-common-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-native-unix-common-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-test.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-examples-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kaf
ka/jetty-io-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/osgi-resource-locator-1.0.3.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/rocksdbjni-5.18.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/aopalliance-repackaged-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-dataformat-csv-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/snappy-java-1.1.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-databind-2.10.5.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-continuation-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-locator-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/confluent-log4j-1.2.17-cp2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-api-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jopt-simple-5.0.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/paranamer-2.8.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-jaxb-annotations-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-clients-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-servlets-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-basic-auth-extension-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jline-3.12.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-client-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/confluent-telemetry/* (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:java.library.path=/opt/ibm/cics/lib:/opt/ibm/cicssm/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:os.version=4.18.0-193.el8.x86_64 (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:user.dir=/opt/confluent-6.2.0/bin (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:os.memory.free=980MB (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:os.memory.max=1024MB (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:os.memory.total=1024MB (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,766] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@5b8dfcc1 (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,770] INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
[2021-08-14 11:17:16,785] INFO zookeeper.request.timeout value is 0. feature enabled= (org.apache.zookeeper.ClientCnxn)
[2021-08-14 11:17:16,796] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-08-14 11:17:16,798] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-08-14 11:17:16,802] INFO Socket connection established, initiating session, client: /0:0:0:0:0:0:0:1:37008, server: localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)
[2021-08-14 11:17:16,831] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x1000070216a0000, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-08-14 11:17:16,834] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-08-14 11:17:17,053] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
[2021-08-14 11:17:17,065] INFO Feature ZK node at path: /feature does not exist (kafka.server.FinalizedFeatureChangeListener)
[2021-08-14 11:17:17,065] INFO Cleared cache (kafka.server.FinalizedFeatureCache)
[2021-08-14 11:17:17,311] INFO Cluster ID = Mp9ekWqlR2-jIyzo3tDKMg (kafka.server.KafkaServer)
[2021-08-14 11:17:17,314] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2021-08-14 11:17:17,381] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.heartbeat.interval.ms = 2000
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
broker.session.timeout.ms = 9000
client.quota.callback.class = null
compression.type = producer
connection.failed.authentication.delay.ms = 100
connections.max.idle.ms = 600000
connections.max.reauth.ms = 0
control.plane.listener.name = null
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.listener.names = null
controller.quorum.append.linger.ms = 25
controller.quorum.election.backoff.max.ms = 1000
controller.quorum.election.timeout.ms = 1000
controller.quorum.fetch.timeout.ms = 2000
controller.quorum.request.timeout.ms = 2000
controller.quorum.retry.backoff.ms = 20
controller.quorum.voters = []
controller.quota.window.num = 11
controller.quota.window.size.seconds = 1
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delegation.token.expiry.check.interval.ms = 3600000
delegation.token.expiry.time.ms = 86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 604800000
delegation.token.secret.key = null
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.max.bytes = 57671680
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 1800000
group.max.size = 2147483647
group.min.session.timeout.ms = 6000
host.name =
initial.broker.registration.timeout.ms = 60000
inter.broker.listener.name = null
inter.broker.protocol.version = 2.8-IV1
kafka.metrics.polling.interval.secs = 10
kafka.metrics.reporters = []
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.max.compaction.lag.ms = 9223372036854775807
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /tmp/kafka-logs
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.downconversion.enable = true
log.message.format.version = 2.8-IV1
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connection.creation.rate = 2147483647
max.connections = 2147483647
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 1048588
metadata.log.dir = null
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
node.id = -1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 10080
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
password.encoder.iterations = 4096
password.encoder.key.length = 128
password.encoder.keyfactory.algorithm = null
password.encoder.old.secret = null
password.encoder.secret = null
port = 9092
principal.builder.class = null
process.roles = []
producer.purgatory.purge.interval.requests = 1000
queued.max.request.bytes = -1
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 30000
replica.selector.class = null
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.client.callback.handler.class = null
sasl.enabled.mechanisms = [GSSAPI]
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism.controller.protocol = GSSAPI
sasl.mechanism.inter.broker.protocol = GSSAPI
sasl.server.callback.handler.class = null
security.inter.broker.protocol = PLAINTEXT
security.providers = null
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = []
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.principal.mapping.rules = DEFAULT
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.clientCnxnSocket = null
zookeeper.connect = localhost:2181
zookeeper.connection.timeout.ms = 18000
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 18000
zookeeper.set.acl = false
zookeeper.ssl.cipher.suites = null
zookeeper.ssl.client.enable = false
zookeeper.ssl.crl.enable = false
zookeeper.ssl.enabled.protocols = null
zookeeper.ssl.endpoint.identification.algorithm = HTTPS
zookeeper.ssl.keystore.location = null
zookeeper.ssl.keystore.password = null
zookeeper.ssl.keystore.type = null
zookeeper.ssl.ocsp.enable = false
zookeeper.ssl.protocol = TLSv1.2
zookeeper.ssl.truststore.location = null
zookeeper.ssl.truststore.password = null
zookeeper.ssl.truststore.type = null
zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2021-08-14 11:17:17,394] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.heartbeat.interval.ms = 2000
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
broker.session.timeout.ms = 9000
client.quota.callback.class = null
compression.type = producer
connection.failed.authentication.delay.ms = 100
connections.max.idle.ms = 600000
connections.max.reauth.ms = 0
control.plane.listener.name = null
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.listener.names = null
controller.quorum.append.linger.ms = 25
controller.quorum.election.backoff.max.ms = 1000
controller.quorum.election.timeout.ms = 1000
controller.quorum.fetch.timeout.ms = 2000
controller.quorum.request.timeout.ms = 2000
controller.quorum.retry.backoff.ms = 20
controller.quorum.voters = []
controller.quota.window.num = 11
controller.quota.window.size.seconds = 1
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delegation.token.expiry.check.interval.ms = 3600000
delegation.token.expiry.time.ms = 86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 604800000
delegation.token.secret.key = null
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.max.bytes = 57671680
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 1800000
group.max.size = 2147483647
group.min.session.timeout.ms = 6000
host.name =
initial.broker.registration.timeout.ms = 60000
inter.broker.listener.name = null
inter.broker.protocol.version = 2.8-IV1
kafka.metrics.polling.interval.secs = 10
kafka.metrics.reporters = []
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.max.compaction.lag.ms = 9223372036854775807
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /tmp/kafka-logs
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.downconversion.enable = true
log.message.format.version = 2.8-IV1
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connection.creation.rate = 2147483647
max.connections = 2147483647
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 1048588
metadata.log.dir = null
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
node.id = -1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 10080
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
password.encoder.iterations = 4096
password.encoder.key.length = 128
password.encoder.keyfactory.algorithm = null
password.encoder.old.secret = null
password.encoder.secret = null
port = 9092
principal.builder.class = null
process.roles = []
producer.purgatory.purge.interval.requests = 1000
queued.max.request.bytes = -1
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 30000
replica.selector.class = null
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.client.callback.handler.class = null
sasl.enabled.mechanisms = [GSSAPI]
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism.controller.protocol = GSSAPI
sasl.mechanism.inter.broker.protocol = GSSAPI
sasl.server.callback.handler.class = null
security.inter.broker.protocol = PLAINTEXT
security.providers = null
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = []
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.principal.mapping.rules = DEFAULT
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.clientCnxnSocket = null
zookeeper.connect = localhost:2181
zookeeper.connection.timeout.ms = 18000
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 18000
zookeeper.set.acl = false
zookeeper.ssl.cipher.suites = null
zookeeper.ssl.client.enable = false
zookeeper.ssl.crl.enable = false
zookeeper.ssl.enabled.protocols = null
zookeeper.ssl.endpoint.identification.algorithm = HTTPS
zookeeper.ssl.keystore.location = null
zookeeper.ssl.keystore.password = null
zookeeper.ssl.keystore.type = null
zookeeper.ssl.ocsp.enable = false
zookeeper.ssl.protocol = TLSv1.2
zookeeper.ssl.truststore.location = null
zookeeper.ssl.truststore.password = null
zookeeper.ssl.truststore.type = null
zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2021-08-14 11:17:17,454] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-08-14 11:17:17,455] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-08-14 11:17:17,456] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-08-14 11:17:17,458] INFO [ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-08-14 11:17:17,484] INFO Log directory /tmp/kafka-logs not found, creating it. (kafka.log.LogManager)
[2021-08-14 11:17:17,500] INFO Loading logs from log dirs ArraySeq(/tmp/kafka-logs) (kafka.log.LogManager)
[2021-08-14 11:17:17,506] INFO Attempting recovery for all logs in /tmp/kafka-logs since no clean shutdown file was found (kafka.log.LogManager)
[2021-08-14 11:17:17,532] INFO Loaded 0 logs in 33ms. (kafka.log.LogManager)
[2021-08-14 11:17:17,533] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2021-08-14 11:17:17,536] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2021-08-14 11:17:18,486] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
[2021-08-14 11:17:18,490] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2021-08-14 11:17:18,568] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2021-08-14 11:17:18,614] INFO [broker-0-to-controller-send-thread]: Starting (kafka.server.BrokerToControllerRequestThread)
[2021-08-14 11:17:18,647] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,648] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,648] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,649] INFO [ExpirationReaper-0-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,670] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2021-08-14 11:17:18,701] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)
[2021-08-14 11:17:18,720] INFO Stat of the created znode at /brokers/ids/0 is: 25,25,1628907438713,1628907438713,1,0,0,72058075634860032,202,0,25
(kafka.zk.KafkaZkClient)
[2021-08-14 11:17:18,721] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT://localhost:9092, czxid (broker epoch): 25 (kafka.zk.KafkaZkClient)
[2021-08-14 11:17:18,788] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,799] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,801] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,803] INFO Successfully created /controller_epoch with initial epoch 0 (kafka.zk.KafkaZkClient)
[2021-08-14 11:17:18,815] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2021-08-14 11:17:18,849] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2021-08-14 11:17:18,850] INFO Feature ZK node created at path: /feature (kafka.server.FinalizedFeatureChangeListener)
[2021-08-14 11:17:18,897] INFO Updated cache from existing <empty> to latest FinalizedFeaturesAndEpoch(features=Features{}, epoch=0). (kafka.server.FinalizedFeatureCache)
[2021-08-14 11:17:18,902] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2021-08-14 11:17:18,902] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2021-08-14 11:17:18,906] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2021-08-14 11:17:18,908] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-08-14 11:17:18,946] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,974] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2021-08-14 11:17:19,052] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Starting socket server acceptors and processors (kafka.network.SocketServer)
[2021-08-14 11:17:19,069] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2021-08-14 11:17:19,070] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started socket server acceptors and processors (kafka.network.SocketServer)
[2021-08-14 11:17:19,074] INFO Kafka version: 6.2.0-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-14 11:17:19,074] INFO Kafka commitId: 1a5755cf9401c84f (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-14 11:17:19,074] INFO Kafka startTimeMs: 1628907439070 (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-14 11:17:19,076] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2021-08-14 11:17:19,134] INFO [broker-0-to-controller-send-thread]: Recorded new controller, from now on will use broker localhost:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
Port 9092 is now in the LISTEN state.
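One way to confirm this from the shell is to look for a LISTEN socket on port 9092 in the output of ss. This is only a sketch: it assumes the ss command (iproute2) is installed, and has_listener is a hypothetical helper name introduced here, not something shipped with Confluent Platform.

```shell
# has_listener is a hypothetical helper (not part of Confluent Platform).
# It reads `ss -ltn` output on stdin and succeeds if a LISTEN socket uses the given port.
has_listener() {
  awk -v port="$1" '
    # Column 1 is the socket state, column 4 is "local address:port".
    $1 == "LISTEN" && $4 ~ (":" port "$") { found = 1 }
    END { exit !found }
  '
}

# Assumes ss is available; if it is missing, this simply reports "not listening".
if ss -ltn 2>/dev/null | has_listener 9092; then
  echo "9092: listening"
else
  echo "9092: not listening"
fi
```

The leading colon in the match keeps a port such as 19092 from being mistaken for 9092.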
Schema Registry
Configuration
Configure Confluent Platform - Schema Registry
The properties file shipped with the product is shown below.
etc/schema-registry/schema-registry.properties
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The address the socket server listens on.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=http://0.0.0.0:8081
# Zookeeper connection string for the Zookeeper cluster used by your Kafka cluster
# (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# Note: use of this property is deprecated.
#kafkastore.connection.url=localhost:2181
# Alternatively, Schema Registry can now operate without Zookeeper, handling all coordination via
# Kafka brokers. Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the leader schema registry instance and for storing the data for
# registered schemas.
# (Note that you cannot mix the two modes; use this mode only on new deployments or by shutting down
# all instances, switching to the new configuration, and then starting the schema registry
# instances again.)
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
# The name of the topic to store schemas in
kafkastore.topic=_schemas
# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
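Most of the file above is comments, so it can help to print only the settings that are actually in effect. The following is a minimal sketch; active_settings is a hypothetical helper name, and the path assumes the /opt/confluent-6.2.0 layout used in this memo.

```shell
# active_settings is a hypothetical helper: print only the effective lines
# of a properties file, dropping blank lines and '#' comment lines.
active_settings() {
  grep -Ev '^[[:space:]]*(#|$)' "$1"
}

# Assumed install location (matches the tar extraction earlier in this memo).
props=/opt/confluent-6.2.0/etc/schema-registry/schema-registry.properties
if [ -r "$props" ]; then
  active_settings "$props"
fi
```

For the file shown above, this should leave only the listeners, kafkastore.bootstrap.servers, kafkastore.topic, and debug lines.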
The parameter reference is here:
Schema Registry Configuration Options
Start/Stop
- Start command:
bin/schema-registry-start -daemon <property_file>
- Stop command:
bin/schema-registry-stop
- Log:
logs/schema-registry.log
- Listen port: 8081
Start it with the following command:
/opt/confluent-6.2.0/bin/schema-registry-start -daemon /opt/confluent-6.2.0/etc/schema-registry/schema-registry.properties
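Because -daemon returns immediately, one way to check that Schema Registry really came up is to poll its REST API on the listen port. This is a sketch under assumptions: curl is installed, the listener is http://localhost:8081 as configured above, and wait_for_schema_registry is a hypothetical helper name. It uses the /subjects endpoint, which lists the registered subjects.

```shell
# wait_for_schema_registry is a hypothetical helper (not part of Confluent Platform).
# Usage: wait_for_schema_registry [base_url] [max_tries]
wait_for_schema_registry() {
  local url="${1:-http://localhost:8081}" tries="${2:-5}" i
  for i in $(seq 1 "$tries"); do
    # -f turns HTTP error statuses into a non-zero exit; --max-time bounds each attempt.
    if curl -fsS --max-time 5 "${url}/subjects" 2>/dev/null; then
      echo
      return 0
    fi
    sleep 1
  done
  return 1
}

if wait_for_schema_registry http://localhost:8081 5; then
  echo "Schema Registry is answering on 8081"
else
  echo "Schema Registry did not answer; check logs/schema-registry.log"
fi
```

On a fresh installation with no schemas registered yet, the response body should be an empty JSON array.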
Startup log: logs/schema-registry.log
[2021-08-14 11:57:16,923] INFO SchemaRegistryConfig values:
access.control.allow.headers =
access.control.allow.methods =
access.control.allow.origin =
access.control.skip.options = true
authentication.method = NONE
authentication.realm =
authentication.roles = [*]
authentication.skip.paths = []
avro.compatibility.level =
compression.enable = true
csrf.prevention.enable = false
csrf.prevention.token.endpoint = /csrf
csrf.prevention.token.expiration.minutes = 30
csrf.prevention.token.max.entries = 10000
debug = false
host.name = localhost
idle.timeout.ms = 30000
inter.instance.headers.whitelist = []
inter.instance.protocol = http
kafkastore.bootstrap.servers = [PLAINTEXT://localhost:9092]
kafkastore.checkpoint.dir = /tmp
kafkastore.checkpoint.version = 0
kafkastore.connection.url =
kafkastore.group.id =
kafkastore.init.timeout.ms = 60000
kafkastore.sasl.kerberos.kinit.cmd = /usr/bin/kinit
kafkastore.sasl.kerberos.min.time.before.relogin = 60000
kafkastore.sasl.kerberos.service.name =
kafkastore.sasl.kerberos.ticket.renew.jitter = 0.05
kafkastore.sasl.kerberos.ticket.renew.window.factor = 0.8
kafkastore.sasl.mechanism = GSSAPI
kafkastore.security.protocol = PLAINTEXT
kafkastore.ssl.cipher.suites =
kafkastore.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
kafkastore.ssl.endpoint.identification.algorithm =
kafkastore.ssl.key.password = [hidden]
kafkastore.ssl.keymanager.algorithm = SunX509
kafkastore.ssl.keystore.location =
kafkastore.ssl.keystore.password = [hidden]
kafkastore.ssl.keystore.type = JKS
kafkastore.ssl.protocol = TLS
kafkastore.ssl.provider =
kafkastore.ssl.trustmanager.algorithm = PKIX
kafkastore.ssl.truststore.location =
kafkastore.ssl.truststore.password = [hidden]
kafkastore.ssl.truststore.type = JKS
kafkastore.timeout.ms = 500
kafkastore.topic = _schemas
kafkastore.topic.replication.factor = 3
kafkastore.topic.skip.validation = false
kafkastore.update.handlers = []
kafkastore.write.max.retries = 5
kafkastore.zk.session.timeout.ms = 30000
leader.eligibility = true
listeners = [http://0.0.0.0:8081]
master.eligibility = null
metric.reporters = []
metrics.jmx.prefix = kafka.schema.registry
metrics.num.samples = 2
metrics.sample.window.ms = 30000
metrics.tag.map = []
mode.mutability = true
port = 8081
request.logger.name = io.confluent.rest-utils.requests
request.queue.capacity = 2147483647
request.queue.capacity.growby = 64
request.queue.capacity.init = 128
resource.extension.class = []
resource.extension.classes = []
resource.static.locations = []
response.http.headers.config =
response.mediatype.default = application/vnd.schemaregistry.v1+json
response.mediatype.preferred = [application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json]
rest.servlet.initializor.classes = []
schema.cache.expiry.secs = 300
schema.cache.size = 1000
schema.compatibility.level = backward
schema.providers = []
schema.registry.group.id = schema-registry
schema.registry.inter.instance.protocol =
schema.registry.resource.extension.class = []
schema.registry.zk.namespace = schema_registry
shutdown.graceful.ms = 1000
ssl.cipher.suites = []
ssl.client.auth = false
ssl.client.authentication = NONE
ssl.enabled.protocols = []
ssl.endpoint.identification.algorithm = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm =
ssl.keystore.location =
ssl.keystore.password = [hidden]
ssl.keystore.reload = false
ssl.keystore.type = JKS
ssl.keystore.watch.location =
ssl.protocol = TLS
ssl.provider =
ssl.trustmanager.algorithm =
ssl.truststore.location =
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
thread.pool.max = 200
thread.pool.min = 8
websocket.path.prefix = /ws
websocket.servlet.initializor.classes = []
zookeeper.set.acl = false
(io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig)
[2021-08-14 11:57:16,998] INFO Logging initialized @1039ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log)
[2021-08-14 11:57:17,007] INFO Initial capacity 128, increased by 64, maximum capacity 2147483647. (io.confluent.rest.ApplicationServer)
[2021-08-14 11:57:17,117] INFO Adding listener: http://0.0.0.0:8081 (io.confluent.rest.ApplicationServer)
[2021-08-14 11:57:17,999] INFO Registering schema provider for AVRO: io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry)
[2021-08-14 11:57:17,999] INFO Registering schema provider for JSON: io.confluent.kafka.schemaregistry.json.JsonSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry)
[2021-08-14 11:57:17,999] INFO Registering schema provider for PROTOBUF: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry)
[2021-08-14 11:57:18,063] INFO Initializing KafkaStore with broker endpoints: PLAINTEXT://localhost:9092 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:18,088] INFO Creating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:18,090] WARN Creating the schema topic _schemas using a replication factor of 1, which is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:18,442] INFO Kafka store reader thread starting consumer (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread)
[2021-08-14 11:57:18,603] INFO Seeking to beginning for all partitions (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread)
[2021-08-14 11:57:18,604] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread)
[2021-08-14 11:57:18,606] INFO [kafka-store-reader-thread-_schemas]: Starting (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread)
[2021-08-14 11:57:18,851] INFO Wait to catch up until the offset at 0 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:18,963] INFO Reached offset at 0 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:18,964] INFO Joining schema registry with Kafka-based coordination (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry)
[2021-08-14 11:57:19,843] INFO Finished rebalance with leader election result: Assignment{version=1, error=0, leader='sr-1-76d7d323-be6d-42fa-b17e-d6d60fd14ac6', leaderIdentity=version=1,host=localhost,port=8081,scheme=http,leaderEligibility=true} (io.confluent.kafka.schemaregistry.leaderelector.kafka.KafkaGroupLeaderElector)
[2021-08-14 11:57:19,877] INFO Wait to catch up until the offset at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:19,881] INFO Reached offset at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:20,084] INFO jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_242-b08 (org.eclipse.jetty.server.Server)
[2021-08-14 11:57:20,185] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session)
[2021-08-14 11:57:20,186] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session)
[2021-08-14 11:57:20,187] INFO node0 Scavenging every 600000ms (org.eclipse.jetty.server.session)
[2021-08-14 11:57:20,992] INFO HV000001: Hibernate Validator 6.1.7.Final (org.hibernate.validator.internal.util.Version)
[2021-08-14 11:57:21,380] INFO Started o.e.j.s.ServletContextHandler@3e7dd664{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
[2021-08-14 11:57:21,401] INFO Started o.e.j.s.ServletContextHandler@71c27ee8{/ws,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
[2021-08-14 11:57:21,431] INFO Started NetworkTrafficServerConnector@4c762604{HTTP/1.1, (http/1.1)}{0.0.0.0:8081} (org.eclipse.jetty.server.AbstractConnector)
[2021-08-14 11:57:21,432] INFO Started @5478ms (org.eclipse.jetty.server.Server)
[2021-08-14 11:57:21,432] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain)
Port 8081 is now in the LISTEN state.
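Startup can also be confirmed from a script by waiting for the final `Server started, listening for requests...` message seen in the log above. The following is a minimal sketch: `wait_for_log_line` is a hypothetical helper, and the message text is taken from this 6.2.0 log, so it may differ in other versions.

```shell
#!/bin/bash
# wait_for_log_line FILE PATTERN [TIMEOUT_SECONDS]
# Poll FILE once per second until a line containing the fixed string PATTERN
# appears. Returns 0 when found, 1 once TIMEOUT_SECONDS (default 30) have passed.
wait_for_log_line() {
  local file=$1 pattern=$2 timeout=${3:-30}
  local waited=0
  while :; do
    if [ -f "$file" ] && grep -qF -- "$pattern" "$file"; then
      return 0
    fi
    if [ "$waited" -ge "$timeout" ]; then
      return 1
    fi
    sleep 1
    waited=$((waited + 1))
  done
}
```

For example, `wait_for_log_line /opt/confluent-6.2.0/logs/schema-registry.log "Server started, listening for requests" 60 && curl -s http://localhost:8081/subjects` waits for the message and then queries the REST API, which should return an empty JSON array while no schemas are registered. A log file left over from an earlier run may already contain the message, so this check is only a rough one.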
Reference: start/stop scripts
/opt/confluent-6.2.0/bin/zookeeper-server-start
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ $# -lt 1 ];
then
  echo "USAGE: $0 [-daemon] zookeeper.properties"
  exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  LOG4J_CONFIG_NORMAL_INSTALL="/etc/kafka/log4j.properties"
  LOG4J_CONFIG_ZIP_INSTALL="$base_dir/../etc/kafka/log4j.properties"
  if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_NORMAL_INSTALL}"
  elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_ZIP_INSTALL}"
  else # Fallback to normal default
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
  fi
fi
export KAFKA_LOG4J_OPTS
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
fi
EXTRA_ARGS=${EXTRA_ARGS-'-name zookeeper -loggc'}
COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
exec $base_dir/kafka-run-class $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
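The script above applies its default heap size only when `KAFKA_HEAP_OPTS` is empty, so the value can be overridden from the calling environment. The following is a minimal sketch of that idiom in isolation; `resolve_heap_opts` is a hypothetical function written for illustration and does not exist in the Confluent scripts.

```shell
#!/bin/bash
# resolve_heap_opts FALLBACK
# Mimics the check used in zookeeper-server-start: keep KAFKA_HEAP_OPTS when the
# caller has set it to a non-empty value, otherwise use FALLBACK.
resolve_heap_opts() {
  local fallback=$1
  if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    printf '%s\n' "$fallback"
  else
    printf '%s\n' "$KAFKA_HEAP_OPTS"
  fi
}

# Variable unset: the script's default is used.
(unset KAFKA_HEAP_OPTS; resolve_heap_opts "-Xmx512M -Xms512M")
# Variable set by the caller: the caller's value is kept.
(KAFKA_HEAP_OPTS="-Xmx256M -Xms256M"; resolve_heap_opts "-Xmx512M -Xms512M")
```

On a small test VM, for instance, the heap could be reduced with `KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" /opt/confluent-6.2.0/bin/zookeeper-server-start -daemon <property_file>`. The 256M figure is only an illustration, not a recommendation.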
/opt/confluent-6.2.0/bin/zookeeper-server-stop
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SIGNAL=${SIGNAL:-TERM}
OSNAME=$(uname -s)
if [[ "$OSNAME" == "OS/390" ]]; then
  if [ -z $JOBNAME ]; then
    JOBNAME="ZKEESTRT"
  fi
  PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v grep | awk '{print $1}')
elif [[ "$OSNAME" == "OS400" ]]; then
  PIDS=$(ps -Af | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $2}')
else
  PIDS=$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')
fi
if [ -z "$PIDS" ]; then
  echo "No zookeeper server to stop"
  exit 1
else
  kill -s $SIGNAL $PIDS
fi
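The stop script finds the ZooKeeper process by filtering `ps` output and then sends `SIGNAL` (TERM unless overridden) to the resulting PIDs. The following is a minimal sketch of the filtering step from the final `else` branch, with the `ps ax` output supplied on stdin so that it can be tried without touching real processes. `filter_java_pids` is a hypothetical name and the sample lines are made up.

```shell
#!/bin/bash
# filter_java_pids CLASS_PATTERN
# Reads `ps ax`-style lines from stdin and prints the PID (first column) of
# every java process whose command line matches CLASS_PATTERN, ignoring case.
# This mirrors the grep/awk pipeline used by the stop script on Linux.
filter_java_pids() {
  grep java | grep -i -- "$1" | grep -v grep | awk '{print $1}'
}

# Sample input invented for illustration (not real output from this environment).
sample_ps='  1234 pts/0    Sl     0:05 java -Xmx512M org.apache.zookeeper.server.quorum.QuorumPeerMain zookeeper.properties
  5678 pts/0    Sl     0:20 java -Xmx1G kafka.Kafka server.properties
  9012 pts/1    S+     0:00 grep QuorumPeerMain'

printf '%s\n' "$sample_ps" | filter_java_pids QuorumPeerMain   # prints 1234
```

Because the script reads `SIGNAL` from the environment, something like `SIGNAL=KILL bin/zookeeper-server-stop` should send SIGKILL instead of the default SIGTERM. That is probably best kept as a last resort.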
/opt/confluent-6.2.0/bin/kafka-server-start
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ $# -lt 1 ];
then
  echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
  exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  LOG4J_CONFIG_NORMAL_INSTALL="/etc/kafka/log4j.properties"
  LOG4J_CONFIG_ZIP_INSTALL="$base_dir/../etc/kafka/log4j.properties"
  if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_NORMAL_INSTALL}"
  elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_ZIP_INSTALL}"
  else # Fallback to normal default
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
  fi
fi
export KAFKA_LOG4J_OPTS
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
exec $base_dir/kafka-run-class $EXTRA_ARGS kafka.Kafka "$@"
/opt/confluent-6.2.0/bin/kafka-server-stop
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SIGNAL=${SIGNAL:-TERM}
OSNAME=$(uname -s)
if [[ "$OSNAME" == "OS/390" ]]; then
  if [ -z $JOBNAME ]; then
    JOBNAME="KAFKSTRT"
  fi
  PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v grep | awk '{print $1}')
elif [[ "$OSNAME" == "OS400" ]]; then
  PIDS=$(ps -Af | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $2}')
else
  PIDS=$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')
  PIDS_SUPPORT=$(ps ax | grep -i 'io\.confluent\.support\.metrics\.SupportedKafka' | grep java | grep -v grep | awk '{print $1}')
fi
if [ -z "$PIDS" ]; then
  # Normal Kafka is not running, but maybe we are running the support wrapper?
  if [ -z "${PIDS_SUPPORT}" ]; then
    echo "No kafka server to stop"
    exit 1
  else
    kill -s $SIGNAL $PIDS_SUPPORT
  fi
else
  kill -s $SIGNAL $PIDS
fi
/opt/confluent-6.2.0/bin/kafka-run-class
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
exit 1
fi
# CYGWIN == 1 if Cygwin is detected, else 0.
if [[ $(uname -a) =~ "CYGWIN" ]]; then
CYGWIN=1
else
CYGWIN=0
fi
if [ -z "$INCLUDE_TEST_JARS" ]; then
INCLUDE_TEST_JARS=false
fi
# Exclude jars not necessary for running commands.
regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
should_include_file() {
if [ "$INCLUDE_TEST_JARS" = true ]; then
return 0
fi
file=$1
if [ -z "$(echo "$file" | egrep "$regex")" ] ; then
return 0
else
return 1
fi
}
base_dir=$(dirname $0)/..
if [ -z "$SCALA_VERSION" ]; then
SCALA_VERSION=2.13.5
if [[ -f "$base_dir/gradle.properties" ]]; then
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
fi
fi
if [ -z "$SCALA_BINARY_VERSION" ]; then
SCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.')
fi
# run ./gradlew copyDependantLibs to get all dependant jars in a local dir
shopt -s nullglob
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;
do
CLASSPATH="$CLASSPATH:$dir/*"
done
fi
for file in "$base_dir"/examples/build/libs/kafka-examples*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
clients_lib_dir=$(dirname $0)/../clients/build/libs
streams_lib_dir=$(dirname $0)/../streams/build/libs
streams_dependant_clients_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
else
clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
streams_lib_dir=$clients_lib_dir
streams_dependant_clients_lib_dir=$streams_lib_dir
fi
for file in "$clients_lib_dir"/kafka-clients*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
for file in "$streams_lib_dir"/kafka-streams*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
else
VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$file":"$CLASSPATH"
fi
done
if [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; then
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"
fi
if [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; then
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"
fi
fi
for file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done
for file in "$streams_dependant_clients_lib_dir"/*hamcrest*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done
for file in "$base_dir"/shell/build/libs/kafka-shell*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
for dir in "$base_dir"/shell/build/dependant-libs-${SCALA_VERSION}*;
do
CLASSPATH="$CLASSPATH:$dir/*"
done
for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
for dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*;
do
CLASSPATH="$CLASSPATH:$dir/*"
done
for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
do
for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then
CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"
fi
done
# classpath addition for release
for file in "$base_dir"/libs/*;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
# CONFLUENT: classpath addition for releases with LSB-style layout
CLASSPATH="$CLASSPATH":"$base_dir/share/java/kafka/*"
# classpath for telemetry
CLASSPATH="$CLASSPATH":"$base_dir/share/java/confluent-telemetry/*"
for file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
shopt -u nullglob
if [ -z "$CLASSPATH" ] ; then
echo "Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=$SCALA_VERSION'"
exit 1
fi
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
# JMX port to use
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi
# Log directory to use
if [ "x$LOG_DIR" = "x" ]; then
LOG_DIR="$base_dir/logs"
fi
# Log4j settings
if [ -z "$KAFKA_LOG4J_OPTS" ]; then
# Log to console. This is a tool.
LOG4J_CONFIG_NORMAL_INSTALL="/etc/kafka/tools-log4j.properties"
LOG4J_CONFIG_ZIP_INSTALL="$base_dir/etc/kafka/tools-log4j.properties"
if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layout
LOG4J_DIR="${LOG4J_CONFIG_NORMAL_INSTALL}"
elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layout
LOG4J_DIR="${LOG4J_CONFIG_ZIP_INSTALL}"
else # Fallback to normal default
LOG4J_DIR="$base_dir/config/tools-log4j.properties"
fi
# If Cygwin is detected, LOG4J_DIR is converted to Windows format.
(( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
else
# create logs directory
if [ ! -d "$LOG_DIR" ]; then
mkdir -p "$LOG_DIR"
fi
fi
# If Cygwin is detected, LOG_DIR is converted to Windows format.
(( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS=""
fi
# Set Debug options if enabled
if [ "x$KAFKA_DEBUG" != "x" ]; then
# Use default ports
DEFAULT_JAVA_DEBUG_PORT="5005"
if [ -z "$JAVA_DEBUG_PORT" ]; then
JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
fi
# Use the defaults if JAVA_DEBUG_OPTS was not set
DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
if [ -z "$JAVA_DEBUG_OPTS" ]; then
JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
fi
echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
fi
# Which java to use
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi
# Memory options
if [ -z "$KAFKA_HEAP_OPTS" ]; then
KAFKA_HEAP_OPTS="-Xmx256M"
fi
# JVM performance options
# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
fi
while [ $# -gt 0 ]; do
COMMAND=$1
case $COMMAND in
-name)
DAEMON_NAME=$2
CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
shift 2
;;
-loggc)
if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
GC_LOG_ENABLED="true"
fi
shift
;;
-daemon)
DAEMON_MODE="true"
shift
;;
*)
break
;;
esac
done
# GC options
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
# The first segment of the version number, which is '1' for releases before Java 9
# it then becomes '9', '10', ...
# Some examples of the first line of `java --version`:
# 8 -> java version "1.8.0_152"
# 9.0.4 -> java version "9.0.4"
# 10 -> java version "10" 2018-03-20
# 10.0.1 -> java version "10.0.1" 2018-04-17
# We need to match to the end of the line to prevent sed from printing the characters that do not match
JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=100M"
else
KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
fi
fi
# Remove a possible colon prefix from the classpath (happens at lines like `CLASSPATH="$CLASSPATH:$file"` when CLASSPATH is blank)
# Syntax used on the right side is native Bash string manipulation; for more details see
# http://tldp.org/LDP/abs/html/string-manipulation.html, specifically the section titled "Substring Removal"
CLASSPATH=${CLASSPATH#:}
# If Cygwin is detected, classpath is converted to Windows format.
(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
fi
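To choose between the Java 8 style and the Java 9+ style GC logging flags, kafka-run-class extracts the first numeric segment of the Java version string with `sed`. The following is a minimal sketch of that step with the version line supplied on stdin; `java_major_version` is a hypothetical wrapper name, while the `sed` expression is the one from the listing above.

```shell
#!/bin/bash
# java_major_version
# Reads `java -version` output from stdin and prints the first numeric segment
# of the quoted version string. Java 8 and earlier report "1.x", so the result
# is 1 for those releases and 9, 10, 11, ... afterwards.
java_major_version() {
  sed -E -n 's/.* version "([0-9]*).*$/\1/p'
}

echo 'openjdk version "1.8.0_242"' | java_major_version        # prints 1
echo 'java version "10.0.1" 2018-04-17' | java_major_version   # prints 10
```

In this environment (OpenJDK 1.8.0_242) the result is 1, which is less than 9, so the `-Xloggc:...` branch is the one that would apply when `-loggc` is passed.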
/opt/confluent-6.2.0/bin/schema-registry-start
#!/bin/bash
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
print_synopsis() {
  echo "USAGE: $0 [-daemon] schema-registry.properties"
}
EXTRA_ARGS=${EXTRA_ARGS-'-name schemaRegistry'}
# EXTRA_ARGS=${EXTRA_ARGS-'-name schemaRegistry -loggc'}
ARG=$1
case $ARG in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
if [[ $# -lt 1 ]]; then
  print_synopsis
  exit 1
fi
if [[ ! -e "$1" ]]; then
  echo "Property file $1 does not exist"
  print_synopsis
  exit 1
fi
exec $(dirname $0)/schema-registry-run-class ${EXTRA_ARGS} io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain "$@"
/opt/confluent-6.2.0/bin/schema-registry-stop
#!/bin/bash
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# When stopping, search for both the current SchemaRegistryMain class and the deprecated Main class.
exec $(dirname $0)/schema-registry-stop-service "(io.confluent.kafka.schemaregistry.rest.Main)|(io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain)"
/opt/confluent-6.2.0/bin/schema-registry-run-class
#!/bin/bash
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
exit 1
fi
base_dir=$(dirname $0)/..
# CYGINW == 1 if Cygwin is detected, else 0.
if [[ $(uname -a) =~ "CYGWIN" ]]; then
CYGWIN=1
else
CYGWIN=0
fi
# Development jars. `mvn package` should collect all the required dependency jars here
for dir in $base_dir/package-schema-registry/target/kafka-schema-registry-package-*-development; do
CLASSPATH=$CLASSPATH:$dir/share/java/schema-registry/*
done
# Production jars, including kafka, rest-utils, and schema-registry
for library in "confluent-security/schema-registry" "confluent-common" "confluent-telemetry" "rest-utils" "schema-registry"; do
CLASSPATH=$CLASSPATH:$base_dir/share/java/$library/*
done
# Log directory to use
if [ "x$LOG_DIR" = "x" ]; then
LOG_DIR="$base_dir/logs"
fi
# create logs directory
if [ ! -d "$LOG_DIR" ]; then
mkdir -p "$LOG_DIR"
fi
# logj4 settings
if [ "x$SCHEMA_REGISTRY_LOG4J_OPTS" = "x" ]; then
# Test for files from dev -> packages so this will work as expected in dev if you have packages
# installed
if [ -e "$base_dir/config/log4j.properties" ]; then # Dev environment
LOG4J_DIR="$base_dir/config/log4j.properties"
elif [ -e "$base_dir/etc/schema-registry/log4j.properties" ]; then # Simple zip file layout
LOG4J_DIR="$base_dir/etc/schema-registry/log4j.properties"
elif [ -e "/etc/schema-registry/log4j.properties" ]; then # Normal install layout
LOG4J_DIR="/etc/schema-registry/log4j.properties"
fi
# If Cygwin is detected, LOG4J_DIR is converted to Windows format.
(( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
SCHEMA_REGISTRY_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
fi
# If Cygwin is detected, LOG_DIR is converted to Windows format.
(( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
SCHEMA_REGISTRY_LOG4J_OPTS="-Dschema-registry.log.dir=$LOG_DIR $SCHEMA_REGISTRY_LOG4J_OPTS"
# JMX settings
if [ -z "$SCHEMA_REGISTRY_JMX_OPTS" ]; then
SCHEMA_REGISTRY_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
# JMX port to use
if [ $JMX_PORT ]; then
SCHEMA_REGISTRY_JMX_OPTS="$SCHEMA_REGISTRY_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi
# Generic jvm settings you want to add
if [ -z "$SCHEMA_REGISTRY_OPTS" ]; then
SCHEMA_REGISTRY_OPTS=""
fi
# Which java to use
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi
# Memory options
if [ -z "$SCHEMA_REGISTRY_HEAP_OPTS" ]; then
SCHEMA_REGISTRY_HEAP_OPTS="-Xmx512M"
fi
# JVM performance options
if [ -z "$SCHEMA_REGISTRY_JVM_PERFORMANCE_OPTS" ]; then
SCHEMA_REGISTRY_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
fi
while [ $# -gt 0 ]; do
COMMAND=$1
case $COMMAND in
-help)
HELP="true"
break
;;
-name)
DAEMON_NAME=$2
CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
shift 2
;;
-loggc)
if [ -z "$SCHEMA_REGISTRY_GC_LOG_OPTS" ]; then
GC_LOG_ENABLED="true"
fi
shift
;;
-daemon)
DAEMON_MODE="true"
shift
;;
*)
break
;;
esac
done
if [ "x$$HELP" = "xtrue" ]; then
echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
exit 0
fi
MAIN=$1
shift
# GC options
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
# The first segment of the version number, which is '1' for releases before Java 9
# it then becomes '9', '10', ...
# Some examples of the first line of `java --version`:
# 8 -> java version "1.8.0_152"
# 9.0.4 -> java version "9.0.4"
# 10 -> java version "10" 2018-03-20
# 10.0.1 -> java version "10.0.1" 2018-04-17
# We need to match to the end of the line to prevent sed from printing the characters that do not match
JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
SCHEMA_REGISTRY_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
else
SCHEMA_REGISTRY_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
fi
fi
# If Cygwin is detected, classpath is converted to Windows format.
(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
CONSOLE_OUTPUT_FILE=${CONSOLE_OUTPUT_FILE:-${LOG_DIR}/schema-registry-console.out}
nohup $JAVA $SCHEMA_REGISTRY_HEAP_OPTS $SCHEMA_REGISTRY_JVM_PERFORMANCE_OPTS $SCHEMA_REGISTRY_GC_LOG_OPTS $SCHEMA_REGISTRY_JMX_OPTS $SCHEMA_REGISTRY_LOG4J_OPTS -cp $CLASSPATH $SCHEMA_REGISTRY_OPTS "$MAIN" "$@" > "${CONSOLE_OUTPUT_FILE}" 2>&1 < /dev/null &
else
exec "$JAVA" $SCHEMA_REGISTRY_HEAP_OPTS $SCHEMA_REGISTRY_JVM_PERFORMANCE_OPTS $SCHEMA_REGISTRY_GC_LOG_OPTS $SCHEMA_REGISTRY_JMX_OPTS $SCHEMA_REGISTRY_LOG4J_OPTS -cp $CLASSPATH $SCHEMA_REGISTRY_OPTS "$MAIN" "$@"
fi
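One detail in the listing above is worth noting: the help check is written as `"x$$HELP"`. In bash, `$$` expands to the PID of the shell, so the string being compared is `x<PID>HELP` rather than the value of `HELP`, and the usage branch does not appear to be reachable as written. The following is a minimal sketch that reproduces the difference outside the script; `check_as_written` and `check_intended` are hypothetical names, and the second form is only a guess at the intent.

```shell
#!/bin/bash
HELP="true"

# As written in the listing: "$$" is the shell's PID, followed by the literal text HELP.
check_as_written() {
  if [ "x$$HELP" = "xtrue" ]; then echo "match"; else echo "no match"; fi
}

# Presumably intended form: compare the value of the HELP variable.
check_intended() {
  if [ "x$HELP" = "xtrue" ]; then echo "match"; else echo "no match"; fi
}

check_as_written   # prints "no match"
check_intended     # prints "match"
```

This has no effect on normal start and stop, since schema-registry-start never passes `-help`.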