LoginSignup
1
1

More than 5 years have passed since last update.

SparkでKafkaを経由して外部のデータを読取りします「KafkaコネクトMQTT」

Last updated at Posted at 2019-03-29

本章では、Kafkaコネクタライブラリを使ってMQTTのトピックにアクセスしてMQTTに生産されたメッセージをKafkaのトピックに書き込む処理を説明します。
Kafkaコネクタを使う前に以下を確認してください。

  • Kafkaクラスタの動作環境がセットアップされていたかどうかを確認してください。
  • MQTTブローカーを準備しておく必要があります。

1.connect-standalone.propertiesの編集

KafkaでMQTTへコネクトする時に、connect-standalone.propertiesを使います。
以下のように、connect-standalone.propertiesを編集することが必要です。

cd /usr/local/lib/kafka/config
vi connect-standalone.properties
# 編集
bootstrap.servers=192.168.0.97:9092,192.168.0.98:9092,192.168.0.99:9092
offset.storage.file.filename=/var/lib/connect.offsets

オプションの使い方

オプション 解説
bootstrap.servers 各ノードのIPで設定します。ポートは変わらず。
offset.storage.file.filename Offset情報格納ファイルです。
このファイルは一度、空のファイルで作成しないといけません。
書込み権限付与も必須です。

2.Kafkaコネクタライブラリのセットアップ

Kafkaコネクタを利用してMQTTブローカーかMQTTのセンサデータをApache Kafkaに流し込んでいきます。
このKafkaコネクタはApache Kafkaの正式なコネクタではなく、コミュニティからのものです。
Githubからソースコード見るときには、以下のリンクを利用してください。
kafka-connect-mqttリソースのダウンロード先

KafkaコネクMQTTのライブラリーをKafkaのlibsにコピーします。
対象Jars:

kafka-connect-mqtt-1.0-SNAPSHOT.jar
org.eclipse.paho.client.mqttv3-1.0.2.jar

コピー先:

/usr/local/lib/kafka/libs

KafkaコネクMQTT用の構成ファイルをKafkaのconfigにコピーします。
対象File:

mqtt.properties

コピー先:

/usr/local/lib/kafka/config

mqtt.propertiesの内容

##
# Basic
##
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
##
#Settings
##
# Where to put processed messages - default to mqtt
kafka.topic=topic-mqtt-kafka
# What client id to use - defaults to null which means random client_id
mqtt.client_id=mqtt-kafka-99
# Use clean session in connection? - default true
mqtt.clean_session=true
# What mqtt connection timeout to use - defaults to 30 seconds
mqtt.connection_timeout=30
# What mqtt connection keep alive to use - defaults to 60 seconds
mqtt.keep_alive_interval=60
# Mqtt broker address to use - defaults to tcp://localhost:1883
# if using TLS then certs can be used
mqtt.server_uris=tcp://192.168.0.99:1883
# Mqtt topic to listen to - defaults to # (wildcard - all)
mqtt.topic=topic-mqtt
# エンコード無しようになる設定
key.converter.schemas.enable=false
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.json.JsonConverter

Kafkaコネクタライブラリのセットアップは以上で完了です。
以上の設定ができたらMQTTブローカーのトピック「topic-mqtt」->Apache Kafkaのトピック「topic-mqtt-kafka」に流し込んでくるのが出来たはずです。

3.KafkaコネクタMQTTセンサデータ取得手順

Kafkaコネクタのセットアップされていた上で、以下の手順の通りに、
KafkaのコンシューマーMQTTブローカーから流し込んできたメッセージを受信してみましょう。
Kafkaクラスタ起動

cd /usr/local/lib/zookeeper
bin/zkServer.sh start

停止:zkServer.sh stop
クラスターの場合は、全ノードのZookeeperが起動必要。
Kafka起動

#Zookeeper起動
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
#Kafkaの起動
cd /usr/local/lib/kafka
bin/kafka-server-start.sh -daemon config/server.properties

topic-mqtt-topic作成(既存の場合は不要)

bin/kafka-topics.sh --create --topic topic-mqtt-kafka --replication-factor 1 --partitions 1 --zookeeper 192.168.0.99:2181

Kafka コネクタMQTTに接続

bin/connect-standalone.sh config/connect-standalone.properties config.mqtt.properties

mosquittoより送信(MQTT送信に当たり)

cd /usr/bin/
mosquitto -c /etc/mosquitto/mosquitto.conf -v
mosquitto_pub -h 192.168.0.99 -p 1883 -t topic-mqtt -m "Hello Kafka!"

Kafka上で受信

cd /usr/local/lib/kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.99:9092 --topic topic-mqtt-karka --from-beginning

以下の表示が確認しできれば受信済になります。
('Hello Kafka!')


☆★☆ 次の文章へ ☆★☆
SparkでKafkaを経由して外部のデータを読取りします「SparkStreamingでデータ取得」

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1