Scala
CentOS
Spark
ApacheSpark

Apache Spark 2.3 でWordCount

こちらを参考に、Apache Spark ScalaでWordCountを実行してみました。環境は次の通りです。

・CentOS 7.5

・Apache Spark 2.3.1

・Scala 2.12.6

・sbt 1.1.6


Spark、Scala、sbtのインストール

Apache Sparkのインストールと環境変数の設定を実施

Scalaのインストールと環境変数の設定

# wget https://downloads.lightbend.com/scala/2.12.6/scala-2.12.6.tgz

# tar xvzf scala-2.12.6.tgz
# mv scala-2.12.6 /usr/local/scala

# vi /etc/profile
# export SCALA_HOME=/usr/local/scala
# export PATH=$PATH:$SCALA_HOME/bin

source /etc/profile

Scala sbtのインストール、Version確認

# curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo

# sudo yum install sbt

# sbt sbtVersion


sbtビルド環境の準備

下記のディレクトリ (output以外) 及びファイルを、事前に準備する

ディレクトリ

WordCount/

├── build.sbt (ビルド方法を定義)
├── input.txt (ワードカウント対象のファイル)
├── output/  (ワードカウントの結果:sbt runにて作成される)

├── project/ (sbtの追加設定)
│ └── assembly.sbt (sbtのプラグイン)
└── src/
  └──main/
    └── scala/
     └── jp/
      └── hoge/
        └── news/
          └── WordCountApp.scala

・build.sbt

こちらを参考にscalaVersionは2.11.7、spark-coreは2.1.0を指定

※Spark 2.3.1は、Scala 2.11でコンパイルされている為、ScalaのVersionを下げる必要性

name := "WordCountApp"

version := "1.0.0"
scalaVersion := "2.11.7"
resolvers += "Atilika Open Source repository" at "http://www.atilika.org/nexus/content/repositories/atilika"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.atilika.kuromoji" % "kuromoji" % "0.7.7"
assemblyMergeStrategy in assembly := {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
mainClass in assembly := Some("WordCountApp")

・assembly.sbt

Scala JAR 、lib以下のJARをすべて含めたfat JARを下記の指定で生成

こちらより、sbt version 1.xはsbt-assembly0.14.6をサポート

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")    

・WordCountApp.scala

package jp.hoge.news

import java.util.regex.{Matcher, Pattern}
import scala.collection.convert.WrapAsScala._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.atilika.kuromoji.Tokenizer
import org.atilika.kuromoji.Token

object WordCountApp{
def main(args: Array[String]) {
//スパークの環境設定
val sparkConf = new SparkConf().setMaster("local[4]").setAppName("WordCount App")
val sc = new SparkContext(sparkConf)
//kuromojiのトークナイザ
val tokenizer = Tokenizer.builder.mode(Tokenizer.Mode.NORMAL).build()
//テキストファイルから1行ずつ読み込み。名詞を配列に分解する。
//テキストファイルからRDDオブジェクトを取得する。
val input = sc.textFile("input.txt").flatMap(line => {
val tokens : java.util.List[Token] = Tokenizer.builder().build().tokenize(line)
val output : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
tokens.foreach(token => {
if(token.getAllFeatures().indexOf("名詞") != -1) {
      output += token.getSurfaceForm()
}})
output// return
    })
//ワードカウントを行う。数える名詞をキーにし、キーを元に加算処理を行う。
val wordCounts = input.map(x => (x, 1L)).reduceByKey((x, y)=> x + y)
//降順に単語を列挙して出力する。
val output = wordCounts.map( x => (x._2, x._1)).sortByKey(false).saveAsTextFile("output")
}
}

・input.txt

Apache Sparkはオープンソースのクラスタコンピューティングフレームワークである。カリフォルニア大学バークレー校のAMPLabで開発されたコードが、管理元のApacheソフトウェア財団に寄贈された。Sparkのインタフェースを使うと、暗黙のデータ並列性と耐故障性を備えたクラスタ全体をプログラミングできる。日経BP社が発表した「ITインフラテクノロジーAWARD 2015」において、SparkはDockerに次ぐ準グランプリとされた。

フォールトトレラントシステムで管理され、複数マシンのクラスタに分散されたデータ項目の読み取り専用多重集合であるRDD(resilient distributed dataset)と呼ばれるデータ構造を中心とするアプリケーションプログラミングインターフェイスを備えている。MapReduceは、分散プログラム上で特定の線形データフロー構造を強制するクラスタコンピューティングプログラミングパラダイムの制限に対応して開発された。MapReduceは、ディスクから入力データを読み込み、データ全体に関数をマップし、削減結果をディスクに格納する。SparkのRDDは、 分散共有メモリの (意図的に)制限された形式で提供する分散プログラムのワーキングセットとして機能する。

RDDの可用性は、ループ内で複数回データセットを参照する反復法アルゴリズム、および対話型/探索型データ分析、データ反復のデータベースクエリの両方の実装を容易にする。このようなアプリケーションのレイテンシ(Apache Hadoopスタックでは一般的であったMapReduce実装と比較して)は、桁違いに低下する可能性がある。反復アルゴリズムのクラスの中には、 機械学習のための訓練アルゴリズムがあり、Apache Sparkを開発の初期の刺激となった。クラスタマネージャと分散ストレージシステムが必要であり、クラスタ管理のためにスタンドアロン(ネイティブのSparkクラスタ)、Hadoop YARN、Apache Mesosに対応している。分散ストレージの場合、Hadoop分散ファイルシステム、MapRファイルシステム(MapR-FS)、Apache Cassandra、OpenStack Swift、Amazon S3、Kudu、カスタムソリューションを実装できる。擬似分散ローカルモードも対応しており通常は開発やテスト目的でのみ使用される。分散ストレージは不要でローカルファイルシステムを代わりに使用でき、CPUマルチコアごとに1台のマシン上で実行される。


sbtの実行

bulid.batのあるプロジェクトルートで下記を実施

# sbt run

successで実行が終了し、outputフォルダ配下のpart-00000xを参照し、WordCountが実行された事を確認する

# vi output/part-00000

(9,分散)
(8,データ)
(6,Apache)
(6,Spark)
(5,クラスタ)
(5,システム)
(4,開発)
(3,反復)
(3,実装)
(3,MapReduce)
(3,RDD)
(3,対応)
(3,ファイル)
(3,ストレージ)