Confluent Platform Notes - (1) Environment Setup

Posted at 2021-08-17

Introduction

These are my notes on Confluent Platform, the commercial distribution of Apache Kafka. Since this is my first time using it, I am keeping a log that also organizes what I looked into.
Although it is a commercial product, there is a Confluent Community License under which some of the functionality can be used free of charge, so I am trying things within that free scope.

Related Articles

Confluent Platform Notes - (1) Environment Setup
Confluent Platform Notes - (2) Simple Message Send/Receive Test
Confluent Platform Notes - (3) Simple Schema Registry Test

References

Overview and Architecture of Apache Kafka
What is Confluent Platform?
Installing Confluent Platform (Community edition) and Trying Kafka Connect
How Apache Kafka Producer/Broker/Consumer Works, with a Configuration Reference
Manual Install using ZIP and TAR Archives

About the Confluent Platform Community License

Confluent Platform contains several components, and the license appears to differ per component.
Confluent Community License FAQ
The table below is excerpted from the page above.
(image: table of Confluent Platform components and their licenses)

The core components of Confluent Platform, such as Kafka and ZooKeeper, appear to be available under the APACHE 2.0 LICENSE. Schema Registry and REST Proxy look usable within the scope of the COMMUNITY LICENSE.
To use Control Center or the cluster-related features, an ENTERPRISE LICENSE (paid) seems to be required.

Reference:
Confluent Platform Licenses
Confluent Community License Version 1.0

Environment

RHEL V8.2
Confluent Community V6.2.0

I have RHEL V8.2 running as a guest OS on VirtualBox on Windows 10, and that is where I will install.
Kafka can be deployed as a cluster spread across multiple nodes, but since the goal here is a simple test environment, I will configure just one broker on a single node.

Prerequisites are here:
Confluent System Requirements

Java is already installed:

[root@test12 ~]# java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)

Installing the Confluent Platform Community Components

There is an offline installation method, so I will try following that procedure:
Manual Install using ZIP and TAR Archives

Download confluent-community-6.2.0.tar.gz (about 350 MB) linked from the guide above, copy it to the target machine, and extract it. Here I extract it under /opt/.

[root@test12 /Local_Inst_Image/Confluent]# ls -la
合計 348552
drwxrwx---. 1 root vboxsf         0  8月 13 18:00 .
drwxrwx---. 1 root vboxsf     16384  8月 13 17:59 ..
-rwxrwx---. 1 root vboxsf 356898902  8月 13 18:00 confluent-community-6.2.0.tar.gz

[root@test12 /Local_Inst_Image/Confluent]# tar xzf confluent-community-6.2.0.tar.gz -C /opt

The files were extracted under /opt/confluent-6.2.0.
Change the owner and group:

[root@test12 /opt]# chown -R root:root confluent-6.2.0

[root@test12 /opt]# ls -la confluent-6.2.0/
合計 8
drwxr-xr-x. 7 root root   77  6月  6 08:51 .
drwxr-xr-x. 7 root root   95  8月 13 18:05 ..
-rw-r--r--. 1 root root  871  6月  6 08:51 README
drwxr-xr-x. 3 root root 4096  6月  6 07:11 bin
drwxr-xr-x. 8 root root  116  6月  6 07:11 etc
drwxr-xr-x. 3 root root   21  6月  6 07:11 lib
drwxr-xr-x. 6 root root   71  6月  6 07:11 share
drwxr-xr-x. 2 root root  178  6月  6 08:51 src

Extracting the tar archive is all it takes. Easy.
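
As an optional convenience (my own habit, not part of the install guide), the bin directory can be put on PATH; the rest of this note keeps the full paths, though.

# Optional: make the Confluent commands available without the full path
export CONFLUENT_HOME=/opt/confluent-6.2.0
export PATH=$PATH:$CONFLUENT_HOME/bin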

Managing the Confluent Components

Here I will deal with three components: ZooKeeper, Kafka (Broker), and Schema Registry.
For now, let's start all of them with the default configuration.

(A component called Control Center apparently provides a GUI management interface, which would make things easier to follow, but unfortunately it does not seem to be available under the Community License...)

ZooKeeper

Configuration

Configure Confluent Platform - ZooKeeper
Check the property file provided by default.

/opt/confluent-6.2.0/etc/kafka/zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

There is something that looks like a parameter reference around here:
Running ZooKeeper in Production - Configuration Options
For the complete list it sends you to the Apache site...
Apache - ZooKeeper Administrator's Guide
...but I could not find admin.enableServer and the like there. That link appears to be the V3.4.10 documentation, which looks outdated.
The documentation for a newer version does describe admin.enableServer:
Apache - ZooKeeper Administrator's Guide (3.5.9)
(Checking the ZooKeeper startup log later confirmed the version is 3.5.9.)
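
As an aside: if you did want the AdminServer, the comments in the properties file above show the relevant knobs. A minimal sketch, assuming an arbitrary non-conflicting port (8090) of my own choosing:

# Enable the ZooKeeper AdminServer (illustrative; requires a ZooKeeper restart)
cat >> /opt/confluent-6.2.0/etc/kafka/zookeeper.properties <<'EOF'
admin.enableServer=true
admin.serverPort=8090
EOF
# The AdminServer then answers over HTTP, e.g.:
curl http://localhost:8090/commands/stat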

Start/Stop

Start Confluent Platform

  • Start command: bin/zookeeper-server-start -daemon <property_file>
  • Stop command: bin/zookeeper-server-stop
  • Log: logs/zookeeper.out
  • Listen port: 2181

Note: I could not find anything like a command reference in the manual. Looking inside the file above, it is a shell script, and judging from its contents, passing the -daemon option makes it start in the background. (See the list of shell scripts at the end.)

Start it with the following command, using the provided property file as-is:
/opt/confluent-6.2.0/bin/zookeeper-server-start -daemon /opt/confluent-6.2.0/etc/kafka/zookeeper.properties

Messages are written to logs/zookeeper.out.
(The same messages also appear in server.log, but server.log additionally receives the Kafka messages described later.)

Startup log: logs/zookeeper.out
[2021-08-14 09:24:44,670] INFO Reading configuration from: /opt/confluent-6.2.0/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,695] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,695] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,697] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-08-14 09:24:44,697] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-08-14 09:24:44,697] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2021-08-14 09:24:44,697] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2021-08-14 09:24:44,700] INFO Log4j 1.2 jmx support found and enabled. (org.apache.zookeeper.jmx.ManagedUtil)
[2021-08-14 09:24:44,714] INFO Reading configuration from: /opt/confluent-6.2.0/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,714] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,714] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2021-08-14 09:24:44,715] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2021-08-14 09:24:44,717] INFO zookeeper.snapshot.trust.empty : false (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2021-08-14 09:24:44,734] INFO Server environment:zookeeper.version=3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 20:03 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,734] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,734] INFO Server environment:java.version=1.8.0_242 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,734] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,734] INFO Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-4.el8.x86_64/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,735] INFO Server environment:java.class.path=/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-buffer-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-mirror-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-scala_2.13-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-util-ajax-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.inject-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/audience-annotations-0.5.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-java8-compat_2.13-0.9.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-metadata-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.activation-api-1.2.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/slf4j-api-1.7.30.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-logging_2.13-3.9.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zookeeper-jute-3.5.9.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-codec-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-json-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-security-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-reflect-2.13.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-library-2.13.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-jaxrs-base-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-sources.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-handler-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.ws.rs-api-2.1.6.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zstd-jni-1.4.9-1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-util-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-server-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zookeeper-3.5.9.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/javassist-3.27.0-GA.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-shell-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-client-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/commons-cli-1.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-core-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-servlet-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-runtime-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-collection-compat_2.13-2.3.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-jaxrs-json-provider-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.validation-api-2.0.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-scala_2.13-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.annotation-api-1.3.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-server-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-utils-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/lz4-java-1.7.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/maven-artifact-3.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-transforms-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/metrics-core-2.2.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-annotations-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-test-source
s.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-mirror-client-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.xml.bind-api-2.3.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/reflections-0.9.12.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/argparse4j-0.7.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-native-epoll-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/activation-1.1.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-raft-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-tools-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-http-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-javadoc.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-api-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-file-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-container-servlet-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-container-servlet-core-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-resolver-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jaxb-api-2.3.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-test-utils-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-log4j-appender-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-hk2-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/javax.servlet-api-3.1.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-datatype-jdk8-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/slf4j-log4j12-1.7.30.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/commons-lang3-3.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/plexus-utils-3.2.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/javax.ws.rs-api-2.1.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-common-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-paranamer-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-common-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-native-unix-common-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-test.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-examples-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-io-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/osgi-resource-locator-1.0.3.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/rocksdbjni-5.18.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/aopalliance-repackaged-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-dataformat-csv-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/snappy-java-1.1.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-databind-2.10.5.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-continuation-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-locator-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/confluent-log4j-1.2.17-cp2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-api-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jopt-simple-5.0.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/paranamer-2.8.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-jaxb-annotations-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-clients-6.2.0-ccs
.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-servlets-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-basic-auth-extension-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jline-3.12.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-client-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/confluent-telemetry/* (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,737] INFO Server environment:java.library.path=/opt/ibm/cics/lib:/opt/ibm/cicssm/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,737] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,737] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,737] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:os.version=4.18.0-193.el8.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:user.dir=/opt/confluent-6.2.0/bin (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:os.memory.free=497MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:os.memory.max=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,738] INFO Server environment:os.memory.total=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,740] INFO minSessionTimeout set to 6000 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,740] INFO maxSessionTimeout set to 60000 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,740] INFO Created server with tickTime 3000 minSessionTimeout 6000 maxSessionTimeout 60000 datadir /tmp/zookeeper/version-2 snapdir /tmp/zookeeper/version-2 (org.apache.zookeeper.server.ZooKeeperServer)
[2021-08-14 09:24:44,749] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
[2021-08-14 09:24:44,753] INFO Configuring NIO connection handler with 10s sessionless connection timeout, 1 selector thread(s), 8 worker threads, and 64 kB direct buffers. (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2021-08-14 09:24:44,757] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2021-08-14 09:24:44,800] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2021-08-14 09:24:44,803] INFO Reading snapshot /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileSnap)
[2021-08-14 09:24:44,810] INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2021-08-14 09:24:44,822] INFO PrepRequestProcessor (sid:0) started, reconfigEnabled=false (org.apache.zookeeper.server.PrepRequestProcessor)
[2021-08-14 09:24:44,828] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)

Looks like it started; port 2181 is now in the listening state.
The startup log contains the following message, so the ZooKeeper version is 3.5.9:

Server environment:zookeeper.version=3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 20:03 GMT (org.apache.zookeeper.server.ZooKeeperServer)
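
As a cross-check (my own addition, not from the guide), the same information can be pulled from ZooKeeper's four-letter-word interface; in 3.5.x only srvr is whitelisted by default:

# Query the running server; prints version, mode (standalone), latency stats
echo srvr | nc localhost 2181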

Kafka(Broker)

Configuration

Configure Confluent Platform - Kafka

The property file shipped with the product is the following.

/opt/confluent-6.2.0/etc/kafka/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

##################### Confluent Metrics Reporter #######################
# Confluent Control Center and Confluent Auto Data Balancer integration
#
# Uncomment the following lines to publish monitoring data for
# Confluent Control Center and Confluent Auto Data Balancer
# If you are using a dedicated metrics cluster, also adjust the settings
# to point to your metrics kakfa cluster.
#metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
#confluent.metrics.reporter.bootstrap.servers=localhost:9092
#
# Uncomment the following line if the metrics cluster has a single broker
#confluent.metrics.reporter.topic.replicas=1

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

The file is long and mostly comments, so extracting just the parameter lines gives this:

[root@test12 /opt/confluent-6.2.0/etc/kafka]# cat server.properties  | grep -v -e "^#" | grep -v -e "^\s*$"
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

The parameter reference is here:
Kafka Broker Configurations

Note: there is no listener port setting in this file, but according to the reference, when listeners is not specified the port parameter is used, and its default is 9092.
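
If you would rather be explicit than rely on that legacy default, uncommenting the listeners line in server.properties should pin the same behavior (illustrative; equivalent to the default):

listeners=PLAINTEXT://:9092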

Note: the data held in a topic is physically kept as files on the OS. The destination is specified by the log.dirs parameter, which as shown above defaults to /tmp/kafka-logs. Since the /tmp directory is usually subject to automatic cleanup, to keep the data from being deleted you either need to change this parameter to point somewhere else or change the /tmp cleanup settings.
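
A minimal sketch of the first option (the directory names are arbitrary choices of mine):

# Move the ZooKeeper and Kafka data directories out of /tmp
mkdir -p /var/kafka-data/zookeeper /var/kafka-data/kafka-logs
sed -i 's|^dataDir=.*|dataDir=/var/kafka-data/zookeeper|' /opt/confluent-6.2.0/etc/kafka/zookeeper.properties
sed -i 's|^log.dirs=.*|log.dirs=/var/kafka-data/kafka-logs|' /opt/confluent-6.2.0/etc/kafka/server.properties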

Reference:
Stack Overflow - Which directory does apache kafka store the data in broker nodes
Kafka Broker Configurations - log.dirs
[CentOS] Why files under /tmp disappear

Start/Stop

Start Confluent Platform

  • Start command: bin/kafka-server-start -daemon <property_file>
  • Stop command: bin/kafka-server-stop
  • Log: logs/kafkaServer.out
  • Listen port: 9092

Start it with the following command:
/opt/confluent-6.2.0/bin/kafka-server-start -daemon /opt/confluent-6.2.0/etc/kafka/server.properties
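
Once it is up, a quick sanity check (my own addition; kafka-topics is one of the bundled shell scripts) is to confirm the listen port and that the broker answers:

ss -tlnp | grep 9092
/opt/confluent-6.2.0/bin/kafka-topics --bootstrap-server localhost:9092 --list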

Startup log: logs/kafkaServer.out
[2021-08-14 11:17:16,090] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2021-08-14 11:17:16,584] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2021-08-14 11:17:16,712] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2021-08-14 11:17:16,716] INFO starting (kafka.server.KafkaServer)
[2021-08-14 11:17:16,717] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2021-08-14 11:17:16,747] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2021-08-14 11:17:16,752] INFO Client environment:zookeeper.version=3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 20:03 GMT (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,752] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,752] INFO Client environment:java.version=1.8.0_242 (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,752] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,752] INFO Client environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-4.el8.x86_64/jre (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,761] INFO Client environment:java.class.path=/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-buffer-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-mirror-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-scala_2.13-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-util-ajax-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.inject-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/audience-annotations-0.5.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-java8-compat_2.13-0.9.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-metadata-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.activation-api-1.2.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/slf4j-api-1.7.30.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-logging_2.13-3.9.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zookeeper-jute-3.5.9.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-codec-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-json-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-security-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-reflect-2.13.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-library-2.13.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-jaxrs-base-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-sources.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-handler-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.ws.rs-api-2.1.6.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zstd-jni-1.4.9-1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-util-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-server-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/zookeeper-3.5.9.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/javassist-3.27.0-GA.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-shell-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-client-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/commons-cli-1.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-core-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-servlet-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-runtime-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/scala-collection-compat_2.13-2.3.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-jaxrs-json-provider-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.validation-api-2.0.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-scala_2.13-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.annotation-api-1.3.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-server-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-utils-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/lz4-java-1.7.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/maven-artifact-3.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-transforms-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/metrics-core-2.2.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-annotations-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-test-source
s.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-mirror-client-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jakarta.xml.bind-api-2.3.2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/reflections-0.9.12.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/argparse4j-0.7.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-native-epoll-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/activation-1.1.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-raft-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-tools-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-http-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-javadoc.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-api-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-file-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-container-servlet-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-container-servlet-core-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-resolver-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jaxb-api-2.3.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-test-utils-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-log4j-appender-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-hk2-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/javax.servlet-api-3.1.0.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-datatype-jdk8-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/slf4j-log4j12-1.7.30.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/commons-lang3-3.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/plexus-utils-3.2.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/javax.ws.rs-api-2.1.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-common-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-paranamer-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-common-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/netty-transport-native-unix-common-4.1.62.Final.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka_2.13-6.2.0-ccs-test.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-streams-examples-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-io-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/osgi-resource-locator-1.0.3.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/rocksdbjni-5.18.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/aopalliance-repackaged-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-dataformat-csv-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/snappy-java-1.1.8.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-databind-2.10.5.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-continuation-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-locator-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/confluent-log4j-1.2.17-cp2.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/hk2-api-2.6.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jopt-simple-5.0.4.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/paranamer-2.8.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jackson-module-jaxb-annotations-2.10.5.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/kafka-clients-6.2.0-ccs
.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jetty-servlets-9.4.40.v20210413.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/connect-basic-auth-extension-6.2.0-ccs.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jline-3.12.1.jar:/opt/confluent-6.2.0/bin/../share/java/kafka/jersey-client-2.34.jar:/opt/confluent-6.2.0/bin/../share/java/confluent-telemetry/* (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:java.library.path=/opt/ibm/cics/lib:/opt/ibm/cicssm/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,763] INFO Client environment:os.version=4.18.0-193.el8.x86_64 (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:user.dir=/opt/confluent-6.2.0/bin (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:os.memory.free=980MB (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:os.memory.max=1024MB (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,764] INFO Client environment:os.memory.total=1024MB (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,766] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@5b8dfcc1 (org.apache.zookeeper.ZooKeeper)
[2021-08-14 11:17:16,770] INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
[2021-08-14 11:17:16,785] INFO zookeeper.request.timeout value is 0. feature enabled= (org.apache.zookeeper.ClientCnxn)
[2021-08-14 11:17:16,796] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-08-14 11:17:16,798] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-08-14 11:17:16,802] INFO Socket connection established, initiating session, client: /0:0:0:0:0:0:0:1:37008, server: localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)
[2021-08-14 11:17:16,831] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x1000070216a0000, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-08-14 11:17:16,834] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-08-14 11:17:17,053] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
[2021-08-14 11:17:17,065] INFO Feature ZK node at path: /feature does not exist (kafka.server.FinalizedFeatureChangeListener)
[2021-08-14 11:17:17,065] INFO Cleared cache (kafka.server.FinalizedFeatureCache)
[2021-08-14 11:17:17,311] INFO Cluster ID = Mp9ekWqlR2-jIyzo3tDKMg (kafka.server.KafkaServer)
[2021-08-14 11:17:17,314] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2021-08-14 11:17:17,381] INFO KafkaConfig values:
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        alter.log.dirs.replication.quota.window.num = 11
        alter.log.dirs.replication.quota.window.size.seconds = 1
        authorizer.class.name =
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.heartbeat.interval.ms = 2000
        broker.id = 0
        broker.id.generation.enable = true
        broker.rack = null
        broker.session.timeout.ms = 9000
        client.quota.callback.class = null
        compression.type = producer
        connection.failed.authentication.delay.ms = 100
        connections.max.idle.ms = 600000
        connections.max.reauth.ms = 0
        control.plane.listener.name = null
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.listener.names = null
        controller.quorum.append.linger.ms = 25
        controller.quorum.election.backoff.max.ms = 1000
        controller.quorum.election.timeout.ms = 1000
        controller.quorum.fetch.timeout.ms = 2000
        controller.quorum.request.timeout.ms = 2000
        controller.quorum.retry.backoff.ms = 20
        controller.quorum.voters = []
        controller.quota.window.num = 11
        controller.quota.window.size.seconds = 1
        controller.socket.timeout.ms = 30000
        create.topic.policy.class.name = null
        default.replication.factor = 1
        delegation.token.expiry.check.interval.ms = 3600000
        delegation.token.expiry.time.ms = 86400000
        delegation.token.master.key = null
        delegation.token.max.lifetime.ms = 604800000
        delegation.token.secret.key = null
        delete.records.purgatory.purge.interval.requests = 1
        delete.topic.enable = true
        fetch.max.bytes = 57671680
        fetch.purgatory.purge.interval.requests = 1000
        group.initial.rebalance.delay.ms = 0
        group.max.session.timeout.ms = 1800000
        group.max.size = 2147483647
        group.min.session.timeout.ms = 6000
        host.name =
        initial.broker.registration.timeout.ms = 60000
        inter.broker.listener.name = null
        inter.broker.protocol.version = 2.8-IV1
        kafka.metrics.polling.interval.secs = 10
        kafka.metrics.reporters = []
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
        listeners = null
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.max.compaction.lag.ms = 9223372036854775807
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /tmp/kafka-logs
        log.flush.interval.messages = 9223372036854775807
        log.flush.interval.ms = null
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.flush.start.offset.checkpoint.interval.ms = 60000
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.downconversion.enable = true
        log.message.format.version = 2.8-IV1
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 168
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connection.creation.rate = 2147483647
        max.connections = 2147483647
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides =
        max.incremental.fetch.session.cache.slots = 1000
        message.max.bytes = 1048588
        metadata.log.dir = null
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        node.id = -1
        num.io.threads = 8
        num.network.threads = 3
        num.partitions = 1
        num.recovery.threads.per.data.dir = 1
        num.replica.alter.log.dirs.threads = null
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 10080
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 1
        offsets.topic.segment.bytes = 104857600
        password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
        password.encoder.iterations = 4096
        password.encoder.key.length = 128
        password.encoder.keyfactory.algorithm = null
        password.encoder.old.secret = null
        password.encoder.secret = null
        port = 9092
        principal.builder.class = null
        process.roles = []
        producer.purgatory.purge.interval.requests = 1000
        queued.max.request.bytes = -1
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 30000
        replica.selector.class = null
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.client.callback.handler.class = null
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism.controller.protocol = GSSAPI
        sasl.mechanism.inter.broker.protocol = GSSAPI
        sasl.server.callback.handler.class = null
        security.inter.broker.protocol = PLAINTEXT
        security.providers = null
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        ssl.cipher.suites = []
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.principal.mapping.rules = DEFAULT
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
        transaction.max.timeout.ms = 900000
        transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
        transaction.state.log.load.buffer.size = 5242880
        transaction.state.log.min.isr = 1
        transaction.state.log.num.partitions = 50
        transaction.state.log.replication.factor = 1
        transaction.state.log.segment.bytes = 104857600
        transactional.id.expiration.ms = 604800000
        unclean.leader.election.enable = false
        zookeeper.clientCnxnSocket = null
        zookeeper.connect = localhost:2181
        zookeeper.connection.timeout.ms = 18000
        zookeeper.max.in.flight.requests = 10
        zookeeper.session.timeout.ms = 18000
        zookeeper.set.acl = false
        zookeeper.ssl.cipher.suites = null
        zookeeper.ssl.client.enable = false
        zookeeper.ssl.crl.enable = false
        zookeeper.ssl.enabled.protocols = null
        zookeeper.ssl.endpoint.identification.algorithm = HTTPS
        zookeeper.ssl.keystore.location = null
        zookeeper.ssl.keystore.password = null
        zookeeper.ssl.keystore.type = null
        zookeeper.ssl.ocsp.enable = false
        zookeeper.ssl.protocol = TLSv1.2
        zookeeper.ssl.truststore.location = null
        zookeeper.ssl.truststore.password = null
        zookeeper.ssl.truststore.type = null
        zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2021-08-14 11:17:17,394] INFO KafkaConfig values:
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        alter.log.dirs.replication.quota.window.num = 11
        alter.log.dirs.replication.quota.window.size.seconds = 1
        authorizer.class.name =
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.heartbeat.interval.ms = 2000
        broker.id = 0
        broker.id.generation.enable = true
        broker.rack = null
        broker.session.timeout.ms = 9000
        client.quota.callback.class = null
        compression.type = producer
        connection.failed.authentication.delay.ms = 100
        connections.max.idle.ms = 600000
        connections.max.reauth.ms = 0
        control.plane.listener.name = null
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.listener.names = null
        controller.quorum.append.linger.ms = 25
        controller.quorum.election.backoff.max.ms = 1000
        controller.quorum.election.timeout.ms = 1000
        controller.quorum.fetch.timeout.ms = 2000
        controller.quorum.request.timeout.ms = 2000
        controller.quorum.retry.backoff.ms = 20
        controller.quorum.voters = []
        controller.quota.window.num = 11
        controller.quota.window.size.seconds = 1
        controller.socket.timeout.ms = 30000
        create.topic.policy.class.name = null
        default.replication.factor = 1
        delegation.token.expiry.check.interval.ms = 3600000
        delegation.token.expiry.time.ms = 86400000
        delegation.token.master.key = null
        delegation.token.max.lifetime.ms = 604800000
        delegation.token.secret.key = null
        delete.records.purgatory.purge.interval.requests = 1
        delete.topic.enable = true
        fetch.max.bytes = 57671680
        fetch.purgatory.purge.interval.requests = 1000
        group.initial.rebalance.delay.ms = 0
        group.max.session.timeout.ms = 1800000
        group.max.size = 2147483647
        group.min.session.timeout.ms = 6000
        host.name =
        initial.broker.registration.timeout.ms = 60000
        inter.broker.listener.name = null
        inter.broker.protocol.version = 2.8-IV1
        kafka.metrics.polling.interval.secs = 10
        kafka.metrics.reporters = []
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
        listeners = null
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.max.compaction.lag.ms = 9223372036854775807
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /tmp/kafka-logs
        log.flush.interval.messages = 9223372036854775807
        log.flush.interval.ms = null
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.flush.start.offset.checkpoint.interval.ms = 60000
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.downconversion.enable = true
        log.message.format.version = 2.8-IV1
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 168
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connection.creation.rate = 2147483647
        max.connections = 2147483647
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides =
        max.incremental.fetch.session.cache.slots = 1000
        message.max.bytes = 1048588
        metadata.log.dir = null
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        node.id = -1
        num.io.threads = 8
        num.network.threads = 3
        num.partitions = 1
        num.recovery.threads.per.data.dir = 1
        num.replica.alter.log.dirs.threads = null
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 10080
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 1
        offsets.topic.segment.bytes = 104857600
        password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
        password.encoder.iterations = 4096
        password.encoder.key.length = 128
        password.encoder.keyfactory.algorithm = null
        password.encoder.old.secret = null
        password.encoder.secret = null
        port = 9092
        principal.builder.class = null
        process.roles = []
        producer.purgatory.purge.interval.requests = 1000
        queued.max.request.bytes = -1
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 30000
        replica.selector.class = null
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.client.callback.handler.class = null
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism.controller.protocol = GSSAPI
        sasl.mechanism.inter.broker.protocol = GSSAPI
        sasl.server.callback.handler.class = null
        security.inter.broker.protocol = PLAINTEXT
        security.providers = null
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        ssl.cipher.suites = []
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.principal.mapping.rules = DEFAULT
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
        transaction.max.timeout.ms = 900000
        transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
        transaction.state.log.load.buffer.size = 5242880
        transaction.state.log.min.isr = 1
        transaction.state.log.num.partitions = 50
        transaction.state.log.replication.factor = 1
        transaction.state.log.segment.bytes = 104857600
        transactional.id.expiration.ms = 604800000
        unclean.leader.election.enable = false
        zookeeper.clientCnxnSocket = null
        zookeeper.connect = localhost:2181
        zookeeper.connection.timeout.ms = 18000
        zookeeper.max.in.flight.requests = 10
        zookeeper.session.timeout.ms = 18000
        zookeeper.set.acl = false
        zookeeper.ssl.cipher.suites = null
        zookeeper.ssl.client.enable = false
        zookeeper.ssl.crl.enable = false
        zookeeper.ssl.enabled.protocols = null
        zookeeper.ssl.endpoint.identification.algorithm = HTTPS
        zookeeper.ssl.keystore.location = null
        zookeeper.ssl.keystore.password = null
        zookeeper.ssl.keystore.type = null
        zookeeper.ssl.ocsp.enable = false
        zookeeper.ssl.protocol = TLSv1.2
        zookeeper.ssl.truststore.location = null
        zookeeper.ssl.truststore.password = null
        zookeeper.ssl.truststore.type = null
        zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2021-08-14 11:17:17,454] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-08-14 11:17:17,455] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-08-14 11:17:17,456] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-08-14 11:17:17,458] INFO [ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-08-14 11:17:17,484] INFO Log directory /tmp/kafka-logs not found, creating it. (kafka.log.LogManager)
[2021-08-14 11:17:17,500] INFO Loading logs from log dirs ArraySeq(/tmp/kafka-logs) (kafka.log.LogManager)
[2021-08-14 11:17:17,506] INFO Attempting recovery for all logs in /tmp/kafka-logs since no clean shutdown file was found (kafka.log.LogManager)
[2021-08-14 11:17:17,532] INFO Loaded 0 logs in 33ms. (kafka.log.LogManager)
[2021-08-14 11:17:17,533] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2021-08-14 11:17:17,536] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2021-08-14 11:17:18,486] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
[2021-08-14 11:17:18,490] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2021-08-14 11:17:18,568] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2021-08-14 11:17:18,614] INFO [broker-0-to-controller-send-thread]: Starting (kafka.server.BrokerToControllerRequestThread)
[2021-08-14 11:17:18,647] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,648] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,648] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,649] INFO [ExpirationReaper-0-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,670] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2021-08-14 11:17:18,701] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)
[2021-08-14 11:17:18,720] INFO Stat of the created znode at /brokers/ids/0 is: 25,25,1628907438713,1628907438713,1,0,0,72058075634860032,202,0,25
 (kafka.zk.KafkaZkClient)
[2021-08-14 11:17:18,721] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT://localhost:9092, czxid (broker epoch): 25 (kafka.zk.KafkaZkClient)
[2021-08-14 11:17:18,788] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,799] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,801] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,803] INFO Successfully created /controller_epoch with initial epoch 0 (kafka.zk.KafkaZkClient)
[2021-08-14 11:17:18,815] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2021-08-14 11:17:18,849] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2021-08-14 11:17:18,850] INFO Feature ZK node created at path: /feature (kafka.server.FinalizedFeatureChangeListener)
[2021-08-14 11:17:18,897] INFO Updated cache from existing <empty> to latest FinalizedFeaturesAndEpoch(features=Features{}, epoch=0). (kafka.server.FinalizedFeatureCache)
[2021-08-14 11:17:18,902] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2021-08-14 11:17:18,902] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2021-08-14 11:17:18,906] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2021-08-14 11:17:18,908] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2021-08-14 11:17:18,946] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-14 11:17:18,974] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2021-08-14 11:17:19,052] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Starting socket server acceptors and processors (kafka.network.SocketServer)
[2021-08-14 11:17:19,069] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2021-08-14 11:17:19,070] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Started socket server acceptors and processors (kafka.network.SocketServer)
[2021-08-14 11:17:19,074] INFO Kafka version: 6.2.0-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-14 11:17:19,074] INFO Kafka commitId: 1a5755cf9401c84f (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-14 11:17:19,074] INFO Kafka startTimeMs: 1628907439070 (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-14 11:17:19,076] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2021-08-14 11:17:19,134] INFO [broker-0-to-controller-send-thread]: Recorded new controller, from now on will use broker localhost:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)

Port 9092 is now listening.
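
As a quick sanity check (not part of the official procedure), you can verify that the broker is actually reachable. The commands below assume the default port and install path used in this article; an empty topic list is expected on a fresh install.

# Check that port 9092 is listening
ss -ltn | grep 9092
# List topics via the bundled CLI (nothing should come back yet)
/opt/confluent-6.2.0/bin/kafka-topics --bootstrap-server localhost:9092 --list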

Schema Registry

Configuration

Configure Confluent Platform - Schema Registry

The property file shipped with the product is shown below.

/opt/confluent-6.2.0/etc/schema-registry/schema-registry.properties
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# The address the socket server listens on.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=http://0.0.0.0:8081

# Zookeeper connection string for the Zookeeper cluster used by your Kafka cluster
# (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# Note: use of this property is deprecated.
#kafkastore.connection.url=localhost:2181

# Alternatively, Schema Registry can now operate without Zookeeper, handling all coordination via
# Kafka brokers. Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the leader schema registry instance and for storing the data for
# registered schemas.
# (Note that you cannot mix the two modes; use this mode only on new deployments or by shutting down
# all instances, switching to the new configuration, and then starting the schema registry
# instances again.)
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092

# The name of the topic to store schemas in
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
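
One thing worth noting for this single-broker setup: the default kafkastore.topic.replication.factor is 3, so Schema Registry will fall back to a replication factor of 1 when it creates the _schemas topic and log a warning at startup (visible in the log further below). I leave the default as-is here, but if you want to avoid that warning, adding the following line to the properties file should work (untested in this environment):

# Optional: match the schemas topic replication factor to the single broker
kafkastore.topic.replication.factor=1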

The parameter reference is here:
Schema Registry Configuration Options

Start/Stop

Start Confluent Platform

  • Start command: bin/schema-registry-start -daemon <property_file>
  • Stop command: bin/schema-registry-stop
  • Log: logs/schema-registry.log
  • Listen port: 8081

Start it with the following command:
/opt/confluent-6.2.0/bin/schema-registry-start -daemon /opt/confluent-6.2.0/etc/schema-registry/schema-registry.properties

Startup log (logs/schema-registry.log):
[2021-08-14 11:57:16,923] INFO SchemaRegistryConfig values:
        access.control.allow.headers =
        access.control.allow.methods =
        access.control.allow.origin =
        access.control.skip.options = true
        authentication.method = NONE
        authentication.realm =
        authentication.roles = [*]
        authentication.skip.paths = []
        avro.compatibility.level =
        compression.enable = true
        csrf.prevention.enable = false
        csrf.prevention.token.endpoint = /csrf
        csrf.prevention.token.expiration.minutes = 30
        csrf.prevention.token.max.entries = 10000
        debug = false
        host.name = localhost
        idle.timeout.ms = 30000
        inter.instance.headers.whitelist = []
        inter.instance.protocol = http
        kafkastore.bootstrap.servers = [PLAINTEXT://localhost:9092]
        kafkastore.checkpoint.dir = /tmp
        kafkastore.checkpoint.version = 0
        kafkastore.connection.url =
        kafkastore.group.id =
        kafkastore.init.timeout.ms = 60000
        kafkastore.sasl.kerberos.kinit.cmd = /usr/bin/kinit
        kafkastore.sasl.kerberos.min.time.before.relogin = 60000
        kafkastore.sasl.kerberos.service.name =
        kafkastore.sasl.kerberos.ticket.renew.jitter = 0.05
        kafkastore.sasl.kerberos.ticket.renew.window.factor = 0.8
        kafkastore.sasl.mechanism = GSSAPI
        kafkastore.security.protocol = PLAINTEXT
        kafkastore.ssl.cipher.suites =
        kafkastore.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
        kafkastore.ssl.endpoint.identification.algorithm =
        kafkastore.ssl.key.password = [hidden]
        kafkastore.ssl.keymanager.algorithm = SunX509
        kafkastore.ssl.keystore.location =
        kafkastore.ssl.keystore.password = [hidden]
        kafkastore.ssl.keystore.type = JKS
        kafkastore.ssl.protocol = TLS
        kafkastore.ssl.provider =
        kafkastore.ssl.trustmanager.algorithm = PKIX
        kafkastore.ssl.truststore.location =
        kafkastore.ssl.truststore.password = [hidden]
        kafkastore.ssl.truststore.type = JKS
        kafkastore.timeout.ms = 500
        kafkastore.topic = _schemas
        kafkastore.topic.replication.factor = 3
        kafkastore.topic.skip.validation = false
        kafkastore.update.handlers = []
        kafkastore.write.max.retries = 5
        kafkastore.zk.session.timeout.ms = 30000
        leader.eligibility = true
        listeners = [http://0.0.0.0:8081]
        master.eligibility = null
        metric.reporters = []
        metrics.jmx.prefix = kafka.schema.registry
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        metrics.tag.map = []
        mode.mutability = true
        port = 8081
        request.logger.name = io.confluent.rest-utils.requests
        request.queue.capacity = 2147483647
        request.queue.capacity.growby = 64
        request.queue.capacity.init = 128
        resource.extension.class = []
        resource.extension.classes = []
        resource.static.locations = []
        response.http.headers.config =
        response.mediatype.default = application/vnd.schemaregistry.v1+json
        response.mediatype.preferred = [application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json]
        rest.servlet.initializor.classes = []
        schema.cache.expiry.secs = 300
        schema.cache.size = 1000
        schema.compatibility.level = backward
        schema.providers = []
        schema.registry.group.id = schema-registry
        schema.registry.inter.instance.protocol =
        schema.registry.resource.extension.class = []
        schema.registry.zk.namespace = schema_registry
        shutdown.graceful.ms = 1000
        ssl.cipher.suites = []
        ssl.client.auth = false
        ssl.client.authentication = NONE
        ssl.enabled.protocols = []
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = [hidden]
        ssl.keymanager.algorithm =
        ssl.keystore.location =
        ssl.keystore.password = [hidden]
        ssl.keystore.reload = false
        ssl.keystore.type = JKS
        ssl.keystore.watch.location =
        ssl.protocol = TLS
        ssl.provider =
        ssl.trustmanager.algorithm =
        ssl.truststore.location =
        ssl.truststore.password = [hidden]
        ssl.truststore.type = JKS
        thread.pool.max = 200
        thread.pool.min = 8
        websocket.path.prefix = /ws
        websocket.servlet.initializor.classes = []
        zookeeper.set.acl = false
 (io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig)
[2021-08-14 11:57:16,998] INFO Logging initialized @1039ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log)
[2021-08-14 11:57:17,007] INFO Initial capacity 128, increased by 64, maximum capacity 2147483647. (io.confluent.rest.ApplicationServer)
[2021-08-14 11:57:17,117] INFO Adding listener: http://0.0.0.0:8081 (io.confluent.rest.ApplicationServer)
[2021-08-14 11:57:17,999] INFO Registering schema provider for AVRO: io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry)
[2021-08-14 11:57:17,999] INFO Registering schema provider for JSON: io.confluent.kafka.schemaregistry.json.JsonSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry)
[2021-08-14 11:57:17,999] INFO Registering schema provider for PROTOBUF: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry)
[2021-08-14 11:57:18,063] INFO Initializing KafkaStore with broker endpoints: PLAINTEXT://localhost:9092 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:18,088] INFO Creating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:18,090] WARN Creating the schema topic _schemas using a replication factor of 1, which is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:18,442] INFO Kafka store reader thread starting consumer (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread)
[2021-08-14 11:57:18,603] INFO Seeking to beginning for all partitions (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread)
[2021-08-14 11:57:18,604] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread)
[2021-08-14 11:57:18,606] INFO [kafka-store-reader-thread-_schemas]: Starting (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread)
[2021-08-14 11:57:18,851] INFO Wait to catch up until the offset at 0 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:18,963] INFO Reached offset at 0 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:18,964] INFO Joining schema registry with Kafka-based coordination (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry)
[2021-08-14 11:57:19,843] INFO Finished rebalance with leader election result: Assignment{version=1, error=0, leader='sr-1-76d7d323-be6d-42fa-b17e-d6d60fd14ac6', leaderIdentity=version=1,host=localhost,port=8081,scheme=http,leaderEligibility=true} (io.confluent.kafka.schemaregistry.leaderelector.kafka.KafkaGroupLeaderElector)
[2021-08-14 11:57:19,877] INFO Wait to catch up until the offset at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:19,881] INFO Reached offset at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
[2021-08-14 11:57:20,084] INFO jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_242-b08 (org.eclipse.jetty.server.Server)
[2021-08-14 11:57:20,185] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session)
[2021-08-14 11:57:20,186] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session)
[2021-08-14 11:57:20,187] INFO node0 Scavenging every 600000ms (org.eclipse.jetty.server.session)
[2021-08-14 11:57:20,992] INFO HV000001: Hibernate Validator 6.1.7.Final (org.hibernate.validator.internal.util.Version)
[2021-08-14 11:57:21,380] INFO Started o.e.j.s.ServletContextHandler@3e7dd664{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
[2021-08-14 11:57:21,401] INFO Started o.e.j.s.ServletContextHandler@71c27ee8{/ws,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
[2021-08-14 11:57:21,431] INFO Started NetworkTrafficServerConnector@4c762604{HTTP/1.1, (http/1.1)}{0.0.0.0:8081} (org.eclipse.jetty.server.AbstractConnector)
[2021-08-14 11:57:21,432] INFO Started @5478ms (org.eclipse.jetty.server.Server)
[2021-08-14 11:57:21,432] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain)

Port 8081 is now listening.
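
As a quick check that the REST API is responding (again just a sanity check; on a fresh install the subject list should be empty and the compatibility level should be the default BACKWARD):

curl http://localhost:8081/subjects
# expected: []
curl http://localhost:8081/config
# expected: {"compatibilityLevel":"BACKWARD"}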

Reference: start/stop scripts

/opt/confluent-6.2.0/bin/zookeeper-server-start
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] zookeeper.properties"
        exit 1
fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  LOG4J_CONFIG_NORMAL_INSTALL="/etc/kafka/log4j.properties"
  LOG4J_CONFIG_ZIP_INSTALL="$base_dir/../etc/kafka/log4j.properties"
  if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_NORMAL_INSTALL}"
  elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_ZIP_INSTALL}"
  else # Fallback to normal default
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
  fi
fi
export KAFKA_LOG4J_OPTS

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name zookeeper -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
     EXTRA_ARGS="-daemon "$EXTRA_ARGS
     shift
     ;;
 *)
     ;;
esac

exec $base_dir/kafka-run-class $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
/opt/confluent-6.2.0/bin/zookeeper-server-stop
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SIGNAL=${SIGNAL:-TERM}

OSNAME=$(uname -s)
if [[ "$OSNAME" == "OS/390" ]]; then
    if [ -z $JOBNAME ]; then
        JOBNAME="ZKEESTRT"
    fi
    PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v grep | awk '{print $1}')
elif [[ "$OSNAME" == "OS400" ]]; then
    PIDS=$(ps -Af | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $2}')
else
    PIDS=$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')
fi

if [ -z "$PIDS" ]; then
  echo "No zookeeper server to stop"
  exit 1
else
  kill -s $SIGNAL $PIDS
fi
/opt/confluent-6.2.0/bin/kafka-server-start
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
        exit 1
fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  LOG4J_CONFIG_NORMAL_INSTALL="/etc/kafka/log4j.properties"
  LOG4J_CONFIG_ZIP_INSTALL="$base_dir/../etc/kafka/log4j.properties"
  if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_NORMAL_INSTALL}"
  elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_ZIP_INSTALL}"
  else # Fallback to normal default
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
  fi
fi
export KAFKA_LOG4J_OPTS

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

exec $base_dir/kafka-run-class $EXTRA_ARGS kafka.Kafka "$@"
/opt/confluent-6.2.0/bin/kafka-server-stop
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SIGNAL=${SIGNAL:-TERM}

OSNAME=$(uname -s)
if [[ "$OSNAME" == "OS/390" ]]; then
    if [ -z $JOBNAME ]; then
        JOBNAME="KAFKSTRT"
    fi
    PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v grep | awk '{print $1}')
elif [[ "$OSNAME" == "OS400" ]]; then
    PIDS=$(ps -Af | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $2}')
else
    PIDS=$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')
    PIDS_SUPPORT=$(ps ax | grep -i 'io\.confluent\.support\.metrics\.SupportedKafka' | grep java | grep -v grep | awk '{print $1}')
fi

if [ -z "$PIDS" ]; then
  # Normal Kafka is not running, but maybe we are running the support wrapper?
  if [ -z "${PIDS_SUPPORT}" ]; then
    echo "No kafka server to stop"
    exit 1
  else
    kill -s $SIGNAL $PIDS_SUPPORT
  fi
else
  kill -s $SIGNAL $PIDS
fi
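
As the listings above show, both stop scripts read a SIGNAL environment variable that defaults to TERM. So, for example, if a broker hangs during shutdown you could escalate the signal like this (use with care, since SIGKILL skips the clean shutdown path):

SIGNAL=KILL /opt/confluent-6.2.0/bin/kafka-server-stop
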
/opt/confluent-6.2.0/bin/kafka-run-class
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
  echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
  exit 1
fi

# CYGWIN == 1 if Cygwin is detected, else 0.
if [[ $(uname -a) =~ "CYGWIN" ]]; then
  CYGWIN=1
else
  CYGWIN=0
fi

if [ -z "$INCLUDE_TEST_JARS" ]; then
  INCLUDE_TEST_JARS=false
fi

# Exclude jars not necessary for running commands.
regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
should_include_file() {
  if [ "$INCLUDE_TEST_JARS" = true ]; then
    return 0
  fi
  file=$1
  if [ -z "$(echo "$file" | egrep "$regex")" ] ; then
    return 0
  else
    return 1
  fi
}

base_dir=$(dirname $0)/..

if [ -z "$SCALA_VERSION" ]; then
  SCALA_VERSION=2.13.5
  if [[ -f "$base_dir/gradle.properties" ]]; then
    SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
  fi
fi

if [ -z "$SCALA_BINARY_VERSION" ]; then
  SCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.')
fi

# run ./gradlew copyDependantLibs to get all dependant jars in a local dir
shopt -s nullglob
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
  for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;
  do
    CLASSPATH="$CLASSPATH:$dir/*"
  done
fi

for file in "$base_dir"/examples/build/libs/kafka-examples*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
  clients_lib_dir=$(dirname $0)/../clients/build/libs
  streams_lib_dir=$(dirname $0)/../streams/build/libs
  streams_dependant_clients_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
else
  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
  streams_lib_dir=$clients_lib_dir
  streams_dependant_clients_lib_dir=$streams_lib_dir
fi


for file in "$clients_lib_dir"/kafka-clients*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

for file in "$streams_lib_dir"/kafka-streams*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$CLASSPATH":"$file"
    fi
  done
else
  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$file":"$CLASSPATH"
    fi
  done
  if [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; then
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"
  fi
  if [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; then
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"
    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"
  fi
fi

for file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;
do
  CLASSPATH="$CLASSPATH":"$file"
done

for file in "$streams_dependant_clients_lib_dir"/*hamcrest*.jar;
do
  CLASSPATH="$CLASSPATH":"$file"
done

for file in "$base_dir"/shell/build/libs/kafka-shell*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

for dir in "$base_dir"/shell/build/dependant-libs-${SCALA_VERSION}*;
do
  CLASSPATH="$CLASSPATH:$dir/*"
done

for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

for dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*;
do
  CLASSPATH="$CLASSPATH:$dir/*"
done

for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
do
  for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
  do
    if should_include_file "$file"; then
      CLASSPATH="$CLASSPATH":"$file"
    fi
  done
  if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then
    CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"
  fi
done

# classpath addition for release
for file in "$base_dir"/libs/*;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

# CONFLUENT: classpath addition for releases with LSB-style layout
CLASSPATH="$CLASSPATH":"$base_dir/share/java/kafka/*"

# classpath for telemetry
CLASSPATH="$CLASSPATH":"$base_dir/share/java/confluent-telemetry/*"

for file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done
shopt -u nullglob

if [ -z "$CLASSPATH" ] ; then
  echo "Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=$SCALA_VERSION'"
  exit 1
fi

# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "
fi

# JMX port to use
if [  $JMX_PORT ]; then
  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi

# Log directory to use
if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi

# Log4j settings
if [ -z "$KAFKA_LOG4J_OPTS" ]; then
  # Log to console. This is a tool.
  LOG4J_CONFIG_NORMAL_INSTALL="/etc/kafka/tools-log4j.properties"
  LOG4J_CONFIG_ZIP_INSTALL="$base_dir/etc/kafka/tools-log4j.properties"
  if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layout
    LOG4J_DIR="${LOG4J_CONFIG_NORMAL_INSTALL}"
  elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layout
    LOG4J_DIR="${LOG4J_CONFIG_ZIP_INSTALL}"
  else # Fallback to normal default
    LOG4J_DIR="$base_dir/config/tools-log4j.properties"
  fi
  # If Cygwin is detected, LOG4J_DIR is converted to Windows format.
  (( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
  KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
else
  # create logs directory
  if [ ! -d "$LOG_DIR" ]; then
    mkdir -p "$LOG_DIR"
  fi
fi

# If Cygwin is detected, LOG_DIR is converted to Windows format.
(( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"

# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
  KAFKA_OPTS=""
fi

# Set Debug options if enabled
if [ "x$KAFKA_DEBUG" != "x" ]; then

    # Use default ports
    DEFAULT_JAVA_DEBUG_PORT="5005"

    if [ -z "$JAVA_DEBUG_PORT" ]; then
        JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
    fi

    # Use the defaults if JAVA_DEBUG_OPTS was not set
    DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
    if [ -z "$JAVA_DEBUG_OPTS" ]; then
        JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
    fi

    echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
    KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
fi

# Which java to use
if [ -z "$JAVA_HOME" ]; then
  JAVA="java"
else
  JAVA="$JAVA_HOME/bin/java"
fi

# Memory options
if [ -z "$KAFKA_HEAP_OPTS" ]; then
  KAFKA_HEAP_OPTS="-Xmx256M"
fi

# JVM performance options
# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
fi

while [ $# -gt 0 ]; do
  COMMAND=$1
  case $COMMAND in
    -name)
      DAEMON_NAME=$2
      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
      shift 2
      ;;
    -loggc)
      if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
        GC_LOG_ENABLED="true"
      fi
      shift
      ;;
    -daemon)
      DAEMON_MODE="true"
      shift
      ;;
    *)
      break
      ;;
  esac
done

# GC options
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
  GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX

  # The first segment of the version number, which is '1' for releases before Java 9
  # it then becomes '9', '10', ...
  # Some examples of the first line of `java --version`:
  # 8 -> java version "1.8.0_152"
  # 9.0.4 -> java version "9.0.4"
  # 10 -> java version "10" 2018-03-20
  # 10.0.1 -> java version "10.0.1" 2018-04-17
  # We need to match to the end of the line to prevent sed from printing the characters that do not match
  JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
  if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
    KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=100M"
  else
    KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
  fi
fi

# Remove a possible colon prefix from the classpath (happens at lines like `CLASSPATH="$CLASSPATH:$file"` when CLASSPATH is blank)
# Syntax used on the right side is native Bash string manipulation; for more details see
# http://tldp.org/LDP/abs/html/string-manipulation.html, specifically the section titled "Substring Removal"
CLASSPATH=${CLASSPATH#:}

# If Cygwin is detected, classpath is converted to Windows format.
(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")


# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
fi
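
As kafka-run-class shows, the JVM settings are picked up from environment variables (KAFKA_HEAP_OPTS, JMX_PORT, LOG_DIR, KAFKA_OPTS, ...), so they can be overridden per invocation without editing any of the scripts. An illustrative example (the values here are arbitrary, not a recommendation):

KAFKA_HEAP_OPTS="-Xmx2G -Xms2G" JMX_PORT=9999 \
  /opt/confluent-6.2.0/bin/kafka-server-start -daemon /opt/confluent-6.2.0/etc/kafka/server.properties
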
/opt/confluent-6.2.0/bin/schema-registry-start
#!/bin/bash
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

print_synopsis() {
        echo "USAGE: $0 [-daemon] schema-registry.properties"
}

EXTRA_ARGS=${EXTRA_ARGS-'-name schemaRegistry'}
# EXTRA_ARGS=${EXTRA_ARGS-'-name schemaRegistry -loggc'}

ARG=$1
case $ARG in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

if [[ $# -lt 1 ]]; then
        print_synopsis
        exit 1
fi

if [[ ! -e "$1" ]]; then
        echo "Property file $1 does not exist"
        print_synopsis
        exit 1
fi

exec $(dirname $0)/schema-registry-run-class ${EXTRA_ARGS} io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain "$@"
/opt/confluent-6.2.0/bin/schema-registry-stop
#!/bin/bash
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# When stopping, search for both the current SchemaRegistryMain class and the deprecated Main class.
exec $(dirname $0)/schema-registry-stop-service "(io.confluent.kafka.schemaregistry.rest.Main)|(io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain)"
/opt/confluent-6.2.0/bin/schema-registry-run-class
#!/bin/bash
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

if [ $# -lt 1 ];
then
  echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
  exit 1
fi

base_dir=$(dirname $0)/..

# CYGINW == 1 if Cygwin is detected, else 0.
if [[ $(uname -a) =~ "CYGWIN" ]]; then
  CYGWIN=1
else
  CYGWIN=0
fi

# Development jars. `mvn package` should collect all the required dependency jars here
for dir in $base_dir/package-schema-registry/target/kafka-schema-registry-package-*-development; do
  CLASSPATH=$CLASSPATH:$dir/share/java/schema-registry/*
done

# Production jars, including kafka, rest-utils, and schema-registry
for library in "confluent-security/schema-registry" "confluent-common" "confluent-telemetry" "rest-utils" "schema-registry"; do
  CLASSPATH=$CLASSPATH:$base_dir/share/java/$library/*
done

# Log directory to use
if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi

# create logs directory
if [ ! -d "$LOG_DIR" ]; then
  mkdir -p "$LOG_DIR"
fi

# logj4 settings
if [ "x$SCHEMA_REGISTRY_LOG4J_OPTS" = "x" ]; then
  # Test for files from dev -> packages so this will work as expected in dev if you have packages
  # installed
  if [ -e "$base_dir/config/log4j.properties" ]; then # Dev environment
    LOG4J_DIR="$base_dir/config/log4j.properties"
  elif [ -e "$base_dir/etc/schema-registry/log4j.properties" ]; then # Simple zip file layout
    LOG4J_DIR="$base_dir/etc/schema-registry/log4j.properties"
  elif [ -e "/etc/schema-registry/log4j.properties" ]; then # Normal install layout
    LOG4J_DIR="/etc/schema-registry/log4j.properties"
  fi

    # If Cygwin is detected, LOG4J_DIR is converted to Windows format.
    (( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")

    SCHEMA_REGISTRY_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
fi

# If Cygwin is detected, LOG_DIR is converted to Windows format.
(( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")

SCHEMA_REGISTRY_LOG4J_OPTS="-Dschema-registry.log.dir=$LOG_DIR $SCHEMA_REGISTRY_LOG4J_OPTS"

# JMX settings
if [ -z "$SCHEMA_REGISTRY_JMX_OPTS" ]; then
  SCHEMA_REGISTRY_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi

# JMX port to use
if [  $JMX_PORT ]; then
  SCHEMA_REGISTRY_JMX_OPTS="$SCHEMA_REGISTRY_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi

# Generic jvm settings you want to add
if [ -z "$SCHEMA_REGISTRY_OPTS" ]; then
  SCHEMA_REGISTRY_OPTS=""
fi

# Which java to use
if [ -z "$JAVA_HOME" ]; then
  JAVA="java"
else
  JAVA="$JAVA_HOME/bin/java"
fi

# Memory options
if [ -z "$SCHEMA_REGISTRY_HEAP_OPTS" ]; then
  SCHEMA_REGISTRY_HEAP_OPTS="-Xmx512M"
fi

# JVM performance options
if [ -z "$SCHEMA_REGISTRY_JVM_PERFORMANCE_OPTS" ]; then
  SCHEMA_REGISTRY_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
fi

while [ $# -gt 0 ]; do
  COMMAND=$1
  case $COMMAND in
    -help)
      HELP="true"
      break
      ;;
    -name)
      DAEMON_NAME=$2
      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
      shift 2
      ;;
    -loggc)
      if [ -z "$SCHEMA_REGISTRY_GC_LOG_OPTS" ]; then
        GC_LOG_ENABLED="true"
      fi
      shift
      ;;
    -daemon)
      DAEMON_MODE="true"
      shift
      ;;
    *)
      break
      ;;
  esac
done

if [ "x$$HELP" = "xtrue" ]; then
  echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
  exit 0
fi

MAIN=$1
shift

# GC options
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
  GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX

  # The first segment of the version number, which is '1' for releases before Java 9
  # it then becomes '9', '10', ...
  # Some examples of the first line of `java --version`:
  # 8 -> java version "1.8.0_152"
  # 9.0.4 -> java version "9.0.4"
  # 10 -> java version "10" 2018-03-20
  # 10.0.1 -> java version "10.0.1" 2018-04-17
  # We need to match to the end of the line to prevent sed from printing the characters that do not match
  JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
  if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
    SCHEMA_REGISTRY_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
  else
    SCHEMA_REGISTRY_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
  fi
fi

# If Cygwin is detected, classpath is converted to Windows format.
(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")

# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  CONSOLE_OUTPUT_FILE=${CONSOLE_OUTPUT_FILE:-${LOG_DIR}/schema-registry-console.out}
  nohup $JAVA $SCHEMA_REGISTRY_HEAP_OPTS $SCHEMA_REGISTRY_JVM_PERFORMANCE_OPTS $SCHEMA_REGISTRY_GC_LOG_OPTS $SCHEMA_REGISTRY_JMX_OPTS $SCHEMA_REGISTRY_LOG4J_OPTS -cp $CLASSPATH $SCHEMA_REGISTRY_OPTS "$MAIN" "$@" > "${CONSOLE_OUTPUT_FILE}" 2>&1 < /dev/null &
else
  exec "$JAVA" $SCHEMA_REGISTRY_HEAP_OPTS $SCHEMA_REGISTRY_JVM_PERFORMANCE_OPTS $SCHEMA_REGISTRY_GC_LOG_OPTS $SCHEMA_REGISTRY_JMX_OPTS $SCHEMA_REGISTRY_LOG4J_OPTS -cp $CLASSPATH $SCHEMA_REGISTRY_OPTS "$MAIN" "$@"
fi
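
schema-registry-run-class follows the same pattern with its own variable names (SCHEMA_REGISTRY_HEAP_OPTS, SCHEMA_REGISTRY_JMX_OPTS, JMX_PORT, LOG_DIR, ...), so an override would look like this (values again arbitrary):

SCHEMA_REGISTRY_HEAP_OPTS="-Xmx1G" JMX_PORT=9998 \
  /opt/confluent-6.2.0/bin/schema-registry-start -daemon /opt/confluent-6.2.0/etc/schema-registry/schema-registry.properties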