はじめに
色々あるメッセージキュー、最近ではフルマネージドのものをたびたび使用していましたが、原理原則を大事にするためにはバックグラウンドがどうなっているかを知っておきたく、参考としてkafkaのチュートリアルを試したときのメモ+αです。
基本は、現行のQuick Startの内容に則りますが、Quick Startの範囲外の内容で一部ハマったので記録として残しておきます。
環境
今回は自宅にあるマシン2台を利用して行いました。
# kafka サーバ(ブローカー)
OS: CentOS7(CentOS Linux release 7.9.2009 (Core))
IP: 192.168.179.9
host名: Clara
# Clientマシン(Producer兼Consumer)
OS: macOS Big Sur 11.1
IP: 192.168.179.116
実際の操作
Kafkaの用意(バイナリ)
基本はQuick Startに則り、kafka 2.13-2.7.0を使います。
ただし、今回は、一応新規にkafka用のユーザーを作りました。
# openjdkのインストール(JREだけでも動くと思いますが、JDKごと入れました)
$ sudo yum install -y java-1.8.0-openjdk-devel.x86_64
# userの作成
$ sudo useradd kafka -m
$ sudo passwd kafka
$ sudo usermod -aG wheel kafka
$ su -l kafka
$ mkdir ~/kafka
$ cd ~/kafka
$ wget https://ftp.jaist.ac.jp/pub/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
$ tar -xzvf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0/
# host名の設定/これがなかったらkafka Brokerの起動でドメイン名の解決ができなかったため
$ cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
127.0.0.1 Clara localhost.localdomain
これで用意は終わりです。
Kafkaの起動
2つターミナルを起動します。zookeeperを起動し、その後Kafka Brokerを起動します。
プロパティファイルは、元から用意されているものを使います。
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
Topicを用意しローカルで動作確認
さらにターミナルを2つ用意します。
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
# Topicができたことをが確認できた
# Kafkaに書き込んで見る(ローカルでProducerを動かす)
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is first event.
>This is second event.
>^C
$
# Ctrl+Cで入力を終了する
# 改行までしたらイベントとして取り込まれる
# Kafkaから読み出す(ローカルでConsumerを動かす)
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event.
This is my second event.
外部から接続:準備
最初にTCPの口を開ける。
# 9092はKafka Broker用
# 2181はzoomkeeper用で、Brokerが同じサーバ上にしかいなければ不要かもしれない
# クラスタ組むときは要確認
$ sudo firewall-cmd --add-port=9092/tcp --permanent
$ sudo firewall-cmd --add-port=2181/tcp --permanent
$ sudo firewall-cmd --reload
これで外部のマシン(今回だと 192.168.179.116
)から接続できるかと思いましたが、できません。
今回zoomkeeperとKafka Brokerには元から用意されていたプロパティファイルを利用していますが、Producerからの入力となる口をKafka Brokerで使用している server.properties
で設定しています。
最初の状態では外部からの通信を受け取れない(外部マシンのProducerもConsumerも正しく動作しない)ため、設定を書き換えます。
前略
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# listeners=PLAINTEXT://:9092
以下略
前略
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.179.9:9092
以下略
必要な部分がそもそもコメントアウトされているため、コメントアウトを外して適切なIPを割り当てます。なお、IPの代わりに localhost
などを指定しても、外部からは通信できませんでした。どのIPでアクセスさせたいか、明示的に指定する必要がありそうです。
ここで設定したら、再度Kafka Brokerを再度立ち上げます。
外部から接続:実行
以下で実行できます。
$ pip install kafka-python
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['192.168.179.9:9092'])
v = producer.send('quickstart-events', b'test')
metadata = v.get(timeout=10)
print(v)
上記のスクリプトを実際に実行してみます。
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer('quickstart-events', bootstrap_servers=['192.168.179.9:9092'])
for msg in consumer:
print(msg)
$ python producer.py
<kafka.producer.future.FutureRecordMetadata object at 0x10c89bf70>
$
$ python consumer.py
ConsumerRecord(topic='quickstart-events', partition=0, offset=38, timestamp=1613217477754, timestamp_type=0, key=None, value=b'test', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1)
確かに、Kafka Brokerと通信ができました。(最初 server.properties
を編集する必要があることがわからずWiresharkでpucket snifferingして、どこでパケットが欠如しているか切り分けを実しました。結果、 192.168.179.9
側まで通信が到達していることがわかったので、サーバー内部に疑いをかけることができ、気づいた。)