
Trying out Apache Spark Streaming


■ Environment
OS: Ubuntu 16 or 18
Hadoop: hadoop-3.2.1.tar.gz
JDK (Java): jdk-8u202-linux-x64.tar.gz
Spark: spark-3.0.1-bin-hadoop3.2.tgz

NameNode
192.168.76.216: h-gpu05

DataNodes
192.168.76.210: h-gpu03
192.168.76.210: h-gpu04

Scala and sbt versions:


hadoop@h-gpu05:/mnt/data/hadoop$ scala -version                                                                                                                                                           
Scala code runner version 2.12.2 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.
hadoop@h-gpu05:/mnt/data/hadoop$ sbt -version                                                                                                                                                             
sbt script version: 1.4.6

First, let's try Spark Streaming from the spark-shell.
In this example, the data to be processed by Spark Streaming is sent to port 9999 on localhost.

Open the port:


$nc -lk localhost 9999

hadoop@h-gpu05:/mnt/data/hadoop/spark-3.0.1-bin-hadoop3.2/bin$ ./spark-shell 
2021-01-07 11:01:12,652 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-01-07 11:01:17,935 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
2021-01-07 11:01:17,936 WARN util.Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
Spark context Web UI available at http://h-gpu05:4045
Spark context available as 'sc' (master = local[*], app id = local-1609984878068).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/
         
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.{SparkContext, SparkConf}                                                                                                                                                  
import org.apache.spark.{SparkContext, SparkConf}

scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

scala> import org.apache.spark.storage.StorageLevel   
import org.apache.spark.storage.StorageLevel

scala> import org.apache.log4j.{Level, Logger}  
import org.apache.log4j.{Level, Logger}

scala> Logger.getRootLogger.setLevel(Level.WARN)

scala>  val ssc = new StreamingContext(sc, Seconds(10)) 
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@5008c5a

scala> val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)                                                                                                              
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@4c9a3ae

The StreamingContext above uses a 10-second batch interval. The lines below describe the processing to apply to the incoming data (stored in lines).



scala> val words = lines.flatMap(_.split(" "))                                                                                                                                                            
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@191a8997

scala> val pairs = words.map((_, 1))                                                                                                                                                                      
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@1d67a1ad

scala> val wordCounts = pairs.reduceByKey(_ + _)                                                                                                                                                          
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@215998b6

scala> wordCounts.print()

scala> ssc.start()

-------------------------------------------                                     
Time: 1609985280000 ms
-------------------------------------------

Nothing has been sent yet, so the batch above is empty. Send data like this:


$ nc -lk localhost 9999
hoge hoge hoge

The response on the Spark side:


-------------------------------------------                                     
Time: 1609985290000 ms
-------------------------------------------
(hoge,3)

(`ー´)b
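Note that reduceByKey counts words within each 10-second batch only, which is why sending hoge three times in one batch prints (hoge,3); the count starts over in the next batch. As an aside that is not part of the original session, counting over a sliding window is possible with reduceByKeyAndWindow. A minimal sketch, reusing the pairs DStream defined above:


// Hypothetical addition, not in the original session: count words over a
// 30-second window that slides every 10 seconds. Both durations must be
// multiples of the batch interval (Seconds(10) here).
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,  // merge the counts of batches inside the window
  Seconds(30),                // window length
  Seconds(10))                // slide interval
windowedCounts.print()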

Next, let's run Spark Streaming from a JAR file.
The final output will look like this:


hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ ${SPARK_HOME}/bin/spark-submit --master local[*] --class StreamingFirst target/scala-2.12/app_2.12-1.4.6.jar localhost 9999
2021-01-07 21:04:56,179 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1610021100000 ms
-------------------------------------------
-------------------------------------------
Time: 1610021110000 ms
-------------------------------------------
-------------------------------------------
Time: 1610021120000 ms
-------------------------------------------

! Notes on building the JAR file

After Scala 2.11 (i.e., with Scala-2.12.2 / sbt-1.4.6 and later), sbt-assembly is not used.

When building with sbt assembly, you use a plugins.sbt like the one below, but with Scala 2.11 the plugin version did not match and I got stuck.


hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ more plugins.sbt.bak                                                                                                    
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.2.0")

With Scala-2.12.2 / sbt-1.4.6 or later, running


$sbt clean package

produces a JAR that can be used with Spark Streaming.
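
For reference, a minimal build.sbt consistent with the artifact produced later (app_2.12-1.4.6.jar, project App) might look like the sketch below; the name, version, and Scala version here are inferred from the sbt output, and the actual build.sbt is in the repository linked further down.


name := "App"

version := "1.4.6"

scalaVersion := "2.12.2"

// Spark is supplied by spark-submit at runtime, so mark it "provided"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "3.0.1" % "provided"
)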

Let's take a look at the code. (It can be written quite simply.)


import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}

object StreamingFirst {
  def main(args: Array[String]) {
    Logger.getRootLogger.setLevel(Level.WARN)

    val sparkConf = new SparkConf().setAppName("StreamingFirst")
    val sc = new SparkContext(sparkConf)
    // 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))
    // host and port are taken from the command line (e.g. localhost 9999)
    val lines = ssc.socketTextStream(args(0),
                                     args(1).toInt,
                                     StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

https://github.com/RuoAndo/qiita/blob/master/spark/spark-streaming-1/main.scala
https://github.com/RuoAndo/qiita/tree/master/spark/spark-streaming-1

Now let's run it (spark-submit), passing localhost and 9999 as the host and port arguments...

|゚Д゚;|


hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ sbt clean package
[info] Updated file /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/project/build.properties: set sbt.version to 1.4.6                                                                  
[info] welcome to sbt 1.4.6 (Private Build Java 1.8.0_275)                                                                                                                                                
[info] loading project definition from /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/project                                                                                          
[info] loading settings for project spark-streaming-1 from build.sbt ...                                                                                                                                  
[info] set current project to App (in build file:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/)                                                                                      
[success] Total time: 0 s, completed Jan 7, 2021 9:18:47 PM                                                                                                                                               
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.                                                                                   
[info] compiling 1 Scala source to /mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1/target/scala-2.12/classes ...                                                                        
[success] Total time: 9 s, completed Jan 7, 2021 9:18:57 PM     

|*゚Д゚|


hadoop@h-gpu05:~/qiita/spark/spark-streaming-1$ nc -lk localhost 9999

|゚Д゚;|


hadoop@h-gpu05:/mnt/data/hadoop/nii-cyber-security-admin/spark/spark-streaming-1$ ${SPARK_HOME}/bin/spark-submit --master local[*] --class StreamingFirst target/scala-2.12/app_2.12-1.4.6.jar localhost 9999                                                                                                                                                                                                       
2021-01-07 21:20:35,882 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1610022040000 ms
-------------------------------------------

|*゚Д゚|


hadoop@h-gpu05:~/qiita/spark/spark-streaming-1$ nc -lk localhost 9999
hoge hoge hoge

|゚Д゚;|


-------------------------------------------
Time: 1610022090000 ms
-------------------------------------------
(hoge,3)

(`ー´)b
