最低限押さえておくべき用語
用語 | 説明 |
---|---|
ブローカー (Broker) | クラスタの各ノードのこと。ここでは EC2 だと思えば良い |
クラスタ (Cluster) | ブローカーの集まり。ここでは EC2 の集まりだと思えば良い |
トピック (Topic) | メッセージをカテゴリ分けするための論理的なチャンネルの単位 |
パーティション (Partition) | トピックを分割する物理的な単位 |
プロデューサー (Producer) | トピックにメッセージを送信するクライアント |
コンシューマー (Consumer) | トピックからメッセージを読み取るクライアント |
コンシューマーグループ (Consumer Group) | 1つ以上のコンシューマーからなるグループ。トピック内にある複数のパーティションに対し、グループ内のコンシューマーにそれぞれ読み込ませることで負荷を分散する |
オフセット (Offset) | パーティション内でのメッセージの一意の位置を示す番号。コンシューマーはオフセットを利用して、パーティション単位でメッセージをどこまで読んだか管理する |
レプリケーション (Replication) | データの冗長性を確保するため、各パーティションのコピーを複数のブローカーに保存する仕組み |
リーダー (Leader) とフォロワー (Follower) | 各パーティションの主コピー(リーダー)とその複製(フォロワー)。すべての読み書き操作はリーダーを通じて行われ、フォロワーはリーダーのデータを同期する |
Zookeeper | 分散アプリケーションの構成情報の管理、ノードのステータスの監視、リーダー選出などを行い、システム全体の一貫性と可用性を確保します。Kafka では、クラスタのメタデータ管理やブローカー間の調整に使用されています |
このあたりは下記の Youtube 「Kafka講座2:Apache Kafkaの分散処理のしくみ」をみるのが早いです。15 分ぐらいです。
環境
- OS: Amazon Linux 2023
- Apache kafka バージョン: 3.8.0
- インスタンスタイプ: t3.small
- ※ t2.micro では、メモリ不足で Kafka が起動しません
- 冗長構成 3 台
セキュリティグループの作成
Zookeeper と Kafka が正しく通信できるように、必要なポートを解放するセキュリティグループを作成しておきます。
ポート | 詳細 | |
---|---|---|
Zookeeper | 2181 | クライアント接続用ポート |
2888 | リーダーとフォロワー間の通信用ポート | |
3888 | リーダー選出用のポート | |
Kafka | 9092 | ブローカーがクライアントからの通信を受けるためのポート |
共通の設定
Java のインストール
Apache Kafka は Java で実装されており、その動作には Javaランタイム環境(JRE)が必要です。
ドキュメントには Java 8, 11, 17 がサポートされているという記載がある。
Apache Kafka 3.7 以降ではブローカーとツールの Java 11 のサポートは非推奨との記載があることから、Java 17 を導入することにしました。
普通に Amazon Corretto 17 (Java 17) をインストールします。
sudo dnf update -y
sudo dnf search java-17
Last metadata expiration check: 0:41:31 ago on Wed Aug 7 08:09:47 2024.
========================================================================================= Name Matched: java-17 ==========================================================================================
java-17-amazon-corretto.x86_64 : Amazon Corretto development environment
java-17-amazon-corretto-devel.x86_64 : Amazon Corretto 17 development tools
java-17-amazon-corretto-headless.x86_64 : Amazon Corretto headless development environment
java-17-amazon-corretto-javadoc.x86_64 : Amazon Corretto 17 API documentation
java-17-amazon-corretto-jmods.x86_64 : Amazon Corretto 17 jmods
sudo dnf install -y java-17-amazon-corretto-devel
java --version
openjdk 17.0.12 2024-07-16 LTS
OpenJDK Runtime Environment Corretto-17.0.12.7.1 (build 17.0.12+7-LTS)
OpenJDK 64-Bit Server VM Corretto-17.0.12.7.1 (build 17.0.12+7-LTS, mixed mode, sharing)
Kafka ダウンロード
Kafka をダウンロードします。
cd /opt
sudo wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
sudo tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0
Zookeeper の共通設定
Zookeeper の config ファイルのデフォルトの設定では以下の通りとなっていました。
# スナップショットが保存されるディレクトリ
dataDir=/tmp/zookeeper
# クライアントが接続するポート
clientPort=2181
# 非本番構成のため、IPごとの接続数の制限を無効化
maxClientCnxns=0
# 管理サーバーをデフォルトで無効化して、ポートの競合を回避。
# 有効にする場合は競合しないポートを設定してください
admin.enableServer=false
# admin.serverPort=8080
デフォルトの設定に加え以下の通りとし、実行します。
dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
EOF'
Kafka の共通設定
Kafka のデフォルトの設定では以下の通りとなっていました。
# この設定ファイルは、ZKベースのモード(ZooKeeperが必要)で使用することを意図しています。
# 詳細やデフォルト値については、kafka.server.KafkaConfig を参照してください。
############################# Server Basics #############################
# ブローカーのID。ブローカーごとにユニークな整数に設定する必要があります。
broker.id=0
############################# Socket Server Settings #############################
# ソケットサーバーがリッスンするアドレス。この設定がない場合、ホスト名は
# java.net.InetAddress.getCanonicalHostName()の値に等しくなり、PLAINTEXT リスナー名とポート 9092 になります。
# 書式:
# listeners = listener_name://host_name:port
# 例:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# クライアントに広告されるリスナー名、ホスト名、およびポート。
# 設定されていない場合は "listeners" の値が使用されます。
#advertised.listeners=PLAINTEXT://your.host.name:9092
# リスナー名をセキュリティプロトコルにマッピングします。デフォルトでは、それらは同じです。詳細は設定ドキュメントを参照してください。
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# サーバーがネットワークからのリクエストを受信し、ネットワークに応答を送信するために使用するスレッド数
num.network.threads=3
# サーバーがリクエストを処理するために使用するスレッド数(ディスク I/O を含む場合があります)
num.io.threads=8
# ソケットサーバーが使用する送信バッファ(SO_SNDBUF)
socket.send.buffer.bytes=102400
# ソケットサーバーが使用する受信バッファ(SO_RCVBUF)
socket.receive.buffer.bytes=102400
# ソケットサーバーが受け入れるリクエストの最大サイズ(OOM防止)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# ログファイルを保存するディレクトリのカンマ区切りリスト
log.dirs=/tmp/kafka-logs
# トピックごとのデフォルトのログパーティション数。パーティション数が多いほど消費の並列度が高くなりますが、
# それに伴ってブローカー全体でのファイル数も増加します。
num.partitions=1
# 起動時のログリカバリとシャットダウン時のフラッシュに使用するデータディレクトリごとのスレッド数。
# データディレクトリがRAIDアレイに配置されている場合は、この値を増やすことをお勧めします。
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# グループメタデータの内部トピック "__consumer_offsets" と "__transaction_state" のレプリケーション係数。
# 開発テスト以外では、可用性を確保するために、1以上の値(推奨値は3)を設定することをお勧めします。
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# メッセージは直ちにファイルシステムに書き込まれますが、デフォルトではOSキャッシュへのfsync()は遅延して行われます。
# 次の設定は、データをディスクにフラッシュする方法を制御します。ここにはいくつかの重要なトレードオフがあります:
# 1. 耐久性: フラッシュされていないデータは、レプリケーションを使用していない場合に失われる可能性があります。
# 2. レイテンシ: フラッシュ間隔が非常に大きいと、フラッシュが発生したときに蓄積されたデータの量が多く、レイテンシのスパイクを引き起こす可能性があります。
# 3. スループット: フラッシュは通常、最も高コストな操作であり、フラッシュ間隔が小さいと、過剰なシークが発生する可能性があります。
# 以下の設定により、データを一定期間またはNメッセージごとにフラッシュするポリシーを設定できます。
# これらはグローバルに設定でき、トピックごとにオーバーライドすることも可能です。
# データをディスクに強制フラッシュする前に受け入れるメッセージ数
#log.flush.interval.messages=10000
# メッセージがログ内に留まることができる最大時間(その後フラッシュが強制される)
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# 次の設定はログセグメントの削除を制御します。ポリシーは、一定期間後または一定のサイズが蓄積された後に
# セグメントを削除するように設定できます。いずれかの基準が満たされたときにセグメントが削除されます。
# 削除は常にログの末尾から行われます。
# 年齢による削除対象になるログファイルの最小年齢
log.retention.hours=168
# ログのサイズに基づく保持ポリシー。セグメントは、ログの残りのセグメントがlog.retention.bytesを下回るまでプルーニングされます。
# log.retention.hoursとは独立して機能します。
#log.retention.bytes=1073741824
# ログセグメントファイルの最大サイズ。このサイズに達すると、新しいログセグメントが作成されます。
#log.segment.bytes=1073741824
# 保持ポリシーに従って、ログセグメントが削除可能かどうかを確認する間隔
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# ZooKeeper接続文字列(詳細はZooKeeperのドキュメントを参照してください)。
# これはカンマ区切りのhost:portペアであり、それぞれがzkサーバーに対応します。例: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。
# ルートディレクトリを指定するためにURLにオプションでchroot文字列を追加することもできます。
zookeeper.connect=localhost:2181
# ZooKeeperへの接続タイムアウト(ミリ秒)
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# グループコーディネーターが最初のコンシューマーリバランスを遅延させる時間(ミリ秒)。
# 新しいメンバーがグループに参加するたびに、リバランスはgroup.initial.rebalance.delay.msの値だけさらに遅延します。最大でmax.poll.interval.msまで。
# デフォルト値は3秒です。
# 開発およびテストのための、より良い初期設定にするため、ここでは0に設定しています。
# ただし、本番環境ではデフォルト値の3秒が適切であり、アプリケーション起動時の不必要でコストがかかるリバランスを避けるのに役立ちます。
group.initial.rebalance.delay.ms=0
デフォルトの設定に加え以下の通りとし、実行します。
sudo bash -c 'cat <<EOF > config/server.properties
listeners=PLAINTEXT://0.0.0.0:9092
log.dirs=/var/lib/kafka/data
num.partitions=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=524288000
log.retention.check.interval.ms=60000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF'
Zookeeper のデータディレクトリを作成
ZooKeeper ノードに一意の ID を割り当てるためのデータディレクトリだけを作成します。ID はノード毎に異なるため、後で EC2 のユーザーデータで設定します。
sudo mkdir -p /var/lib/zookeeper
sudo chown -R root:root /var/lib/zookeeper
sudo chmod -R 755 /var/lib/zookeeper
systemd の作成
zookeeper.service
と kafka.service
の systemd を作成します。
[Unit]
Description=Apache Zookeeper Server
Documentation=http://zookeeper.apache.org
Requires=network.target
After=network.target
[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/kafka_2.13-3.8.0/bin/zookeeper-server-start.sh /opt/kafka_2.13-3.8.0/kafka_2.13-3.0.0/config/zookeeper.properties
ExecStop=/opt/kafka_2.13-3.8.0/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/kafka_2.13-3.8.0/bin/kafka-server-start.sh /opt/kafka_2.13-3.8.0/config/server.properties
ExecStop=/usr/local/kafka_2.13-3.0.0/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
新しいサービスファイルを読み込み、ブート時に自動起動するようにしておきます。
sudo systemctl daemon-reload
sudo systemctl enable zookeeper
sudo systemctl enable kafka
AMI の作成
ここまでの AMI を作成します。
インスタンスの起動
先程作成した AMI を利用して、
kafka1 EC2 インスタンス
kafka2 EC2 インスタンス
kafka3 EC2 インスタンス
を起動します。
セキュリティグループは冒頭で作成した kafka-sg を指定します。
Name | AMI | セキュリティティグループ | IP アドレス(固定) | ユーザーデータ |
---|---|---|---|---|
kafka1 | kafka-ami | kafka-sg | 10.0.1.100 | 後述の kafka1.bash |
kafka2 | kafka-ami | kafka-sg | 10.0.2.100 | 後述の kafka2.bash |
kafka3 | kafka-ami | kafka-sg | 10.0.3.100 | 後述の kafka3.bash |
kafka1 ユーザーデータ
#!/bin/bash
# ディレクトリを移動
cd /opt/kafka_2.13-3.8.0
# myidファイルを作成
echo "1" | sudo tee /var/lib/zookeeper/myid
# 自身のプライベート IP アドレスを取得
TOKEN=$(curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600")
PRIVATE_IP=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" "http://169.254.169.254/latest/meta-data/local-ipv4")
# Zookeeperのサーバー固有の設定を追加
sudo echo "server.1=$PRIVATE_IP:2888:3888" >> config/zookeeper.properties
sudo echo "server.2=10.0.2.100:2888:3888" >> config/zookeeper.properties
sudo echo "server.3=10.0.3.100:2888:3888" >> config/zookeeper.properties
# Kafkaのブローカー固有の設定を追加
sudo echo "advertised.listeners=PLAINTEXT://$PRIVATE_IP:9092" >> config/server.properties
sudo echo "broker.id=1" >> config/server.properties
sudo echo "zookeeper.connect=$PRIVATE_IP:2181,10.0.2.100:2181,10.0.3.100:2181" >> config/server.properties
# ZookeeperとKafkaの起動
sudo bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
sudo bin/kafka-server-start.sh -daemon config/server.properties
sudo systemctl start zookeeper
sudo systemctl start kafka
kafka2 ユーザーデータ
#!/bin/bash
# ディレクトリを移動
cd /opt/kafka_2.13-3.8.0
# myidファイルを作成
echo "2" | sudo tee /var/lib/zookeeper/myid
# 自身のプライベート IP アドレスを取得
TOKEN=$(curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600")
PRIVATE_IP=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" "http://169.254.169.254/latest/meta-data/local-ipv4")
# Zookeeperのサーバー固有の設定を追加
sudo echo "server.2=$PRIVATE_IP:2888:3888" >> config/zookeeper.properties
sudo echo "server.1=10.0.1.100:2888:3888" >> config/zookeeper.properties
sudo echo "server.3=10.0.3.100:2888:3888" >> config/zookeeper.properties
# Kafkaのブローカー固有の設定を追加
sudo echo "advertised.listeners=PLAINTEXT://$PRIVATE_IP:9092" >> config/server.properties
sudo echo "broker.id=2" >> config/server.properties
sudo echo "zookeeper.connect=10.0.1.00:2181,$PRIVATE_IP:2181,10.0.3.100:2181" >> config/server.properties
# ZookeeperとKafkaの起動
sudo bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
sudo bin/kafka-server-start.sh -daemon config/server.properties
sudo systemctl start zookeeper
sudo systemctl start kafka
kafka3 ユーザーデータ
#!/bin/bash
# ディレクトリを移動
cd /opt/kafka_2.13-3.8.0
# myidファイルを作成
echo "3" | sudo tee /var/lib/zookeeper/myid
# 自身のプライベート IP アドレスを取得
TOKEN=$(curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600")
PRIVATE_IP=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" "http://169.254.169.254/latest/meta-data/local-ipv4")
# Zookeeperのサーバー固有の設定を追加
sudo echo "server.3=$PRIVATE_IP:2888:3888" >> config/zookeeper.properties
sudo echo "server.1=10.0.1.100:2888:3888" >> config/zookeeper.properties
sudo echo "server.2=10.0.2.100:2888:3888" >> config/zookeeper.properties
# Kafkaのブローカー固有の設定を追加
sudo echo "advertised.listeners=PLAINTEXT://$PRIVATE_IP:9092" >> config/server.properties
sudo echo "broker.id=3" >> config/server.properties
sudo echo "zookeeper.connect=10.0.1.100:2181,$PRIVATE_IP:2181,10.0.3.100:2181" >> config/server.properties
# ZookeeperとKafkaの起動
sudo bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
sudo bin/kafka-server-start.sh -daemon config/server.properties
sudo systemctl start zookeeper
sudo systemctl start kafka
zookeeper への接続確認
bin/zookeeper-shell.sh
で zookeeper に接続できることを確認します。
sudo bin/zookeeper-shell.sh 10.0.1.100:2181
Connecting to 10.0.1.100:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
今回ここで何かを行うわけではありませんが、10.0.2.100:2181、10.0.3.100:2181 も同様に接続を確認しておきます。
トピック作成とステータス確認
kafka-topics.sh
を使用して、トピックを作成、ステータスなどを確認していきます。
ここでは、topic-a というトピックを作成し、レプリカを 2、パーティションを 2 で作成しています。
※ config/server.properties
で num.partitions=3
としましたが、ここでは明示的に --partitions 2
としています。
bin/kafka-topics.sh --create \
--bootstrap-server=10.0.1.100:9092 \
--topic topic-a \
--partitions 2 \
--replication-factor 2
bin/kafka-topics.sh --create \
--bootstrap-server=10.0.1.100:9092 \
--topic topic-b \
--partitions 2 \
--replication-factor 3
トピック topic-a には 2 つのパーティション (0 と 1) があり、それぞれに 2 つのレプリカがあります。
各パーティションにはリーダー (3 と 1) がいて、データの処理を担当し、レプリカ (3 と 2) (1 と 3) で、冗長性を保っています。
トピック topic-b には 2 つのパーティション (0 と 1) があり、それぞれに 2 つのレプリカがあります。
各パーティションにはリーダー (3 と 1) がいて、データの処理を担当し、レプリカ (3 と 2 と 1) (1 と 3 と 2) で、冗長性を保っています。
bin/kafka-topics.sh --describe \
--bootstrap-server 10.0.1.100:9092 \
--topic topic-a
Topic: topic-a TopicId: fsBOHF0sSgyuSVeKlfHSQg PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=524288000,retention.bytes=1073741824
Topic: topic-a Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 Elr: N/A LastKnownElr: N/A
Topic: topic-a Partition: 1 Leader: 1 Replicas: 1,3 Isr: 1,3 Elr: N/A LastKnownElr: N/A
bin/kafka-topics.sh --describe \
--bootstrap-server 10.0.1.100:9092 \
--topic topic-a
Topic: topic-b TopicId: r5hulROjShiaoOQazytRrg PartitionCount: 2 ReplicationFactor: 3 Configs: segment.bytes=524288000,retention.bytes=1073741824
Topic: topic-b Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1 Elr: N/A LastKnownElr: N/A
Topic: topic-b Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2 Elr: N/A LastKnownElr: N/A
ちょっとわかりにくいので図にするとこんな感じでしょうか?
テスト送信と受信
kafka-console-producer.sh
でプロデューサー側に接続し、メッセージを送信します。
bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic topic-a
>test-message-1
>test-message-2
kafka-console-consumer.sh
でコンシューマ側に接続し、メッセージを受信できることを確認します。
bin/kafka-console-consumer.sh --from-beginning \
--bootstrap-server 10.0.1.100:9092 \
--topic topic-a
test-message-1
test-message-2