■ Environment
OS: Ubuntu 16 or 18
Hadoop: hadoop-3.2.1.tar.gz
JDK (Java): jdk-8u202-linux-x64.tar.gz
Spark: spark-3.0.1-bin-hadoop3.2.tgz
NameNode
192.168.76.216: h-gpu05
DataNodes
192.168.76.210: h-gpu03
192.168.76.210: h-gpu04
Scala and sbt versions
hadoop@h-gpu05:/mnt/data/hadoop$ scala -version
Scala code runner version 2.12.2 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.
hadoop@h-gpu05:/mnt/data/hadoop$ sbt -version
sbt script version: 1.4.6
First, let's try Spark Streaming from spark-shell.
Here, the data for Spark Streaming to process is sent to port 9999 on localhost.
Open the port:
$ nc -lk localhost 9999
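nc is the easiest way to feed the stream; for scripted tests, a tiny Scala stand-in could do the same job. serveLines below is a hypothetical helper, not part of the setup: it listens like `nc -lk` and writes lines to the first client that connects (here, the Spark socket receiver).

```scala
import java.net.ServerSocket
import java.io.PrintWriter

// Hypothetical stand-in for `nc -lk`: listen on a port and write lines
// to the first client that connects (e.g. the Spark socket receiver).
def serveLines(port: Int, lines: Seq[String]): Unit = {
  val server = new ServerSocket(port)
  val client = server.accept()                             // block until a receiver connects
  val out = new PrintWriter(client.getOutputStream, true)  // autoflush per line
  lines.foreach(out.println)
  out.close(); client.close(); server.close()
}
```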
hadoop@h-gpu05:/mnt/data/hadoop/spark-3.0.1-bin-hadoop3.2/bin$ ./spark-shell
2021-01-07 11:01:12,652 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-01-07 11:01:17,935 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
Spark context Web UI available at http://h-gpu05:4045
Spark context available as 'sc' (master = local[*], app id = local-1609984878068).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.1
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{SparkContext, SparkConf}
scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel
scala> import org.apache.log4j.{Level, Logger}
import org.apache.log4j.{Level, Logger}
scala> Logger.getRootLogger.setLevel(Level.WARN)
scala> val ssc = new StreamingContext(sc, Seconds(10))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@5008c5a
scala> val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@4c9a3ae
From here on, we describe how to process the incoming data (stored in `lines`).
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@191a8997
scala> val pairs = words.map((_, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@1d67a1ad
scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@215998b6
scala> wordCounts.print()
scala> ssc.start()
-------------------------------------------
Time: 1609985280000 ms
-------------------------------------------
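The chain defined above is a plain word count applied to each 10-second batch. To make the transformations concrete, here is the same pipeline sketched on an ordinary Scala collection (`reduceByKey` has no collections equivalent, so it is emulated with `groupBy` plus a sum):

```scala
// Word count as applied to one streaming batch, on a plain Scala collection.
val batch = List("hoge hoge hoge", "foo hoge")
val words = batch.flatMap(_.split(" "))                // split lines into words
val wordCounts = words
  .map((_, 1))                                         // pair each word with 1
  .groupBy(_._1)                                       // reduceByKey ~ groupBy + sum
  .map { case (w, ones) => (w, ones.map(_._2).sum) }
println(wordCounts)  // Map(hoge -> 4, foo -> 1)
```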
Send data like this:
$ nc -lk localhost 9999
hoge hoge hoge
Spark's response:
-------------------------------------------
Time: 1609985290000 ms
-------------------------------------------
(hoge,3)
(`ー´)b
Next, let's run Spark Streaming from a jar file.
The end result looks like this:
hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ ${SPARK_HOME}/bin/spark-submit --master local[*] --class StreamingFirst target/scala-2.12/app_2.12-1.4.6.jar localhost 9999
2021-01-07 21:04:56,179 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1610021100000 ms
-------------------------------------------
-------------------------------------------
Time: 1610021110000 ms
-------------------------------------------
-------------------------------------------
Time: 1610021120000 ms
-------------------------------------------
! Notes on building the jar
With Scala 2.12.2 / sbt 1.4.6 and later, sbt-assembly is not used.
To run sbt assembly you would use a plugins.sbt like the one below, but with Scala 2.11 and similar setups the plugin version did not match and I got stuck.
hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ more plugins.sbt.bak
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.2.0")
With Scala 2.12.2 / sbt 1.4.6 or later,
$ sbt clean package
is enough to build a JAR usable with Spark Streaming.
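For reference, a minimal build.sbt consistent with the jar name used below (app_2.12-1.4.6.jar) might look like this. The original build file is not shown, so the dependency versions here are assumptions based on the environment listed at the top:

```scala
// build.sbt -- minimal sketch; versions are assumptions, not the original file
name := "App"
version := "1.4.6"
scalaVersion := "2.12.10"

// "provided": spark-submit supplies the Spark jars at runtime,
// so they are needed at compile time only and stay out of the packaged jar.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "3.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "3.0.1" % "provided"
)
```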
Let's look at the code (it can be written quite simply):
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}

object StreamingFirst {
  def main(args: Array[String]) {
    Logger.getRootLogger.setLevel(Level.WARN)

    val sparkConf = new SparkConf().setAppName("StreamingFirst")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val lines = ssc.socketTextStream(args(0),
                                     args(1).toInt,
                                     StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
https://github.com/RuoAndo/qiita/blob/master/spark/spark-streaming-1/main.scala
https://github.com/RuoAndo/qiita/tree/master/spark/spark-streaming-1
Let's run it (spark-submit)...
|゚Д゚;|
hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ sbt clean package
[info] Updated file /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/project/build.properties: set sbt.version to 1.4.6
[info] welcome to sbt 1.4.6 (Private Build Java 1.8.0_275)
[info] loading project definition from /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/project
[info] loading settings for project spark-streaming-1 from build.sbt ...
[info] set current project to App (in build file:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/)
[success] Total time: 0 s, completed Jan 7, 2021 9:18:47 PM
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] compiling 1 Scala source to /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/target/scala-2.12/classes ...
[success] Total time: 9 s, completed Jan 7, 2021 9:18:57 PM
|*゚Д゚|
hadoop@h-gpu05:~/qiita/spark/spark-streaming-1$ nc -lk localhost 9999
|゚Д゚;|
hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ ${SPARK_HOME}/bin/spark-submit --master local[*] --class StreamingFirst target/scala-2.12/app_2.12-1.4.6.jar localhost 9999
2021-01-07 21:20:35,882 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1610022040000 ms
-------------------------------------------
|*゚Д゚|
hadoop@h-gpu05:~/qiita/spark/spark-streaming-1$ nc -lk localhost 9999
hoge hoge hoge
|゚Д゚;|
-------------------------------------------
Time: 1610022090000 ms
-------------------------------------------
(hoge,3)
(`ー´)b