LoginSignup
1

More than 5 years have passed since last update.

SparkでKafkaを経由して外部のデータを読取りします「SparkStreamingでデータ取得」

Last updated at Posted at 2019-03-29

本章では、Spark単独環境上の構築方法を基に記述しております。
standaloneやyarnなどのクラスター上で動かすことがあれば、私のそのたの投稿にクラスターの構築後方がありますのでご参照ください。
SparkStreamingで、単独環境もクラスタ環境もどちでもKafkaのデータ読取するのは構いませんです。

下記は、Sparkのクラスタ環境構築に関連する私の投稿です。興味をお持ちの方にはご参照ください。

1.Sparkバージョンの問題

下図によって、「spark-streaming-kafka-0-10」の方がPythonを対応されておきませんので、「spark-streaming-kafka-0-8」の方を用いられないといけませんでした。
ただし、「spark-streaming-kafka-0-8」の方は、Spark2.3.0以降にサポートされてなくなったので、今度、Spark2.2.3のほうを用い直してみます。

※Spark2.2.3の配置は、私のそのたの投稿をご参照ください。
Spark2.2.3のダウンロード先

2.Mavenの用意

Mavenに必要なJarを用意します。
コピー対象:

/SparkStreaming/spark-streaming-kafka-0-8_2.11

コピー先:

~/.m2/repository/org/apache/

spark-streaming-kafka-0-8_2.11-2.1.0.jarのダウンロード先

3.必須なJar用意

必須になるJarを用意します。
コピー対象:

/SparkStreaming/spark-core_2.11-1.5.2.jar
/SparkStreaming/spark-streaming-kafka-assembly_2.11-1.6.3.jar

コピー先:

~/usr/work/

spark-core_2.11-1.5.2.jarのダウンロード先
spark-streaming-kafka-assembly_2.11-1.6.3.jarのダウンロード先

4.Kafka_wordcount.py用意

SparkStreamingでKafkaのトピックからセンサデータを取得してWordCountを行うファイルを用意します。
コピー対象:

/SparkStreaming/kafka_wordcount.py

コピー先:

~/usr/work/

5.WordCount実行

以下を実行します。

# no cluster
spark-submit --jars spark-streaming-kafka-assembly_2.11-1.6.3.jar,spark-core_2.11-1.5.2.jar --packages org.apache:spark-streaming-kafka-0-8_2.11:2.1.0 ./kafka_wordcount.py 192.168.0.99:2181 topic-mqtt-kafka

以下の表示が確認しできれば起動済になります。

'-------------------------------------------
Time: 2019-02-19 00:11:50
'-------------------------------------------

※受信確認できない場合は、以下を繰り返して実行してみてください。

7.MQTT→Kafka→Spark連携環境起動手順

Kafkaクラスタ起動

#Zookeeper起動
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
#Kafkaの起動
cd /usr/local/lib/kafka
bin/kafka-server-start.sh -daemon config/server.properties

Topic作成

cd /usr/local/lib/kafka
# topic-mqtt-kafka確認
bin/kafka-topics.sh --describe --zookeeper  192.168.0.99:2181 --topic topic-mqtt-kafka
# 存在しない場合は作成
bin/kafka-topics.sh --create --topic topic-mqtt-kafka --replication-factor 1 --partitions 1 --zookeeper 192.168.0.99:2181

mosquittoより送信(MQTT送信に当たり)

cd /usr/bin/
mosquitto -c /etc/mosquitto/mosquitto.conf -v
mosquitto_pub -h 192.168.0.99 -p 1883 -t topic-mqtt -m "Hello Spark"

SparkStandaloneクラスタ起動

#Master起動
start-master.sh
#Slave起動
start-slaves.sh

SparkStreamingでデータ取得

cd /usr/work
#クラスタなし、かつKafka連携しない
spark-submit --master spark://192.168.0.100:7077 --deploy-mode client --jars ./spark-streaming-mqtt-assembly_2.11-1.6.3.jar --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.2.1 ./mqtt_wordcount.py tcp://192.168.0.99:1883 topic-mqtt
#クラスタなし、かつKafka連携
spark-submit --jars spark-streaming-kafka-assembly_2.11-1.6.3.jar,spark-core_2.11-1.5.2.jar --packages org.apache:spark-streaming-kafka-0-8_2.11:2.1.0 ./kafka_wordcount.py 192.168.0.99:2181 topic-mqtt-kafka
#クラスタあり、かつKafka連携しない
spark-submit --master spark://192.168.0.100:7077 --deploy-mode client --jars ./spark-streaming-mqtt-assembly_2.11-1.6.3.jar --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.2.1 ./mqtt_wordcount.py tcp://192.168.0.99:1883 topic-mqtt
#クラスタあり、かつKafka連携
spark-submit --master spark://192.168.0.100:7077 --deploy-mode client --jars ./spark-streaming-kafka-assembly_2.11-1.6.3.jar,./spark-core_2.11-1.5.2.jar --packages org.apache:spark-streaming-kafka-0-8_2.11:2.1.0 ./kafka_wordcount.py 192.168.0.99:2181 topic-mqtt-kafka

以下の表示が確認できればKafkaトピックからデータ取得するのは完璧になります。

'-------------------------------------------
Time: 2019-02-19 00:11:50
'-------------------------------------------
(u'Hello Spark', 1)

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1