4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

はじめに

色々あるメッセージキュー、最近ではフルマネージドのものをたびたび使用していましたが、原理原則を大事にするためにはバックグラウンドがどうなっているかを知っておきたく、参考としてkafkaのチュートリアルを試したときのメモ+αです。
基本は、現行のQuick Startの内容に則りますが、Quick Startの範囲外の内容で一部ハマったので記録として残しておきます。

環境

今回は自宅にあるマシン2台を利用して行いました。

# kafka サーバ(ブローカー)
OS: CentOS7(CentOS Linux release 7.9.2009 (Core))
IP: 192.168.179.9
host名: Clara

# Clientマシン(Producer兼Consumer)
OS: macOS Big Sur 11.1
IP: 192.168.179.116

実際の操作

Kafkaの用意(バイナリ)

基本はQuick Startに則り、kafka 2.13-2.7.0を使います。
ただし、今回は、一応新規にkafka用のユーザーを作りました。

事前準備
# openjdkのインストール(JREだけでも動くと思いますが、JDKごと入れました)
$ sudo yum install -y java-1.8.0-openjdk-devel.x86_64
# userの作成
$ sudo useradd kafka -m
$ sudo passwd kafka
$ sudo usermod -aG wheel kafka
$ su -l kafka
$ mkdir ~/kafka
$ cd ~/kafka
$ wget https://ftp.jaist.ac.jp/pub/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
$ tar -xzvf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0/
# host名の設定/これがなかったらkafka Brokerの起動でドメイン名の解決ができなかったため
$ cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
127.0.0.1   Clara     localhost.localdomain

これで用意は終わりです。

Kafkaの起動

2つターミナルを起動します。zookeeperを起動し、その後Kafka Brokerを起動します。
プロパティファイルは、元から用意されているものを使います。

terminal1つ目
$ bin/zookeeper-server-start.sh config/zookeeper.properties
terminal2つ目
$ bin/kafka-server-start.sh config/server.properties

Topicを用意しローカルで動作確認

さらにターミナルを2つ用意します。

terminal3つ目
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
# Topicができたことをが確認できた
# Kafkaに書き込んで見る(ローカルでProducerを動かす)
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is first event.
>This is second event.
>^C
$ 
# Ctrl+Cで入力を終了する
# 改行までしたらイベントとして取り込まれる
termina4つ目
# Kafkaから読み出す(ローカルでConsumerを動かす)
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event.
This is my second event.

外部から接続:準備

最初にTCPの口を開ける。

# 9092はKafka Broker用
# 2181はzoomkeeper用で、Brokerが同じサーバ上にしかいなければ不要かもしれない
# クラスタ組むときは要確認
$ sudo firewall-cmd --add-port=9092/tcp --permanent
$ sudo firewall-cmd --add-port=2181/tcp --permanent
$ sudo firewall-cmd --reload

これで外部のマシン(今回だと 192.168.179.116 )から接続できるかと思いましたが、できません。
今回zoomkeeperとKafka Brokerには元から用意されていたプロパティファイルを利用していますが、Producerからの入力となる口をKafka Brokerで使用している server.properties で設定しています。
最初の状態では外部からの通信を受け取れない(外部マシンのProducerもConsumerも正しく動作しない)ため、設定を書き換えます。

config/server.properties(変更前)
前略
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# listeners=PLAINTEXT://:9092
以下略
config/server.properties(変更後)
前略
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.179.9:9092
以下略

必要な部分がそもそもコメントアウトされているため、コメントアウトを外して適切なIPを割り当てます。なお、IPの代わりに localhost などを指定しても、外部からは通信できませんでした。どのIPでアクセスさせたいか、明示的に指定する必要がありそうです。
ここで設定したら、再度Kafka Brokerを再度立ち上げます。

外部から接続:実行

以下で実行できます。

$ pip install kafka-python
producer.py
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['192.168.179.9:9092'])
v = producer.send('quickstart-events', b'test')
metadata = v.get(timeout=10)
print(v)

上記のスクリプトを実際に実行してみます。

consumer.py
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer('quickstart-events', bootstrap_servers=['192.168.179.9:9092'])
for msg in consumer:
    print(msg)
terminal5
$ python producer.py
<kafka.producer.future.FutureRecordMetadata object at 0x10c89bf70>
$ 
terminal6
$ python consumer.py
ConsumerRecord(topic='quickstart-events', partition=0, offset=38, timestamp=1613217477754, timestamp_type=0, key=None, value=b'test', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1)

確かに、Kafka Brokerと通信ができました。(最初 server.properties を編集する必要があることがわからずWiresharkでpucket snifferingして、どこでパケットが欠如しているか切り分けを実しました。結果、 192.168.179.9 側まで通信が到達していることがわかったので、サーバー内部に疑いをかけることができ、気づいた。)

参考

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?