Kafka の入門用として、docker-compose を使い動作確認環境を構築しました。
今回の目標は次のとおりです。
- ローカルからアクセスできるようにする
- port を expose します。もしアクセスするアプリケーションも docker 上で動かす場合は設定が変わると思います
- 個人的に検証したいことがあったため、broker を 2 つ立ち上げる
- kafka-uiという Kafka を管理できる Web UI を使う
- 動作に必須、または変更が必要なもの以外の設定値は混乱を避けるため極力設定しない
完成品
検索で見つけた例を参考にしながら組み合わせて、次のような YAML を作成しました。
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.9.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka0:
image: confluentinc/cp-kafka:7.9.0
container_name: kafka0
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka0:29092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
kafka1:
image: confluentinc/cp-kafka:7.9.0
container_name: kafka1
ports:
- "9093:9093"
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093,PLAINTEXT_INTERNAL://kafka1:29093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka-ui:
container_name: kafka-ui
image: ghcr.io/kafbat/kafka-ui:v1.2.0
ports:
- "8080:8080"
depends_on:
- kafka0
restart: always
environment:
KAFKA_CLUSTERS_0_NAME: local-cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
設定値について
使用イメージ
使用している各イメージは記事執筆時点で最新でした。
環境変数
cp-zookeeper
ZOOKEEPER_CLIENT_PORT
Kafka からのアクセスをどのポートでリッスンするかを設定します。Kafka クラスターで使う場合、zookeeper ではこの環境変数のみ起動に必須でした。
cp-kafka
cp-kafkaの環境変数についてのドキュメントは
にあります。ただし、このページには環境変数の意味についてあまり詳しくは書かれていません。
ここには、cp-kafkaのKAFKA_ZOOKEEPER_CONNECTのような環境変数はKAFKA_を取り除き、さらに_を.に置き換える、と記載されています。
つまりKAFKA_ZOOKEEPER_CONNECTはzookeeper.connectに対応します。この値で検索すれば設定内容の意味を調べられます。
Kafka の設定値全般は例えば、
や
にまとまっています。環境変数からの変換方式を把握した上で、このドキュメントを読めば様々な設定が Docker の環境変数経由で可能です。
Docker でない(本来の?)Kafka はプロパティファイルから設定を取得するわけですが、そちらではzookeeper.connect形式が設定値のキーになっています。
KAFKA_ZOOKEEPER_CONNECT
Kafka ブローカーが zookeeper と通信するための接続文字列を指定します。
Kafka ブローカーが zookeeper という名前のサービス(Docker 内)の 2181 ポートを介して zookeeper に接続することを意味します。
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
Kafka のリスナーとセキュリティプロトコルのマッピングを指定します。
によるとリスナーとは、
各リスナーは異なるポートまたはネットワークインターフェースをリッスンするために使用され
とあるので、雑ですが、Kafka が通信を受信する場所、のように理解しています。
セキュリティプロトコルは
によると、
ブローカーと通信するために使われるプロトコル。有効な値は: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。
とあるので SSL などを指定して暗号化することもできるようですが、開発環境なので、あまりそのあたりの設定の理解はせず PLAINTEXT を利用しています。
PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXTとすると PLAINTEXT というリスナーが平文(PLAINTEXT)で通信し、PLAINTEXT_INTERNAL というリスナーも平文(PLAINTEXT)で通信する、という設定になると思います。
KAFKA_ADVERTISED_LISTENERS
Kafka ブローカーがクライアントや他のブローカーに対して自分自身をアドバタイズ(公開)するためのアドレスを設定します。
リスナーは<listenerName>://<hostname>:<port>の形式で表記されます。
例えば、
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093,PLAINTEXT_INTERNAL://kafka1:29093
と設定すると、
- localhost:9093 で接続できる PLAINTEXT リスナー
- kafka1:29093 で接続できる PLAINTEXT_INTERNAL リスナー
の 2 つがクライアントに公開されます。
2 つ用意している理由は、ローカル環境と Docker ネットワーク内(kafka-ui、zookeeper、他の broker)からの両方でアクセスできるようにするためです。
ローカルからは localhost でアクセス可能ですが、kafka-ui コンテナなどからは localhost では他コンテナにある broker へ接続できません。そのため、他のコンテナからアクセスする場合は kafka0 などのコンテナ名をホスト名として指定する必要があります。
このため、advertised.listeners を複数設定しています。
advertised.listeners は理解が難しい項目でした。以下のページなどが参考になるかもしれません。
KAFKA_INTER_BROKER_LISTENER_NAME
Kafka はブローカー同士でも通信するため、Docker ネットワーク内の他のブローカーにどのリスナーに向けて通信すれば良いかを伝えます。
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
Kafka が内部的に利用しているトピックである offsets topic の replication factor (どの程度冗長化するか)の設定です。ブローカー数以下である必要があるため、1にしています。
kafka-ui
kafka-uiとは Kafka を Web UI で確認・操作(トピック作成、メッセージ送受信、メッセージ削除など)できるツールです。
環境変数による設定は以下に記載されています。
に記載されています。
KAFKA_CLUSTERS_0_NAME
クラスター名です。UI上に表示されます。
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
Kafka クラスターにどのアドレスでアクセスすればよいかを設定します。
動作確認
Kafka へのメッセージ送受信にはkafkacatを利用します。
kafkacat は Debian,Ubuntu では
apt install kafkacat
でインストールできますが、Mac では
brew install kcat
でインストールできるようです。私は Ubuntu で動作確認しましたが、Mac の場合は以下の記述のkafkacatをkcatに置き換える必要があると思います。
メッセージ送信
kafkacat -b localhost:9092 -t new_topic -P
このコマンドを実行すると対話モードになり、任意の文字列を入力して Enter を押すとメッセージが送信されます。終了する場合は Ctrl+D を押します。
-bで Bootstrap broker を指定します。Kafka クライアントが最初に接続するサーバーで、ブローカーのうちの 1 つを指定すれば十分です。つまり、
kafkacat -b localhost:9093 -t new_topic -P
としても構いません。Kafka クライアントは初回接続後に適切なブローカーを選んで通信します。
実際の Kafka クラスターではブローカーが多数存在する場合もありますが、そのすべてを把握・列挙する必要はありません。いずれか 1 つを指定すれば、Kafka クライアントが適切に通信先を判断してくれます。
また、-tでトピックを指定します。デフォルトの設定では事前にトピックを作成しなくても自動で作成されます。
-Pで kafkacat をプロデューサーモードで起動します。
送信したメッセージは、http://localhost:8080から kafka-ui にアクセスすると確認できます。
メッセージ受信
画面でもメッセージを確認できましたが、kafkacat の場合は、
kafkacat -b localhost:9092 -t new_topic -C
でメッセージを受信できます。
-Cでコンシューマーモードを指定します。
これでトピックのメッセージを待ち受けることができます。

