はじめに
- Apache Spark StreamingをMacのローカル環境でサクっと動かしたい。
sparkのインストール
$ brew install apache-spark
ソースコード
build.sbt
name := "spark-sample"
scalaVersion := "2.11.12"
resolvers += "Typesafe Releases" at "https://repo.typesafe.com/typesafe/releases/"
javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")
libraryDependencies ++= Seq("org.apache.spark" %% "spark-streaming" % "2.4.6")
Mainクラス
localhostの9999番ポートから5秒ごとに文字列を取得してカウント結果を出力するコード。
import org.apache.spark.streaming._
import org.apache.spark._
object StreamingSample {
def main(args: Array[String]) {
// SparkStreamingコンテキストを作成
val conf = new SparkConf().setAppName("Streaming Sample")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
// ストリームからデータを読み込み処理をする
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// 結果を出力する
wordCounts.print()
// コンテキストの起動と維持
ssc.start()
ssc.awaitTermination()
}
}
実行
ローカルにncコマンドで疑似ストリームを作り文字を入れる。
$ nc -lk 9999
aaa
aaa
bbb
bbb
ccc
jar作成
$ sbt clean package
作成したjar実行
$ /usr/local/Cellar/apache-spark/2.4.6/libexec/bin/spark-submit --class StreamingSample target/scala-2.11/spark-sample_2.11-0.1.0-SNAPSHOT.jar
出力結果
ちゃんと5秒ごとにカウント結果が出力される。
-------------------------------------------
Time: 1592726930000 ms
-------------------------------------------
(aaa,2)
(bbb,2)
(ccc,1)
まとめ
- Sparkは難しいイメージがあったが、Sparkのインストールからローカル実行まであっという間にできた。
- Scala12やJava11を使うとエラーなどでうまくいかなかったので今回はScala11・Java8で実行した。Spark2.4系はScala12用のライブラリも提供されていて動かないことはないと思うので調べて動くところまではやってみたい。