この記事は SRA Advent Calendar 2024 の12月24日の記事です。
本記事においてはApache Kafka 3.8.0におけるコードを参照しています。
Apache Kafka について
Apache Kafka(以下、Kafka)はストリーミング処理に欠かせない重要なOSSです。Kafkaを利用することにより高可用性、スケーラブル、低レイテンシといった様々なメリットを得ることができます。
並列分散処理の環境を構築するためには複数のサーバを用意したり、環境設定用のPuppet、Ansibleを準備したり、といった作業を想定される方もいらっしゃるかと思います。しかし、Kafka には動作を簡単に確認できるDocker環境などが用意されており、テスト用に試すのであれば意外と簡単に環境を立てることも可能です。
また、性能測定のためにProducerとConsumerをそれぞれ用意する必要がありますが、テスト用のツールについてもコミュニティ提供のものがいくつか存在しています。
本記事においては、Kafka に興味のある方に向けて、Docker環境を用いて簡単にKafkaを試すための手順についてご説明いたします。
Kafka の Docker イメージ
Kafkaでは3.7.0のリリース以降、コミュニティからKafkaのDocker イメージが提供されています。
このイメージを利用することで、簡単にBrokerを起動することができます。
docker run -d --name broker apache/kafka:latest
また、最近ではKafkaの中でもGraalVMベースで開発されたKafka-native というイメージが登場しています。
こちらについては本記事では詳しく取り上げませんが、GraalVM ベースの Kafkaではさらに軽量で高速に起動することができます。
興味のある方はぜひKIP-974をご確認ください。
Docker Compose ファイルについて
Docker Compose ファイルについても、コミュニティ付帯のものを利用することが可能です。
Kafka のリポジトリには docker/examples というディレクトリがあり、その中は以下のとおりのディレクトリ構造をしています。このディレクトリのなかに docker-compose.yml がそれぞれ用意されており、目的に応じて使い分けることが可能です。
├── examples
│ ├── docker-compose-files
│ │ ├── cluster
│ │ │ ├── combined
│ │ │ │ ├── plaintext
│ │ │ │ └── ssl
│ │ │ └── isolated
│ │ │ ├── plaintext
│ │ │ └── ssl
│ │ └── single-node
│ │ ├── file-input
│ │ ├── plaintext
│ │ └── ssl
各ディレクトリの中について少し見てみましょう。
まず、1ノードで起動するモード(single-node)とクラスタで起動するモード(cluster)があります。さらに、クラスタの中ではControllerとBrokerが別のコンテナで動作するモード(isolated)と同じコンテナで動作するモード(combined)にさらに分かれています。それぞれのパターンに対し、Kafka内での通信が暗号化されているモード(ssl)と平文で通信するモード(plaintext)が用意されている、といった構造になっています。
一例としてComposeファイルの中身を見てみます。
今回は Compose ファイルの中から、cluster -> isolate -> plaintext のものを確認します。
controller-1:
image: ${IMAGE}
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
...
kafka-1:
image: ${IMAGE}
ports:
- 29092:9092
hostname: kafka-1
container_name: kafka-1
environment:
KAFKA_NODE_ID: 4
KAFKA_PROCESS_ROLES: 'broker'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093'
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
...
利用するDockerイメージはIMAGE
変数によって指定することが可能です。バージョンを切り替えたい場合など、簡単に利用するイメージを変更することができます。
また、各ノード設定内容はenvironment
内の変数でそれぞれ定義されています。
controller では KAFKA_PROCESS_ROLES
としてcontroller
、Brokerではbroker
がそれぞれ定義されることにより、各ノード上での動作を切り替えています。(なお、combinedモードのクラスタを利用する場合には、broker,controller
が設定されます)
Brokerが動作するノード上では9092
番ポートがそれぞれフォワードされています。cluster環境では3台のBrokerが起動し、ローカル上の29092, 39092, 49092
にアクセスすることで各Brokerに接続をすることが可能です。
実際に動かしてみる
Kafka クラスタの起動
では、実際にKafkaクラスタを動作させてみましょう。
Compose ファイルを使って Kafka クラスタを起動するには、以下のようにIMAGEにdockerイメージのタグを指定します。
今回はComposeファイルの中から、前節で中身を参照した cluster -> isolate -> plaintext のものを選択して利用します。
$ IMAGE=apache/kafka:latest docker compose -f docker/examples/docker-compose-files/cluster/isolated/plaintext/docker-compose.yml up -d
[+] Running 6/6
✔ Container plaintext-controller-3-1 Started 0.7s
✔ Container plaintext-controller-2-1 Started 0.7s
✔ Container plaintext-controller-1-1 Started 0.7s
✔ Container kafka-3 Started 1.4s
✔ Container kafka-1 Started 1.3s
✔ Container kafka-2 Started 1.4s
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f6c59323b6dc apache/kafka:latest "/__cacert_entrypoin…" 4 seconds ago Up 3 seconds 0.0.0.0:39092->9092/tcp, [::]:39092->9092/tcp kafka-2
9c8aea6a29e5 apache/kafka:latest "/__cacert_entrypoin…" 4 seconds ago Up 3 seconds 0.0.0.0:29092->9092/tcp, [::]:29092->9092/tcp kafka-1
22124b348942 apache/kafka:latest "/__cacert_entrypoin…" 4 seconds ago Up 3 seconds 0.0.0.0:49092->9092/tcp, [::]:49092->9092/tcp kafka-3
fceba9972dca apache/kafka:latest "/__cacert_entrypoin…" 4 seconds ago Up 4 seconds 9092/tcp plaintext-controller-2-1
9f097d2dffe7 apache/kafka:latest "/__cacert_entrypoin…" 4 seconds ago Up 4 seconds 9092/tcp plaintext-controller-3-1
3e326f5b2031 apache/kafka:latest "/__cacert_entrypoin…" 4 seconds ago Up 4 seconds 9092/tcp plaintext-controller-1-1
メッセージの送受信
起動したKafkaクラスタに対してデータの送受信を行ってみます。
自作のアプリケーションを用いてもよいですが、今回は簡単にkafka-console-producer.sh, kafka-console-consumer.sh
を利用します。
事前に、送受信用のトピックを作成しておきましょう。
$ bin/kafka-topics.sh --bootstrap-server localhost:29092,localhost:39092,localhost:49092 --create --topic testTopic --partitions 3
...
Created topic testTopic.
produce, consume のテストのためには別途コンソールを開いておき、
それぞれのコンソールで以下のコマンドを実行します。
$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:29092,localhost:39092,localhost:49092 --topic testTopic
> hoge # プロンプトが表示されたら入力
./bin/kafka-console-consumer.sh --bootstrap-server localhost:29092,localhost:39092,localhost:49092 --topic testTopic --from-beginning
hoge # コンソール1で入力すると表示される
デバッガの接続
Kafka内部での細かい動作を確認したい場合、動作中のプロセスに対してデバッガを接続したくなることがあります。
コミュニティのComposeファイルそのままではデバッガ用のポートは解放されていませんが、設定を追加することでデバッガをアタッチすることも可能です。
Kafkaにおいてはdaemonの起動など、スクリプトによる操作時にkafka-run-class.sh
が利用されており、kafka-run-class.sh
では環境変数に応じて起動時のオプションが設定されるようになっています。
# Set Debug options if enabled
if [ "x$KAFKA_DEBUG" != "x" ]; then
# Use default ports
DEFAULT_JAVA_DEBUG_PORT="5005"
if [ -z "$JAVA_DEBUG_PORT" ]; then
JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
fi
# Use the defaults if JAVA_DEBUG_OPTS was not set
DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
if [ -z "$JAVA_DEBUG_OPTS" ]; then
JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
fi
echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
fi
そのため、以下のようにComposeファイルに記載を追加することで、ローカルの25005
からBrokerプロセスにデバッガを接続することができます。
kafka-1:
image: ${IMAGE}
ports:
- 29092:9092
+ - 25005:5005
environment:
+ KAFKA_DEBUG: 'y'
+ JAVA_DEBUG_PORT: '*:5005'
...
テストに便利なツール群
これまででKafkaクラスタ環境は構築できましたが、より本格的にテストを行うためのツールもKafkaには付帯しています。現在のKafkaのbinディレクトリ以下には、約40のスクリプトが用意されていますが、今回はそのうちのいくつかについてご紹介します。
kafka-[producer,consumer]-perf-test.sh
ProducerとConsumerで性能がどの程度かを簡単に測定するためには、perf-test.sh
というツールが利用できます。
ProducerとConsumerで別のスクリプトが用意されており、それぞれメッセージのサイズや多重度を指定することができます。
任意のパラメータを指定することで、自分がかけたい負荷を簡単に書けることが可能です。
コマンドの実行例としては以下の通りです。
$ bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=localhost:29092,localhost:39092,localhost:49092 --topic testTopic --num-records 1000 --record-size 10 --throughput 100
...
1000 records sent, 99.930049 records/sec (0.00 MB/sec), 3.31 ms avg latency, 263.00 ms max latency, 2 ms 50th, 4 ms 95th, 38 ms 99th, 263 ms 99.9th.
$ bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:29092,localhost:39092,localhost:49092 --topic testTopic --messages 1000
...
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2024-12-23 11:15:57:371, 2024-12-23 11:15:57:808, 0.0095, 0.0218, 1000, 2288.3295, 375, 62, 0.1537, 16129.0323
送受信したデータに対して、どの程度のレイテンシとなっているか等を測定することができます。
kafka-e2e-latency.sh
kafka-[producer,consumer]-perf-test.sh
はProducerとConsuemrの性能を確認するためのツールでしたが、送信から受信までのEnd-to-Endでのレイテンシを計測したい場合もあります。
他のProducerなどと異なり、引数ごとの指定内容が決まっていますので注意が必要です。
各引数についてはコマンド実行時にヘルプが表示され、以下の通りの内容となっています。
- broker_list: Broker
- topic: 送信先トピック名
- num_messages: 送信するメッセージ数
- producer_acks: producer が送信の際に利用する Acks の設定
- message_size_bytes: 送信する1メッセージのサイズ(バイト数)
$./bin/kafka-e2e-latency.sh
USAGE: java org.apache.kafka.tools.EndToEndLatency broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file
$ ./bin/kafka-e2e-latency.sh localhost:29092,localhost:39092,localhost:49092 testTopic 1000 all 10
...
0 142.06386600000002
Avg latency: 3.2986 ms
Percentiles: 50th = 2, 99th = 7, 99.9th = 142
trogdor.sh
Kafkaにおける障害試験の実施を行うためのツールです。
trogdor用のCoordinator,Agentと呼ばれるプロセスを事前に動作させておきます。
また、別途試験用のファイルを定義しておき、ジョブとして登録することでディスク障害・ネットワーク障害などを疑似的に再現することが可能です。
詳しくはtrogdorのREADME、もしくはWikiを参照ください。
まとめ
何かと面倒に感じることの多い並列分散環境の構築ですが、最近では用途に応じて簡単に環境を構築することが可能です。これを機に、みなさんもApache Kafkaの世界に飛び込んでみてはいかがでしょうか。