Kafka に入門するために docker-compose を使い動作確認環境を作成しました。
要件は次のとおりです。
- ローカルからアクセスできるようにします
- port を expose します。もしアクセスするアプリケーションなども docker 上で動かす場合は設定が変わってくると思います
- 個人的に検証したいことがあったため、broker を 2 つ立ち上げます
- kafka-uiという Kafka を管理できる Web UI を使います
- 動作に必須もしくは変更する必要のあるもの以外の設定値は混乱を避けるためになるべく設定しないようにします
完成品
検索したヒットしたものをいろいろツギハギして次のような YAML を作成しました。
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.3
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka0:
image: confluentinc/cp-kafka:7.4.3
container_name: kafka0
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka0:29092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
kafka1:
image: confluentinc/cp-kafka:7.4.3
container_name: kafka1
ports:
- "9093:9093"
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093,PLAINTEXT_INTERNAL://kafka1:29093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:v0.7.1
ports:
- "8080:8080"
depends_on:
- kafka0
restart: always
environment:
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
設定値について
使用イメージ
各イメージは記事執筆時点で最新でした。
環境変数
cp-zookeeper
ZOOKEEPER_CLIENT_PORT
Kafka からのアクセスをどのポートでリッスンするかを設定します。Kafka クラスターで使う場合、zookeeper ではこの環境変数のみ起動に必須でした。
cp-kafka
cp-kafka
の環境変数についてのドキュメントは
にあります。しかし、このページにはどの環境変数がどのような意味かについてあまり書かれていません。
このページには、cp-kafka
のKAFKA_ZOOKEEPER_CONNECT
のような環境変数はKAFKA_
を取って_
を.
に読み替えるということが書いてあります。
つまりKAFKA_ZOOKEEPER_CONNECT
はzookeeper.connect
になるわけですが、この値で検索すれば設定値の意味を調べられます。
Kafka の設定値全般は例えば、
や
にまとまっています。環境変数からの変換方式を把握した上で、このドキュメントを読めば様々な設定が Docker の環境変数経由で可能です。
Docker でない(本来の?)Kafka はプロパティファイルから設定を取得するわけですが、そちらではzookeeper.connect
形式が設定値のキーになっています。
KAFKA_ZOOKEEPER_CONNECT
Kafka ブローカーが zookeeper と通信するための接続文字列を指定します。
Kafka ブローカーが zookeeper という名前のサービス(Docker 内)の 2181 ポートを介して zookeeper に接続することを意味します。
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
Kafka のリスナーとセキュリティプロトコルのマッピングを指定します。
によるとリスナーとは、
各リスナーは異なるポートまたはネットワークインターフェースをリッスンするために使用され
とあるので、雑ですが、Kafka が通信を受信する場所、のように理解しています。
セキュリティプロトコルは
によると、
ブローカーと通信するために使われるプロトコル。有効な値は: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。
とあるので SSL などを指定して暗号化することもできるようですが、開発環境なので、あまりそのあたりの設定の理解はせず PLAINTEXT を利用しています。
PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
とすると PLAINTEXT というリスナーが平文(PLAINTEXT)で通信し、PLAINTEXT_INTERNAL というリスナーも平文(PLAINTEXT)で通信する、という設定になると思います。
KAFKA_ADVERTISED_LISTENERS
Kafka ブローカーがクライアントや他のブローカーに対して自分自身をアドバタイズ(公開)するためのアドレスを設定します。
リスナーは<listenerName>://<hostname>:_<port>_
のように表記されます。
つまり、KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093,PLAINTEXT_INTERNAL://kafka1:29093
という環境変数で、
- localhost:9093 で接続できる PLAINTEXT というリスナー
- kafka1:29093 で接続できる PLAINTEXT_INTERNAL というリスナー
の 2 つのリスナーがクライアントに公開されるということになります。
なぜ 2 つ必要かというとローカルと Docker ネットワーク内(kafka-ui、zookeeper、他の broker)からの両方からのアクセスができなければならないからです。
ローカルからは localhost でアクセスできますが、例えば kafka-ui コンテナからは localhost では他のコンテナにある broker との通信ができません。他のコンテナからアクセスしたい場合は kafka0 などコンテナ名をホスト名に指定してアクセスする必要があります。
これが advertised.listeners を複数設定している理由だと理解しています。
advertised.listeners は個人的には理解が難しかったです。以下のページなどが参考になるかもしれません。
KAFKA_INTER_BROKER_LISTENER_NAME
Kafka はブローカー同士でも通信するため、Docker ネットワーク内の他のブローカーにどのリスナーに向けて通信すれば良いかを伝えます。
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
Kafka が内部的に利用しているトピックである offsets topic の replication factor (どの程度冗長化するか)の設定です。ブローカー数以下である必要があるため、1にしています。
kafka-ui
kafka-uiとは Kafka を Web UI で確認、操作(トピック作成、メッセージ送受信、メッセージ削除など)できるツールです。
環境変数による設定は
に記載されています。
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
Kafka クラスターにどのアドレスでアクセスすればよいかを設定します。
動作確認
Kafka へのメッセージ送受信をできるkafkacatを使って動作確認してみます。
kafkacat は Debian,Ubuntu では
apt install kafkacat
でインストールできますが、Mac では
brew install kcat
でインストールできるようです。私は Ubuntu で動作確認しましたが、Mac の場合は以下の記述のkafkacat
をkcat
に置き換える必要があると思います。
メッセージ送信
kafkacat -b localhost:9092 -t new_topic -P
このコマンドで対話モードになるため、適当な文字列を入力して Enter するとメッセージが送信されます。なお、Ctrl+D で抜けます。
-b
で Bootstrap broker を指定します。Kafka クライアントが Kafka クライアントと通信するときに最初にアクセスするサーバーです。ブローカーのうちの 1 つを指定すればよいです。つまり、
kafkacat -b localhost:9093 -t new_topic -P
としても良いです。Kafka と通信する際に実際に接続するブローカーは最初の接続時のブローカーとは変わる可能性もあります。実際の Kafka クラスターにおいてブローカーは多数あることもありますが、それを全て把握、列挙しなくてもどれがを指定すればよしなに Kafka との通信ができることになります。
また、-t
でトピックを指定します。デフォルトの設定では事前にトピックを作成しなくても自動で作成されます。
-P
で kafkacat をプロデューサーモードで起動します。
メッセージが Kafka に送信できたことはhttp://localhost:8080
から kafka-ui にアクセスすると下の画像のように確認できます。
メッセージ受信
画面でもメッセージを確認できましたが、kafkacat の場合は、
kafkacat -b localhost:9092 -t new_topic -C
でメッセージを受信できます。
-C
でコンシューマーモードを指定します。
これでトピックのメッセージを待ち受けることができます。