著者: 伊藤 雅博, 株式会社日立製作所
はじめに
本稿ではKafkaクラスタとKafka Streamsの具体的な構築手順を紹介します。コンテナ管理ツールであるPodmanと、Apache Kafkaのコンテナイメージを使用して、Kafkaクラスタの構築とStreamsアプリケーションの動作確認を行います。
なお、Kafka Streamsの概要やチューニングの考え方については、以前の記事をご参照ください。
記事一覧:
- Kafka_Streamsの概要とユースケース
- Kafka Streamsのアーキテクチャとチューニングポイント
- Kafkaクラスタを構築してKafka Streamsを試す(Podmanコンテナ環境編)(本稿)
Kafka構築の選択肢
オープンソースのApache Kafkaには、構築環境によって以下のような選択肢があります。
- ベアメタル・仮想化環境
-
Apache Kafka - Tarball
- Apache Software Foundationが提供する公式のバイナリ
-
Apache Kafka - Tarball
- コンテナ環境(Docker、Podmanなど)
-
Apache Kafka - コンテナイメージ
- Apache Software Foundationが提供する公式のコンテナイメージ
- v3.7.0から提供開始
-
Apache Kafka - コンテナイメージ
- Kubernetes環境
-
Strimzi
- Apache KafkaをKubernetes上にデプロイするためのOperatorなどを提供する、Cloud Native Computing Foundation (CNCF)のオープンソースプロジェクト
-
Strimzi
本稿ではコンテナ管理ツールであるPodman上で、Apache Kafkaのコンテナイメージを利用します。
システム構成
3台構成のKafka Brokerクラスタと、2台構成のKafka Streamsアプリケーションを構築します。Podmanのコンテナ環境なので1台のサーバ上に構築することも可能ですが、今回は大規模な環境でのスケールアウトも想定し、複数台のサーバ上に構築してみます。
今回用意したサーバのスペックを以下に示します。サーバ間はホスト名で名前解決できる状態とします。
ホスト名 | CPUコア数 | メモリ容量 | ディスク容量 |
---|---|---|---|
broker00 | 4core | 8GB | 80GB |
broker01 | 4core | 8GB | 80GB |
broker02 | 4core | 8GB | 80GB |
streams00 | 2core | 8GB | 80GB |
streams01 | 2core | 8GB | 80GB |
OSとPodmanは以下のバーションをインストール済みとします。
- OS:
Red Hat Enterprise Linux release 8.9
- Podman:
v4.6.1
Apache Kafkaのコンテナイメージは以下を使用します。
-
Docker Hub - apache/kafka:
v3.7.0
本検証は動作確認が目的のため、以下の状態で実施します。もし本番環境で構築する場合は、適切なセキュリティ設定を行ってください。
- OSユーザ:
root
- SELinux:
Disabled
Kafka Brokerクラスタの構築
3台のサーバで、以下の図に示す構成のKafka Brokerクラスタを構築します。
参考ドキュメント:
事前準備
各ホストでコンテナを実行する準備を行います。
コンテナは削除すると全てのデータと設定が失われるため、データ保存用の永続ボリュームをコンテナにマウントする必要があります。今回はホスト上にKafkaのデータを保存するディレクトリを作成し、それをコンテナにマウントしてみます。
# Kafka Broker用のデータディレクトリを作成
mkdir -p /data/broker
# コンテナから書き込めるように権限変更
chmod 777 /data/broker
また、DockerHubからApache Kafkaのコンテナイメージを取得しておきます。
# DockerHubからコンテナイメージを取得
podman pull docker.io/apache/kafka:3.7.0
コンテナの起動
各ホストでKafka Broker用のコンテナを起動して、Kafka Brokerクラスタを構成します。kafka Brokerの動作モードには、ZooKeeperを使用するモードと、ZooKeeperが不要なKRaftモードの2種類が存在しますが、今回はKRaftモードで構築します。
KRaftモードで起動する際には、1台目のBrokerコンテナを起動してから1分以内に2台目のコンテナを起動してください。1台のみ起動した状態で1分以上経過すると、過半数を保てないためKRaftコントローラーがタイムアウトしてBrokerが停止します。
broker00の起動
broker00
のホストでKafka Brokerのコンテナを起動します。
Kafkaの各種設定はコンテナ起動時の環境変数で指定します。
主な設定項目:
- サーバ間の通信に必要なポートを開放
- PodmanのPort forwardでコンテナの9092, 9093, 9094ポートを、ローカルホストの9092, 9093, 9094ポートにマッピング
- ローカルディレクトリの
/data/broker
を、コンテナの/data
にマウント
# Kafka Brokerコンテナを起動
podman run -d \
--name kafka-server \
-p 9092:9092 \
-p 9093:9093 \
-p 9094:9094 \
-v /data/broker:/data \
-e KAFKA_LOG_DIRS=/data \
-e KAFKA_NODE_ID=0 \
-e KAFKA_ADVERTISED_HOST_NAME=broker00 \
-e KAFKA_PROCESS_ROLES=controller,broker \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=0@broker00:9093,1@broker01:9093,2@broker02:9093 \
-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094 \
-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://broker00:9092,EXTERNAL://broker00:9094 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e CLUSTER_ID=cluster-00 \
-e KAFKA_REPLICATION_FACTOR=3 \
-e KAFKA_MIN_INSYNC_REPLICAS=2 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=2 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=3000 \
-e KAFKA_HEAP_OPTS="-Xmx4g -Xms4g" \
-e KAFKA_CONTROLLER_QUORUM_ELECTION_BACKOFF_MAX_MS=10000 \
docker.io/apache/kafka:3.7.0
参考ドキュメント:
- Kafka Docker Image Usage Guide - Multi Node Cluster
- kafka/docker/examples/jvm/cluster/combined/plaintext/docker-compose.yml
broker01, broker02の起動
続いて、broker01
とbroker02
のホストでKafka Brokerのコンテナを起動します。
broker00
の設定をベースに、下記設定のIDとホスト名を書き換えて起動します。
-
-e KAFKA_CFG_NODE_ID=0
- Brokerを示す一意なID
-
-e KAFKA_ADVERTISED_HOST_NAME=broker00
- クライアントからアクセスする際のBrokerのホスト名
-
-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://broker00:9092,EXTERNAL://broker00:9094
- クライアントからアクセスする際のBrokerのリスナ設定
broker01の起動:
# Kafka Brokerコンテナを起動
podman run -d \
--name kafka-server \
-p 9092:9092 \
-p 9093:9093 \
-p 9094:9094 \
-v /data/broker:/data \
-e KAFKA_LOG_DIRS=/data \
-e KAFKA_NODE_ID=1 \
-e KAFKA_ADVERTISED_HOST_NAME=broker01 \
-e KAFKA_PROCESS_ROLES=controller,broker \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=0@broker00:9093,1@broker01:9093,2@broker02:9093 \
-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094 \
-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://broker01:9092,EXTERNAL://broker01:9094 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e CLUSTER_ID=cluster-00 \
-e KAFKA_REPLICATION_FACTOR=3 \
-e KAFKA_MIN_INSYNC_REPLICAS=2 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=2 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=3000 \
-e KAFKA_HEAP_OPTS="-Xmx4g -Xms4g" \
-e KAFKA_CONTROLLER_QUORUM_ELECTION_BACKOFF_MAX_MS=10000 \
docker.io/apache/kafka:3.7.0
broker02の起動:
# Kafka Brokerコンテナを起動
podman run -d \
--name kafka-server \
-p 9092:9092 \
-p 9093:9093 \
-p 9094:9094 \
-v /data/broker:/data \
-e KAFKA_LOG_DIRS=/data \
-e KAFKA_NODE_ID=2 \
-e KAFKA_ADVERTISED_HOST_NAME=broker02 \
-e KAFKA_PROCESS_ROLES=controller,broker \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=0@broker00:9093,1@broker01:9093,2@broker02:9093 \
-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094 \
-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://broker02:9092,EXTERNAL://broker02:9094 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e CLUSTER_ID=cluster-00 \
-e KAFKA_REPLICATION_FACTOR=3 \
-e KAFKA_MIN_INSYNC_REPLICAS=2 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=2 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=3000 \
-e KAFKA_HEAP_OPTS="-Xmx4g -Xms4g" \
-e KAFKA_CONTROLLER_QUORUM_ELECTION_BACKOFF_MAX_MS=10000 \
docker.io/apache/kafka:3.7.0
Brokerの確認起動
コンテナを起動して30秒以上経過してから、コンテナの状態(STATUS)を確認します。コンテナが起動(Up)していればOKです。
# コンテナ一覧を表示: STATUSがUpであれば起動している
podman ps
## CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
## 4515dda315df docker.io/apache/kafka:3.7.0 /etc/kafka/docker... 47 seconds ago Up 47 seconds 0.0.0.0:9092-9094->9092-9094/tcp kafka-server
# 問題が発生した場合はログを確認
podman logs kafka-server
(参考)Brokerの設定を変更する場合は、一旦コンテナを削除してから起動してください。
# 既に起動中のコンテナを削除
podman rm -f kafka-server
Kafka Streamsアプリケーションの実行
Kafka StreamsのサンプルアプリケーションであるWordCount(単語の出現回数カウント)を実行してみます。以下の図に示す構成のStreamアプリケーションを実行します。
WordCountアプリケーションについて
WordCountは、メッセージの英文に含まれる各単語の出現回数を数える処理です。以下のように、入力メッセージの文字列の単語に分割して、これまでの各単語の出現回数を出力します。
事前準備
Kafka Streamsの設定ファイルとデータ保存用の永続ボリューム(ディレクトリ)を作成して、コンテナにマウントします。
まずはKafka Streamsの設定ファイルとState Storeを配置するローカルディレクトリを作成します。
# 設定ファイル配置用のディレクトリを作成
mkdir -p /data/streams/config
# State Store配置用のディレクトリを作成
mkdir -p /data/streams/state
# コンテナから書き込めるように権限変更
chmod 777 /data/streams/state
次にKafka Streamsの設定ファイルを作成して配置します。インスタンスのスレッド数、Record Cacheのメモリ量、State Storeのスタンバイレプリカの個数はデフォルト値よりも増やします。これらのチューニングの意味については前回の記事(Kafka Streamsのアーキテクチャとチューニングポイント)をご確認ください。
/data/streams/config/streams.properties
:
# 初期接続先のBroker一覧
bootstrap.servers=broker00:9094,broker01:9094,broker02:9094
# アプリケーションID(ConsumerグループID)。同じIDで起動したインスタンスはグループ化される。
application.id=streams-wordcount
# インスタンスのスレッド数: 初期設定: `1`
num.stream.threads=2
# State Storeのディレクトリ: 初期設定: `/tmp/kafka-streams`
# コンテナには/stateをマウントして、/state/storeはStreams側で作成させる(後から権限変更するため)
state.dir=/state/store
# Record Cacheのメモリ量: 初期設定: `10485760`(10MB)
statestore.cache.max.bytes=104857600
# State Storeのスタンバイレプリカの個数: 初期設定: `0`
num.standby.replicas=1
また、DockerHubからApache Kafkaのコンテナイメージを取得しておきます。
# DockerHubからコンテナイメージを取得
podman pull docker.io/apache/kafka:3.7.0
Topicの準備
WordCountアプリケーションのデータ入力元と出力先のTopicを作成します。Topic名は以下の通りです。
- 入力元Topic名:
streams-plaintext-input
- 出力先Topic名:
streams-wordcount-output
両TopicともPartition数は3、複製数も3に指定します。
# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash
# 入力元Topicを作成
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--create \
--topic streams-plaintext-input \
--replication-factor 3 \
--partitions 3
# 出力先Topicを作成
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--create \
--topic streams-wordcount-output \
--replication-factor 3 \
--partitions 3 \
--config cleanup.policy=compact
# 入力元Topicを確認
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--describe --topic streams-plaintext-input
## Topic: streams-plaintext-input TopicId: yViEPXN5SWeizTeIrJTjEw PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2
## Topic: streams-plaintext-input Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2
## Topic: streams-plaintext-input Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
## Topic: streams-plaintext-input Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,2,1
# 出力先Topicを確認
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--describe --topic streams-wordcount-output
## Topic: streams-wordcount-output TopicId: g_bUeCc7TEeHDbzRYevnSQPartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2,cleanup.policy=compact
## Topic: streams-wordcount-output Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,2,1
## Topic: streams-wordcount-output Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
## Topic: streams-wordcount-output Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
# コンテナからログアウト
exit
1個目のインスタンスを起動
WordCountアプリケーションを起動
WordCountアプリケーションのインスタンスをホストstreams00
で起動します。
# Kafka Streamsのコンテナを起動してログイン
podman run -it --rm \
--name=kafka-streams \
-v /data/streams/config:/config \
-v /data/streams/state:/state \
docker.io/apache/kafka:3.7.0 /bin/bash
# WordCountアプリケーションを起動
/opt/kafka/bin/kafka-run-class.sh \
org.apache.kafka.streams.examples.wordcount.WordCountDemo \
/config/streams.properties
ターミナルは起動したままにします。
Consumerグループの確認
別のターミナルから、Kafka StreamsのConsumerグループの構成を確認します。
# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash
# Consumerグループの構成を確認
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--describe --group streams-wordcount --members
Consumerグループは1Consumer * 2スレッドで構成されていることが確認できます。
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS
streams-wordcount streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-1-consumer-e6e5e41b-a094-4270-9a10-af3a4a6ed6da /10.197.214.132 streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-1-consumer 3
streams-wordcount streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-2-consumer-548f8bfa-77c6-4635-b3bd-9a39ae6f4906 /10.197.214.132 streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-2-consumer 3
WordCountの動作確認
メッセージを入力
別のターミナルでコンソールProducerを起動して、Wordcountアプリケーションにメッセージを入力します。1行が1メッセージ(レコード)として送信されます。
# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash
# 入力元Topicにメッセージを書き込み
/opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--topic streams-plaintext-input
適当な文章を入力します:
This is a pen
処理結果を参照
別のターミナルでコンソールConsumerを起動して、Wordcountアプリケーションの処理結果を参照します。
# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash
# 出力先Topicからメッセージを読み出し
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
単語の出現回数が出力されます:
this 1
is 1
a 1
pen 1
続けてコンソールProducerに何らかのメッセージを入力すると、処理結果がコンソールConsumerに出力されます。
2個目のインスタンスを起動してスケールアウト
ホストstreams01
で2つ目のインスタンスを起動して、WordCountアプリケーションをスケールアウトしてみます。同じapplication.id
で複数のインスタンスを起動すると、自動的にConsumerグループ化されてスケールアウトします。
# Kafka Streamsのコンテナを起動してログイン
podman run -it --rm \
--name=kafka-streams \
-v /data/streams/config:/config:Z \
-v /data/streams/state:/state:Z \
docker.io/apache/kafka:3.7.0 /bin/bash
# WordCountアプリケーションを起動
/opt/kafka/bin/kafka-run-class.sh \
org.apache.kafka.streams.examples.wordcount.WordCountDemo \
/config/streams.properties
別のターミナルでConsumerグループの詳細を確認します。インスタンスを起動してしばらくすると、リバランスが実行されます。
# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash
# Consumerグループの構成を確認
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--describe --group streams-wordcount --members
Consumerグループは2Consumer * 2スレッドで構成されていることが確認できます。
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS
streams-wordcount streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-1-consumer-e6e5e41b-a094-4270-9a10-af3a4a6ed6da /10.197.214.132 streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-1-consumer 2
streams-wordcount streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-2-consumer-548f8bfa-77c6-4635-b3bd-9a39ae6f4906 /10.197.214.132 streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-2-consumer 1
streams-wordcount streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-1-consumer-19a088ad-1944-42b4-9756-f9447bce54f6 /10.197.214.134 streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-1-consumer 1
streams-wordcount streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-2-consumer-61069326-f842-4771-bd62-752cf7eb9a73 /10.197.214.134 streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-2-consumer 2
1インスタンスにスケールイン
起動中のインスタンスを1つ停止して、スケールイン(フォールトトレランス)の動作を確認してみます。ホストstreams00
で起動中のインスタンスのターミナルセッションでCtrl-C
を入力し、インスタンスが停止して10秒ほど経過するとリバランスが実行されます。
# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash
# Consumerグループの構成を確認
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--describe --group streams-wordcount --members
Consumerグループは1Consumer * 2スレッドで構成されていることが確認できます。
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS
streams-wordcount streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-1-consumer-19a088ad-1944-42b4-9756-f9447bce54f6 /10.197.214.134 streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-1-consumer 3
streams-wordcount streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-2-consumer-61069326-f842-4771-bd62-752cf7eb9a73 /10.197.214.134 streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-2-consumer 3
おわりに
本稿ではApache Kafkaのコンテナイメージを活用してKafka Brokerクラスタを構築し、Kafka Streamsのサンプルアプリケーションを実行する方法を紹介しました。またStreamsアプリケーションのスケールアウトとスケールイン(フォールトトレランス)の動作を確認しました。