本章では、Kafkaコネクタライブラリを使ってMQTTのトピックにアクセスしてMQTTに生産されたメッセージをKafkaのトピックに書き込む処理を説明します。
Kafkaコネクタを使う前に以下を確認してください。
- Kafkaクラスタの動作環境がセットアップされていたかどうかを確認してください。
- MQTTブローカーを準備しておく必要があります。
1.connect-standalone.propertiesの編集
KafkaでMQTTへコネクトする時に、connect-standalone.propertiesを使います。
以下のように、connect-standalone.propertiesを編集することが必要です。
cd /usr/local/lib/kafka/config
vi connect-standalone.properties
# 編集
bootstrap.servers=192.168.0.97:9092,192.168.0.98:9092,192.168.0.99:9092
offset.storage.file.filename=/var/lib/connect.offsets
オプションの使い方
オプション | 解説 |
---|---|
bootstrap.servers |
各ノードのIPで設定します。ポートは変わらず。 |
offset.storage.file.filename | Offset情報格納ファイルです。 このファイルは一度、空のファイルで作成しないといけません。 書込み権限付与も必須です。 |
2.Kafkaコネクタライブラリのセットアップ
Kafkaコネクタを利用してMQTTブローカーかMQTTのセンサデータをApache Kafkaに流し込んでいきます。
このKafkaコネクタはApache Kafkaの正式なコネクタではなく、コミュニティからのものです。
Githubからソースコード見るときには、以下のリンクを利用してください。
kafka-connect-mqttリソースのダウンロード先
KafkaコネクMQTTのライブラリーをKafkaのlibsにコピーします。
対象Jars:
kafka-connect-mqtt-1.0-SNAPSHOT.jar
org.eclipse.paho.client.mqttv3-1.0.2.jar
コピー先:
/usr/local/lib/kafka/libs
KafkaコネクMQTT用の構成ファイルをKafkaのconfigにコピーします。
対象File:
mqtt.properties
コピー先:
/usr/local/lib/kafka/config
mqtt.propertiesの内容
##
# Basic
##
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
##
#Settings
##
# Where to put processed messages - default tomqtt
kafka.topic=topic-mqtt-kafka
# What client id to use - defaults tonull
which means random client_id
mqtt.client_id=mqtt-kafka-99
# Use clean session in connection? - defaulttrue
mqtt.clean_session=true
# What mqtt connection timeout to use - defaults to30
seconds
mqtt.connection_timeout=30
# What mqtt connection keep alive to use - defaults to60
seconds
mqtt.keep_alive_interval=60
# Mqtt broker address to use - defaults totcp://localhost:1883
# if using TLS then certs can be used
mqtt.server_uris=tcp://192.168.0.99:1883
# Mqtt topic to listen to - defaults to#
(wildcard - all)
mqtt.topic=topic-mqtt
# エンコード無しようになる設定
key.converter.schemas.enable=false
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
Kafkaコネクタライブラリのセットアップは以上で完了です。
以上の設定ができたらMQTTブローカーのトピック「topic-mqtt」->Apache Kafkaのトピック「topic-mqtt-kafka」に流し込んでくるのが出来たはずです。
3.KafkaコネクタMQTTセンサデータ取得手順
Kafkaコネクタのセットアップされていた上で、以下の手順の通りに、
KafkaのコンシューマーMQTTブローカーから流し込んできたメッセージを受信してみましょう。
Kafkaクラスタ起動
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
停止:zkServer.sh stop
クラスターの場合は、全ノードのZookeeperが起動必要。
Kafka起動
# Zookeeper起動
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
# Kafkaの起動
cd /usr/local/lib/kafka
bin/kafka-server-start.sh -daemon config/server.properties
topic-mqtt-topic作成(既存の場合は不要)
bin/kafka-topics.sh --create --topic topic-mqtt-kafka --replication-factor 1 --partitions 1 --zookeeper 192.168.0.99:2181
Kafka コネクタMQTTに接続
bin/connect-standalone.sh config/connect-standalone.properties config.mqtt.properties
mosquittoより送信(MQTT送信に当たり)
cd /usr/bin/
mosquitto -c /etc/mosquitto/mosquitto.conf -v
mosquitto_pub -h 192.168.0.99 -p 1883 -t topic-mqtt -m "Hello Kafka!"
Kafka上で受信
cd /usr/local/lib/kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.99:9092 --topic topic-mqtt-karka --from-beginning
以下の表示が確認しできれば受信済になります。
('Hello Kafka!')
☆★☆ 次の文章へ ☆★☆
SparkでKafkaを経由して外部のデータを読取りします「SparkStreamingでデータ取得」