2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Kafkaクラスタを構築してKafka Streamsを試す(Podmanコンテナ環境編)

Last updated at Posted at 2024-07-24

著者: 伊藤 雅博, 株式会社日立製作所

はじめに

本稿ではKafkaクラスタとKafka Streamsの具体的な構築手順を紹介します。コンテナ管理ツールであるPodmanと、Apache Kafkaのコンテナイメージを使用して、Kafkaクラスタの構築とStreamsアプリケーションの動作確認を行います。

なお、Kafka Streamsの概要やチューニングの考え方については、以前の記事をご参照ください。

記事一覧:

  1. Kafka_Streamsの概要とユースケース
  2. Kafka Streamsのアーキテクチャとチューニングポイント
  3. Kafkaクラスタを構築してKafka Streamsを試す(Podmanコンテナ環境編)(本稿)

Kafka構築の選択肢

オープンソースのApache Kafkaには、構築環境によって以下のような選択肢があります。

  • ベアメタル・仮想化環境
  • コンテナ環境(Docker、Podmanなど)
  • Kubernetes環境
    • Strimzi
      • Apache KafkaをKubernetes上にデプロイするためのOperatorなどを提供する、Cloud Native Computing Foundation (CNCF)のオープンソースプロジェクト

本稿ではコンテナ管理ツールであるPodman上で、Apache Kafkaのコンテナイメージを利用します。

システム構成

3台構成のKafka Brokerクラスタと、2台構成のKafka Streamsアプリケーションを構築します。Podmanのコンテナ環境なので1台のサーバ上に構築することも可能ですが、今回は大規模な環境でのスケールアウトも想定し、複数台のサーバ上に構築してみます。

podman_kafka.png

今回用意したサーバのスペックを以下に示します。サーバ間はホスト名で名前解決できる状態とします。

ホスト名 CPUコア数 メモリ容量 ディスク容量
broker00 4core 8GB 80GB
broker01 4core 8GB 80GB
broker02 4core 8GB 80GB
streams00 2core 8GB 80GB
streams01 2core 8GB 80GB

OSとPodmanは以下のバーションをインストール済みとします。

  • OS: Red Hat Enterprise Linux release 8.9
  • Podman: v4.6.1

Apache Kafkaのコンテナイメージは以下を使用します。

本検証は動作確認が目的のため、以下の状態で実施します。もし本番環境で構築する場合は、適切なセキュリティ設定を行ってください。

  • OSユーザ: root
  • SELinux: Disabled

Kafka Brokerクラスタの構築

3台のサーバで、以下の図に示す構成のKafka Brokerクラスタを構築します。

podman_kafka_brokers.png

参考ドキュメント:

事前準備

各ホストでコンテナを実行する準備を行います。

コンテナは削除すると全てのデータと設定が失われるため、データ保存用の永続ボリュームをコンテナにマウントする必要があります。今回はホスト上にKafkaのデータを保存するディレクトリを作成し、それをコンテナにマウントしてみます。

# Kafka Broker用のデータディレクトリを作成
mkdir -p /data/broker

# コンテナから書き込めるように権限変更
chmod 777 /data/broker

また、DockerHubからApache Kafkaのコンテナイメージを取得しておきます。

# DockerHubからコンテナイメージを取得
podman pull docker.io/apache/kafka:3.7.0

コンテナの起動

各ホストでKafka Broker用のコンテナを起動して、Kafka Brokerクラスタを構成します。kafka Brokerの動作モードには、ZooKeeperを使用するモードと、ZooKeeperが不要なKRaftモードの2種類が存在しますが、今回はKRaftモードで構築します。

KRaftモードで起動する際には、1台目のBrokerコンテナを起動してから1分以内に2台目のコンテナを起動してください。1台のみ起動した状態で1分以上経過すると、過半数を保てないためKRaftコントローラーがタイムアウトしてBrokerが停止します。

broker00の起動

broker00のホストでKafka Brokerのコンテナを起動します。

Kafkaの各種設定はコンテナ起動時の環境変数で指定します。

主な設定項目:

  • サーバ間の通信に必要なポートを開放
    • PodmanのPort forwardでコンテナの9092, 9093, 9094ポートを、ローカルホストの9092, 9093, 9094ポートにマッピング
  • ローカルディレクトリの/data/brokerを、コンテナの/dataにマウント
# Kafka Brokerコンテナを起動
podman run -d \
--name kafka-server \
-p 9092:9092 \
-p 9093:9093 \
-p 9094:9094 \
-v /data/broker:/data \
-e KAFKA_LOG_DIRS=/data \
-e KAFKA_NODE_ID=0 \
-e KAFKA_ADVERTISED_HOST_NAME=broker00 \
-e KAFKA_PROCESS_ROLES=controller,broker \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=0@broker00:9093,1@broker01:9093,2@broker02:9093 \
-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094 \
-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://broker00:9092,EXTERNAL://broker00:9094 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e CLUSTER_ID=cluster-00 \
-e KAFKA_REPLICATION_FACTOR=3 \
-e KAFKA_MIN_INSYNC_REPLICAS=2 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=2 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=3000 \
-e KAFKA_HEAP_OPTS="-Xmx4g -Xms4g" \
-e KAFKA_CONTROLLER_QUORUM_ELECTION_BACKOFF_MAX_MS=10000 \
docker.io/apache/kafka:3.7.0

参考ドキュメント:

broker01, broker02の起動

続いて、broker01broker02のホストでKafka Brokerのコンテナを起動します。

broker00の設定をベースに、下記設定のIDとホスト名を書き換えて起動します。

  • -e KAFKA_CFG_NODE_ID=0
    • Brokerを示す一意なID
  • -e KAFKA_ADVERTISED_HOST_NAME=broker00
    • クライアントからアクセスする際のBrokerのホスト名
  • -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://broker00:9092,EXTERNAL://broker00:9094
    • クライアントからアクセスする際のBrokerのリスナ設定

broker01の起動:

# Kafka Brokerコンテナを起動
podman run -d \
--name kafka-server \
-p 9092:9092 \
-p 9093:9093 \
-p 9094:9094 \
-v /data/broker:/data \
-e KAFKA_LOG_DIRS=/data \
-e KAFKA_NODE_ID=1 \
-e KAFKA_ADVERTISED_HOST_NAME=broker01 \
-e KAFKA_PROCESS_ROLES=controller,broker \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=0@broker00:9093,1@broker01:9093,2@broker02:9093 \
-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094 \
-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://broker01:9092,EXTERNAL://broker01:9094 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e CLUSTER_ID=cluster-00 \
-e KAFKA_REPLICATION_FACTOR=3 \
-e KAFKA_MIN_INSYNC_REPLICAS=2 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=2 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=3000 \
-e KAFKA_HEAP_OPTS="-Xmx4g -Xms4g" \
-e KAFKA_CONTROLLER_QUORUM_ELECTION_BACKOFF_MAX_MS=10000 \
docker.io/apache/kafka:3.7.0

broker02の起動:

# Kafka Brokerコンテナを起動
podman run -d \
--name kafka-server \
-p 9092:9092 \
-p 9093:9093 \
-p 9094:9094 \
-v /data/broker:/data \
-e KAFKA_LOG_DIRS=/data \
-e KAFKA_NODE_ID=2 \
-e KAFKA_ADVERTISED_HOST_NAME=broker02 \
-e KAFKA_PROCESS_ROLES=controller,broker \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=0@broker00:9093,1@broker01:9093,2@broker02:9093 \
-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9094 \
-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://broker02:9092,EXTERNAL://broker02:9094 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e CLUSTER_ID=cluster-00 \
-e KAFKA_REPLICATION_FACTOR=3 \
-e KAFKA_MIN_INSYNC_REPLICAS=2 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=2 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=3000 \
-e KAFKA_HEAP_OPTS="-Xmx4g -Xms4g" \
-e KAFKA_CONTROLLER_QUORUM_ELECTION_BACKOFF_MAX_MS=10000 \
docker.io/apache/kafka:3.7.0

Brokerの確認起動

コンテナを起動して30秒以上経過してから、コンテナの状態(STATUS)を確認します。コンテナが起動(Up)していればOKです。

# コンテナ一覧を表示: STATUSがUpであれば起動している
podman ps
## CONTAINER ID  IMAGE                         COMMAND               CREATED         STATUS                    PORTS                             NAMES
## 4515dda315df  docker.io/apache/kafka:3.7.0  /etc/kafka/docker...  47 seconds ago  Up 47 seconds             0.0.0.0:9092-9094->9092-9094/tcp  kafka-server

# 問題が発生した場合はログを確認
podman logs kafka-server

(参考)Brokerの設定を変更する場合は、一旦コンテナを削除してから起動してください。

# 既に起動中のコンテナを削除
podman rm -f kafka-server

Kafka Streamsアプリケーションの実行

Kafka StreamsのサンプルアプリケーションであるWordCount(単語の出現回数カウント)を実行してみます。以下の図に示す構成のStreamアプリケーションを実行します。

podman_kafka_streams.png

WordCountアプリケーションについて

WordCountは、メッセージの英文に含まれる各単語の出現回数を数える処理です。以下のように、入力メッセージの文字列の単語に分割して、これまでの各単語の出現回数を出力します。

ks_wordcount_pt.png

事前準備

Kafka Streamsの設定ファイルとデータ保存用の永続ボリューム(ディレクトリ)を作成して、コンテナにマウントします。

まずはKafka Streamsの設定ファイルとState Storeを配置するローカルディレクトリを作成します。

# 設定ファイル配置用のディレクトリを作成
mkdir -p /data/streams/config

# State Store配置用のディレクトリを作成
mkdir -p /data/streams/state

# コンテナから書き込めるように権限変更
chmod 777 /data/streams/state

次にKafka Streamsの設定ファイルを作成して配置します。インスタンスのスレッド数、Record Cacheのメモリ量、State Storeのスタンバイレプリカの個数はデフォルト値よりも増やします。これらのチューニングの意味については前回の記事(Kafka Streamsのアーキテクチャとチューニングポイント)をご確認ください。

/data/streams/config/streams.properties:

# 初期接続先のBroker一覧
bootstrap.servers=broker00:9094,broker01:9094,broker02:9094

# アプリケーションID(ConsumerグループID)。同じIDで起動したインスタンスはグループ化される。
application.id=streams-wordcount

# インスタンスのスレッド数: 初期設定: `1`
num.stream.threads=2

# State Storeのディレクトリ: 初期設定: `/tmp/kafka-streams`
# コンテナには/stateをマウントして、/state/storeはStreams側で作成させる(後から権限変更するため)
state.dir=/state/store

# Record Cacheのメモリ量: 初期設定: `10485760`(10MB)
statestore.cache.max.bytes=104857600

# State Storeのスタンバイレプリカの個数: 初期設定: `0`
num.standby.replicas=1

また、DockerHubからApache Kafkaのコンテナイメージを取得しておきます。

# DockerHubからコンテナイメージを取得
podman pull docker.io/apache/kafka:3.7.0

Topicの準備

WordCountアプリケーションのデータ入力元と出力先のTopicを作成します。Topic名は以下の通りです。

  • 入力元Topic名: streams-plaintext-input
  • 出力先Topic名: streams-wordcount-output

両TopicともPartition数は3、複製数も3に指定します。

# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash

# 入力元Topicを作成
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--create \
--topic streams-plaintext-input \
--replication-factor 3 \
--partitions 3

# 出力先Topicを作成
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--create \
--topic streams-wordcount-output \
--replication-factor 3 \
--partitions 3 \
--config cleanup.policy=compact

# 入力元Topicを確認
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--describe --topic streams-plaintext-input
## Topic: streams-plaintext-input  TopicId: yViEPXN5SWeizTeIrJTjEw PartitionCount: 3       ReplicationFactor: 3    Configs: min.insync.replicas=2
##         Topic: streams-plaintext-input  Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,3,2
##         Topic: streams-plaintext-input  Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
##         Topic: streams-plaintext-input  Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 3,2,1

# 出力先Topicを確認
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--describe --topic streams-wordcount-output
## Topic: streams-wordcount-output TopicId: g_bUeCc7TEeHDbzRYevnSQPartitionCount: 3       ReplicationFactor: 3    Configs: min.insync.replicas=2,cleanup.policy=compact
##         Topic: streams-wordcount-output Partition: 0    Leader: 3      Replicas: 3,1,2 Isr: 3,2,1
##         Topic: streams-wordcount-output Partition: 1    Leader: 1      Replicas: 1,2,3 Isr: 1,2,3
##         Topic: streams-wordcount-output Partition: 2    Leader: 2      Replicas: 2,3,1 Isr: 2,3,1

# コンテナからログアウト
exit

1個目のインスタンスを起動

WordCountアプリケーションを起動

WordCountアプリケーションのインスタンスをホストstreams00で起動します。

# Kafka Streamsのコンテナを起動してログイン
podman run -it --rm \
--name=kafka-streams \
-v /data/streams/config:/config \
-v /data/streams/state:/state \
docker.io/apache/kafka:3.7.0 /bin/bash

# WordCountアプリケーションを起動
/opt/kafka/bin/kafka-run-class.sh \
org.apache.kafka.streams.examples.wordcount.WordCountDemo \
/config/streams.properties

ターミナルは起動したままにします。

Consumerグループの確認

別のターミナルから、Kafka StreamsのConsumerグループの構成を確認します。

# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash

# Consumerグループの構成を確認
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--describe --group streams-wordcount --members

Consumerグループは1Consumer * 2スレッドで構成されていることが確認できます。

GROUP             CONSUMER-ID                                                                                                         HOST            CLIENT-ID                                                                      #PARTITIONS     
streams-wordcount streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-1-consumer-e6e5e41b-a094-4270-9a10-af3a4a6ed6da /10.197.214.132 streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-1-consumer 3               
streams-wordcount streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-2-consumer-548f8bfa-77c6-4635-b3bd-9a39ae6f4906 /10.197.214.132 streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-2-consumer 3             

WordCountの動作確認

メッセージを入力

別のターミナルでコンソールProducerを起動して、Wordcountアプリケーションにメッセージを入力します。1行が1メッセージ(レコード)として送信されます。

# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash

# 入力元Topicにメッセージを書き込み
/opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--topic streams-plaintext-input

適当な文章を入力します:

This is a pen

処理結果を参照

別のターミナルでコンソールConsumerを起動して、Wordcountアプリケーションの処理結果を参照します。

# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash

# 出力先Topicからメッセージを読み出し
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

単語の出現回数が出力されます:

this    1
is      1
a       1
pen     1

続けてコンソールProducerに何らかのメッセージを入力すると、処理結果がコンソールConsumerに出力されます。

2個目のインスタンスを起動してスケールアウト

ホストstreams01で2つ目のインスタンスを起動して、WordCountアプリケーションをスケールアウトしてみます。同じapplication.idで複数のインスタンスを起動すると、自動的にConsumerグループ化されてスケールアウトします。

# Kafka Streamsのコンテナを起動してログイン
podman run -it --rm \
--name=kafka-streams \
-v /data/streams/config:/config:Z \
-v /data/streams/state:/state:Z \
docker.io/apache/kafka:3.7.0 /bin/bash

# WordCountアプリケーションを起動
/opt/kafka/bin/kafka-run-class.sh \
org.apache.kafka.streams.examples.wordcount.WordCountDemo \
/config/streams.properties

別のターミナルでConsumerグループの詳細を確認します。インスタンスを起動してしばらくすると、リバランスが実行されます。

# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash

# Consumerグループの構成を確認
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--describe --group streams-wordcount --members

Consumerグループは2Consumer * 2スレッドで構成されていることが確認できます。

GROUP             CONSUMER-ID                                                                                                         HOST            CLIENT-ID                                                                      #PARTITIONS     
streams-wordcount streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-1-consumer-e6e5e41b-a094-4270-9a10-af3a4a6ed6da /10.197.214.132 streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-1-consumer 2               
streams-wordcount streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-2-consumer-548f8bfa-77c6-4635-b3bd-9a39ae6f4906 /10.197.214.132 streams-wordcount-1b896f06-609c-49c5-893f-e88f84859778-StreamThread-2-consumer 1               
streams-wordcount streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-1-consumer-19a088ad-1944-42b4-9756-f9447bce54f6 /10.197.214.134 streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-1-consumer 1               
streams-wordcount streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-2-consumer-61069326-f842-4771-bd62-752cf7eb9a73 /10.197.214.134 streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-2-consumer 2  

1インスタンスにスケールイン

起動中のインスタンスを1つ停止して、スケールイン(フォールトトレランス)の動作を確認してみます。ホストstreams00で起動中のインスタンスのターミナルセッションでCtrl-Cを入力し、インスタンスが停止して10秒ほど経過するとリバランスが実行されます。

# コンテナを起動してログイン
podman run -it docker.io/apache/kafka:3.7.0 /bin/bash

# Consumerグループの構成を確認
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server broker00:9094,broker01:9094,broker02:9094 \
--describe --group streams-wordcount --members

Consumerグループは1Consumer * 2スレッドで構成されていることが確認できます。

GROUP             CONSUMER-ID                                                                                                         HOST            CLIENT-ID                                                                      #PARTITIONS     
streams-wordcount streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-1-consumer-19a088ad-1944-42b4-9756-f9447bce54f6 /10.197.214.134 streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-1-consumer 3               
streams-wordcount streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-2-consumer-61069326-f842-4771-bd62-752cf7eb9a73 /10.197.214.134 streams-wordcount-fe277093-e154-479f-aeb6-bab0eb5ad740-StreamThread-2-consumer 3   

おわりに

本稿ではApache Kafkaのコンテナイメージを活用してKafka Brokerクラスタを構築し、Kafka Streamsのサンプルアプリケーションを実行する方法を紹介しました。またStreamsアプリケーションのスケールアウトとスケールイン(フォールトトレランス)の動作を確認しました。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?