
Word Count with Apache Spark

Introduction

In this article, we'll write code that performs a Word Count on the following text using Apache Spark.

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

Installing Java, Scala, and sbt

Install Java, Scala, and sbt.
This article uses the following versions.

$ java -version
java version "1.8.0_31"
Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)
$ scala -version
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
$ sbt about
[info] Set current project to App (in build file:/Users/)
[info] This is sbt 0.13.8
[info] The current project is {file:/Users/} 1.0
[info] The current project is built against Scala 2.11.6
[info] Available Plugins: sbt.plugins.IvyPlugin, sbt.plugins.JvmPlugin, sbt.plugins.CorePlugin, sbt.plugins.JUnitXmlReportPlugin
[info] sbt, sbt plugins, and build definitions are using Scala 2.10.4

Implementation 1

Save the following as build.sbt.
This article uses Apache Spark 1.4.1.

build.sbt
name := "App"
version := "1.0"
scalaVersion := "2.11.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1"
)
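
In sbt, %% appends the project's Scala binary version to the artifact name, so with scalaVersion set to 2.11.6 the dependency above resolves to spark-core_2.11. Written out without %%, the equivalent would be:

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "1.4.1"
)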

Next, save the following as main.scala.

main.scala
import org.apache.spark.SparkContext

object Main {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "App") // local mode

    sc.textFile("sample.txt")            // read the file as an RDD of lines
      .flatMap(line => line.split(" "))  // split each line into words
      .map(word => (word, 1))            // pair each word with a count of 1
      .cache                             // cache the pairs (optional; they are only used once here)
      .reduceByKey(_ + _)                // sum the counts per word
      .collect                           // gather the results to the driver
      .foreach { case (word, count) =>
        println(word + "\t" + count)
      }

    sc.stop()
  }
}

flatMap maps each element to a collection and flattens the results; here it splits each line into words, producing a single RDD of words.
The following map(word => (word, 1)) turns each word into a pair such as (Apache, 1) or (Spark, 1).
reduceByKey(_ + _) then sums the counts per word.
reduceByKey(_ + _) is equivalent to reduceByKey((x, y) => x + y).
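
As an aside, here is a minimal sketch of the same transformations on plain Scala collections, using a hypothetical two-line input (groupBy plus sum stands in for reduceByKey, which is Spark-specific):

val lines = Seq("Apache Spark", "Apache")

val words = lines.flatMap(_.split(" "))   // Seq("Apache", "Spark", "Apache")
val pairs = words.map(word => (word, 1))  // Seq(("Apache", 1), ("Spark", 1), ("Apache", 1))
val counts = pairs
  .groupBy(_._1)                          // group the pairs by word
  .map { case (word, ps) => (word, ps.map(_._2).sum) } // sum the counts, like reduceByKey(_ + _)
// counts: Map("Apache" -> 2, "Spark" -> 1)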

Save the following as sample.txt.

sample.txt
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

Run 1

Run it as follows.

$ sbt run

The println output appears buried in a large amount of execution log output.
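
If the log noise gets in the way, one common workaround with Spark 1.x (which pulls in log4j 1.2 through spark-core) is to raise the log level at the top of main, before creating the SparkContext. This is a sketch, not part of the original code:

import org.apache.log4j.{Level, Logger}

// Show only WARN and above from Spark's noisiest loggers.
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

Either way, the counts printed by the run are: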

GraphX	1
Python	1
provides	1
is	1
R,	1
higher-level	1
general	1
fast	1
Java,	1
SQL	2
Apache	1
data	1
learning,	1
cluster	1
graph	1
execution	1
MLlib	1
Scala,	1
computing	1
supports	2
engine	1
set	1
rich	1
Streaming.	1
Spark	3
graphs.	1
general-purpose	1
APIs	1
that	1
a	2
high-level	1
including	1
optimized	1
in	1
system.	1
of	1
tools	1
also	1
structured	1
It	2
for	3
an	1
machine	1
and	5
processing,	2
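
Note that the pairs come back in no particular order; reduceByKey makes no ordering guarantee. If sorted output is desired, a sortBy step could be inserted before collect (a sketch, not part of the original code):

sc.textFile("sample.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false) // most frequent words first
  .collect
  .foreach { case (word, count) => println(word + "\t" + count) }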

Implementation 2

Next, consider input that spans multiple lines.
Save the following as sample.txt.

sample.txt
A
B
C
A
B
A

Next, save the following as main.scala.
The only difference from implementation 1 is that flatMap is gone: each whole line, rather than each word, becomes a key.

main.scala
import org.apache.spark.SparkContext

object Main {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "App") // local mode

    sc.textFile("sample.txt")      // read the file as an RDD of lines
      .map(line => (line, 1))      // pair each whole line with a count of 1
      .cache                       // cache the pairs (optional; they are only used once here)
      .reduceByKey(_ + _)          // sum the counts per line
      .collect                     // gather the results to the driver
      .foreach { case (line, count) =>
        println(line + "\t" + count)
      }

    sc.stop()
  }
}
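
Since each whole line is the key here, the same result could also be produced in a single call with RDD's countByValue, which returns a Map on the driver. An alternative sketch, not the article's code:

sc.textFile("sample.txt")
  .countByValue() // Map[String, Long]: occurrences of each distinct line
  .foreach { case (line, count) => println(line + "\t" + count) }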

Run 2

The results are as follows.

B	2
A	3
C	1