#Apache Kafka
Apache Kafkaが必要になりそうなので、動作確認してみました。
Kafkaとは分散メッセージングプラットフォームで、メッセージをやり取りするプラットフォームです。
##既存メッセージングとの違い
JMSなど既存のメッセージングとは下記の点が異なります。
- 分散処理が前提になっており、耐障害性が高い&スケーリング可能
- メッセージの履歴を保持する事が可能で、ストレージ的な側面も持つ。
- ストリーム処理をすることが可能。入力メッセージに対して、様々な処理がリアルタイムにできる。
インストール
テスト用インストールは非常に簡単。
今回はAWS上のAmazon Linux上で作業しています。
Javaのインストール
まっさらな環境なのでJavaのインストールから始めます。
su -
yum install java-1.8.0-openjdk-devel
Kafkaのダウンロードと展開
この記事を書いている段階で1.1.0が最新です。
wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0
これで準備は完了。
Kafkaの起動
Zookeeperは一言で言うと分散処理システムを構築するためのミドルウェアです。
Kafkaはzookeeperなしでも動作するようだけど、後ほどマルチブローカーのテストもするので使用します。
###Zookeeperを起動。
bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-06-20 02:06:32,944] INFO Reading configuration from: config/zookeeper.properties
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
ブローカーの起動
bin/kafka-server-start.sh config/server.properties
[2018-06-20 02:08:00,771] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-06-20 02:08:01,131] INFO starting (kafka.server.KafkaServer)
これでサービスとしては利用可能な状態
##通信テスト
###トピックの作成
試しにtestという名前のトピックを作成します。
トピックはメッセージの住所みたいなもので、トピックに向けてメッセージをやり取りします。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
###作成されたトピックの確認
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
###testトピックにメッセージを送信
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hi this is from aws
>I'll send message
###コンシューマーからメッセージを確認
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hi this is from aws
I'll send message
ストアされている既存のメッセージを取得できるのが新鮮!!
これでTOPICを作成し、メッセージのサブスクライブ、コンシューマーによるメッセージの確認という一連の流れは確認できた。
#マルチブローカーのテスト
Kafkaの特徴でもあるマルチブローカーをテストしてみたい。
##マルチブローカー設定
新規ブローカーようにコンフィグファイルをコピー
今回は3台構成にするので新たに2つのファイルをコピーする
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
##ファイルの編集
それぞれブローカーIDやリスナーポート、ログファイルの設定を変更。
config/server-1.propertiesの編集
vi config/server-1.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
###config/server-2.propertiesの編集
vi config/server-2.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9094
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
ブローカー起動
最初に設定したものと今回新たに設定した3台のブローカーを起動
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
###レプリケーションファクタが3の新規トピックを作成
3つのブローカーに跨るトピックを作成してみる
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
###クラスタ状態の確認
新規に作成したトピック情報を取得
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 2,1,0 Isr: 1,0,2
最初の行はすべてのパーティションの要約で、それ以降の各行はパーティションに関する情報を示す。
- "Leader"は、パーティションのすべての読み取りと書き込みを担当するノードです。今回は0がリーダーになっている。
- "Replicas" はこのパーティションのログを複製するノードのリスト。
- "Isr"は "同期している"レプリカのセット
###既存TOPICとの比較
最初に作成したシングルブローカのトピックでは全てノード0が担当している。
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
##クラスタ化されたトピックにメッセージを送信
###メッセージ送信
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>this is clusterd Topic!
>next message
Consumerから確認
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
this is clusterd Topic!
next message
フォールトトレラントの確認
試しにリーダー、ノード0を落としてみます。
ps -ef | grep server
ec2-user 9072 2984 2 03:15 pts/0 kafka.Kafka config/server.properties
kill -9 9072
zookeeper のログ
[2018-06-20 03:20:51,000] INFO Processed session termination for sessionid: 0x1641b2fd2520000 (org.apache.zookeeper.server.PrepRequestProcessor)
[1] Killed bin/kafka-server-start.sh config/server.properties
再度クラスタの確認
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 1,2
リーダーノード0が落ちたことにより自動的にノード2がリーダーになっていることが確認できた。