LoginSignup
9
12

More than 5 years have passed since last update.

Apache Kafkaを試してみる

Last updated at Posted at 2018-06-20

Apache Kafka

Apache Kafkaが必要になりそうなので、動作確認してみました。
Kafkaとは分散メッセージングプラットフォームで、メッセージをやり取りするプラットフォームです。

既存メッセージングとの違い

JMSなど既存のメッセージングとは下記の点が異なります。
* 分散処理が前提になっており、耐障害性が高い&スケーリング可能
* メッセージの履歴を保持する事が可能で、ストレージ的な側面も持つ。
* ストリーム処理をすることが可能。入力メッセージに対して、様々な処理がリアルタイムにできる。

インストール

テスト用インストールは非常に簡単。
今回はAWS上のAmazon Linux上で作業しています。

Javaのインストール

まっさらな環境なのでJavaのインストールから始めます。

su -
yum install java-1.8.0-openjdk-devel

Kafkaのダウンロードと展開

この記事を書いている段階で1.1.0が最新です。

wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxf  kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

これで準備は完了。

Kafkaの起動

Zookeeperは一言で言うと分散処理システムを構築するためのミドルウェアです。
Kafkaはzookeeperなしでも動作するようだけど、後ほどマルチブローカーのテストもするので使用します。

Zookeeperを起動。

bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-06-20 02:06:32,944] INFO Reading configuration from: config/zookeeper.properties 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)

ブローカーの起動

bin/kafka-server-start.sh config/server.properties
[2018-06-20 02:08:00,771] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-06-20 02:08:01,131] INFO starting (kafka.server.KafkaServer)

これでサービスとしては利用可能な状態

通信テスト

トピックの作成

試しにtestという名前のトピックを作成します。
トピックはメッセージの住所みたいなもので、トピックに向けてメッセージをやり取りします。

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

作成されたトピックの確認

bin/kafka-topics.sh --list --zookeeper localhost:2181
test

testトピックにメッセージを送信

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hi this is from aws
>I'll send message

コンシューマーからメッセージを確認

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hi this is from aws
I'll send message

ストアされている既存のメッセージを取得できるのが新鮮!!

これでTOPICを作成し、メッセージのサブスクライブ、コンシューマーによるメッセージの確認という一連の流れは確認できた。

マルチブローカーのテスト

Kafkaの特徴でもあるマルチブローカーをテストしてみたい。

マルチブローカー設定

新規ブローカーようにコンフィグファイルをコピー
今回は3台構成にするので新たに2つのファイルをコピーする

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

ファイルの編集

それぞれブローカーIDやリスナーポート、ログファイルの設定を変更。

config/server-1.propertiesの編集

vi config/server-1.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config/server-2.propertiesの編集

vi config/server-2.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9094
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

ブローカー起動

最初に設定したものと今回新たに設定した3台のブローカーを起動

bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

レプリケーションファクタが3の新規トピックを作成

3つのブローカーに跨るトピックを作成してみる

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

クラスタ状態の確認

新規に作成したトピック情報を取得

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 2,1,0 Isr: 1,0,2

最初の行はすべてのパーティションの要約で、それ以降の各行はパーティションに関する情報を示す。
* "Leader"は、パーティションのすべての読み取りと書き込みを担当するノードです。今回は0がリーダーになっている。
* "Replicas" はこのパーティションのログを複製するノードのリスト。
* "Isr"は "同期している"レプリカのセット

既存TOPICとの比較

最初に作成したシングルブローカのトピックでは全てノード0が担当している。

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

クラスタ化されたトピックにメッセージを送信

メッセージ送信

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>this is clusterd Topic!
>next message

Consumerから確認

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
this is clusterd Topic!
next message

フォールトトレラントの確認

試しにリーダー、ノード0を落としてみます。

ps -ef | grep server 
ec2-user  9072  2984  2 03:15 pts/0 kafka.Kafka config/server.properties
kill -9 9072

zookeeper のログ

[2018-06-20 03:20:51,000] INFO Processed session termination for sessionid: 0x1641b2fd2520000 (org.apache.zookeeper.server.PrepRequestProcessor)

[1]   Killed                  bin/kafka-server-start.sh config/server.properties

再度クラスタの確認

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 1,2

リーダーノード0が落ちたことにより自動的にノード2がリーダーになっていることが確認できた。

9
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
12