はじめに
以下の文章についてApache Sparkを利用してWord Countを実行するコードを書いていきます。
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
Java、Scala、sbtのインストール
JavaとScalaとsbtをインストールします。
今回は以下のバージョンをインストールします。
$ java -version
java version "1.8.0_31"
Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)
$ scala -version
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
$ sbt about
[info] Set current project to App (in build file:/Users/)
[info] This is sbt 0.13.8
[info] The current project is {file:/Users/} 1.0
[info] The current project is built against Scala 2.11.6
[info] Available Plugins: sbt.plugins.IvyPlugin, sbt.plugins.JvmPlugin, sbt.plugins.CorePlugin, sbt.plugins.JUnitXmlReportPlugin
[info] sbt, sbt plugins, and build definitions are using Scala 2.10.4
実装1
以下の内容をbuild.sbtとして保存します。
今回はApache Spark 1.4.1を利用します。
name := "App"
version := "1.0"
scalaVersion := "2.11.6"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.4.1"
)
次に以下の内容をmain.scalaとして保存します。
import org.apache.spark.SparkContext
object Main {
def main(args: Array[String]) {
val sc = new SparkContext("local[*]", "App") // ローカルモード
val rdd = sc.textFile("sample.txt")
.flatMap(r => r.split(" "))
.map(r => (r, 1))
.cache
.reduceByKey(_ + _)
.collect.foreach { r =>
println(r._1.mkString("", "", "") + "\t" + r._2)
}
sc.stop()
}
}
flatMap
は、要素を別の型に変換します。
次のmap(r => (r, 1))
で(Apache, 1) (Spark, 1) というペアにします。
reduceByKey(_ + _)
でカウントアップします。
reduceByKey(_ + _)
は reduceByKey((x, y) => x + y)
と等価です。
以下の内容をsample.txtとして保存します。
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
実行1
実行方法は以下のとおりです。
$ sbt run
大量の実行ログの中にprintlnの内容が表示されます。
GraphX 1
Python 1
provides 1
is 1
R, 1
higher-level 1
general 1
fast 1
Java, 1
SQL 2
Apache 1
data 1
learning, 1
cluster 1
graph 1
execution 1
MLlib 1
Scala, 1
computing 1
supports 2
engine 1
set 1
rich 1
Streaming. 1
Spark 3
graphs. 1
general-purpose 1
APIs 1
that 1
a 2
high-level 1
including 1
optimized 1
in 1
system. 1
of 1
tools 1
also 1
structured 1
It 2
for 3
an 1
machine 1
and 5
processing, 2
実装2
次に複数行ある場合を考えます。
以下の内容をsample.txtとして保存します。
A
B
C
A
B
A
次に以下の内容をmain.scalaとして保存します。
実装1との違いはflatMapがなくなっただけです。
import org.apache.spark.SparkContext
object Main {
def main(args: Array[String]) {
val sc = new SparkContext("local[*]", "App") // ローカルモード
val rdd = sc.textFile("sample.txt")
.map(r => (r, 1))
.cache
.reduceByKey(_ + _)
.collect.foreach { r =>
println(r._1.mkString("", "", "") + "\t" + r._2)
}
sc.stop()
}
}
実行2
実行結果は以下のとおりです。
B 2
A 3
C 1