Apache Kafka
前提:Zookeeperがポート番号2181番で起動している状態ではじめること。
- Apache Kafkaの導入からトピックの生成、メッセージ送受信方法を公開します。
- Pub/Subに興味があ流けど、Web上の情報だけではうまくいかなかった方、このマニュアルで、環境をそろえた上で、やってみてください。(動作確認済みです。)
zkServer.sh start
1 Scalaをインストールする。
※Scalaのバージョンは、Apache Kafka公式サイトの、推奨バージョンをインストールすること。
※今回は、kafka_2.10-0.8.2の推奨バージョンである、Scala 2.10をインストールする。
su
yum -y install wget
yum -y install tar
yum -y install java-1.6.0-openjdk-devel
mkdir /usr/local/download/
cd /usr/local/download/
wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz
tar xzvf scala-2.10.4.tgz
mv /usr/local/download/scala-2.10.4 /usr/local/scala-2.10.4
cd /usr/local
ln -s scala-2.10.4/ scala
echo "export PATH=\$PATH:/usr/local/scala/bin" >> /etc/profile;
export PATH=$PATH:/usr/local/scala/bin
2 ダウンロードおよび解凍
cd /usr/local/download
wget http://ftp.meisei-u.ac.jp/mirror/apache/dist/kafka/0.8.2-beta/kafka_2.10-0.8.2-beta.tgz
tar xzvf kafka_2.10-0.8.2-beta.tgz
mv /usr/local/download/kafka_2.10-0.8.2-beta /usr/local/kafka_2.10-0.8.2-beta
cd /usr/local/
ln -s kafka_2.10-0.8.2-beta/ kafka
echo "export PATH=\$PATH:/usr/local/kafka/bin" >> /etc/profile;
export PATH=$PATH:/usr/local/kafka/bin
3 実行
kafka-server-start.sh /usr/local/kafka/config/server.properties &
4 トピックの生成
Let's create a topic named "test" with a single partition and only one replica:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2
kafka-topics.sh --list --zookeeper localhost:2181
# >test2
5 メッセージ送受信テスト
5.1 Kafkaのメッセージ送受信をテストします。ターミナル(端末)を2つ起動し、2つともvm1にsshで接続し、root権限にします。
5.1.1 vm1のconsole1で次のコマンドを入力します。→入力待ち状態へ移行します。(プロデューサーとなる。)
kafka-console-producer.sh --broker-list localhost:9092 --topic test2
# >
5.1.2 vm1のconsole2で次のコマンドを入力します。→出力待ち状態へ移行します。(コンシューマーとなる。こちら側のコンソールは表示専用)
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test2 --from-beginning
# >
5.1.3 vm1のconsole1で何かメッセージを入力して、Enter(改行)します。→console2の画面にも、結果が出力されます。(通信成功)
Hello
# >Hello
6 マルチブローカークラスターのための設定
6.1 server.propertiesファイルの編集
- broker.idの設定(Zookeeperと同じように、各vmにユニークなidを振る必要があります。)
- 待ち受けポート番号の設定(VM1のみ9092番、VM2〜VM5は9093番〜9096番に設定します。)
- logディレクトリの設定(/tmp/kafka-logs)
- /tmp/kafka-logsディレクトリの作成
vm1の設定
sed -i -e 's/^broker\.id/#broker\.id/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#broker\.id/a broker\.id=1' /usr/local/kafka/config/server.properties;
sed -i -e 's/^port/#port/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#port/a port=9092' /usr/local/kafka/config/server.properties;
sed -i -e 's/^log\.dir/#log\.dir/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#log\.dir/a log\.dir=\/tmp\/kafka-logs' /usr/local/kafka/config/server.properties;
mkdir /tmp/kafka-logs
vm2の設定
sed -i -e 's/^broker\.id/#broker\.id/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#broker\.id/a broker\.id=2' /usr/local/kafka/config/server.properties;
sed -i -e 's/^port/#port/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#port/a port=9093' /usr/local/kafka/config/server.properties;
sed -i -e 's/^log\.dir/#log\.dir/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#log\.dir/a log\.dir=\/tmp\/kafka-logs' /usr/local/kafka/config/server.properties;
mkdir /tmp/kafka-logs
vm3の設定
sed -i -e 's/^broker\.id/#broker\.id/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#broker\.id/a broker\.id=3' /usr/local/kafka/config/server.properties;
sed -i -e 's/^port/#port/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#port/a port=9094' /usr/local/kafka/config/server.properties;
sed -i -e 's/^log\.dir/#log\.dir/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#log\.dir/a log\.dir=\/tmp\/kafka-logs' /usr/local/kafka/config/server.properties;
mkdir /tmp/kafka-logs
vm4の設定
sed -i -e 's/^broker\.id/#broker\.id/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#broker\.id/a broker\.id=4' /usr/local/kafka/config/server.properties;
sed -i -e 's/^port/#port/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#port/a port=9095' /usr/local/kafka/config/server.properties;
sed -i -e 's/^log\.dir/#log\.dir/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#log\.dir/a log\.dir=\/tmp\/kafka-logs' /usr/local/kafka/config/server.properties;
mkdir /tmp/kafka-logs
vm5の設定
sed -i -e 's/^broker\.id/#broker\.id/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#broker\.id/a broker\.id=5' /usr/local/kafka/config/server.properties;
sed -i -e 's/^port/#port/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#port/a port=9096' /usr/local/kafka/config/server.properties;
sed -i -e 's/^log\.dir/#log\.dir/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#log\.dir/a log\.dir=\/tmp\/kafka-logs' /usr/local/kafka/config/server.properties;
mkdir /tmp/kafka-logs
7 マルチブローカーでのkafkaの起動
kafka-server-stop.sh
kafka-server-start.sh /usr/local/kafka/config/server.properties &
8 マルチブローカー上でのトピックの作成
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-replicated-topic
9 マルチブローカー上でのトピックの表示
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
# >Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
# >Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
※各項目の説明
- "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
- "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
- "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
10 プロデューサーとコンシューマの起動
※起動は数秒置きながら行うとエラーが発生しない。
vm1上でプロデューサーを起動します。
kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
vm2〜vm5でコンシューマを起動します。
kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
起動がうまくいかない場合
- server.propertiesのhost.nameがコメントアウトされている場合は外す。
host.name=localhost
- logの読み込みに失敗している場合
- ログファイルを削除します。
rm -rf /var/log/kafka
mkdir /var/log/kafka
- 手動で起動します。
cd /usr/local/kafka/
pidproxy /var/run/kafka.pid /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertie