Apache SparkがHadoopよりいけてそうなので調べて使ってみました。
Apache Sparkについて
概要
SparkとはHadoop(MapReduce)を拡張した次世代のビックデータ解析エンジン。
- リッチなAPIがあるので、使うのがより簡単
- より高速
- 対話的クエリ、ストリーミング、マシンラーニング、グラフ処理などをサポート
HadoopのMapReduceの時はdisk上で計算が走っていたけれど、メモリ上で走らせることが高速化の要因の一つのようです。
コンポーネント
下記の代表的なコンポーネントがあるようです。
- Spark Core
Sparkの基本機能を含むコンポーネント - Spark SQL
Apache HiveのようにSQLを通してデータにアクセス - Spark Streaming
データのライブストリーミングを可能にするコンポーネント - MLlib
マシンラーニングのアルゴリズムを提供するコンポーネント - GraphX
グラフデータ処理のためのコンポーネント - Cluster Managers
クラスタ管理
APIで使える言語
下記の言語のものが使用できるようです。
- Python
- Java
- Scala
- SQL
Spark自身はScalaで書かれています。
RDDsについて
RDDs(resilient distributed data‐ sets)とは複数のクラスタにまたがって並列実行されるアイテムのコレクションのこと。
Sparkはそれを構築、操作するAPIを使うことでプログラミングを抽象化しているとのことです。
Apache Sparkを試してみる
環境
試した環境は以下のとおりです。
環境 | バージョンなど |
---|---|
OS | Mac OS X Yosemite(10.10.4) |
Java | 1.7.0 |
Scala | 2.11.7 |
Gradle | 2.0 |
※Sparkはscalaを内包しているようなので、spark-shellの実行だけならScalaは必要ありません。(Gradleも)
インストール
インストールはMacであればhomebrewで。
$ brew install apache-spark
実行すると下記にインストールされました。
現在の最新は1.4.1なので、それよりは古いです。
/usr/local/Cellar/apache-spark/1.4.0
ちなみにhomebrewだとsparkというパッケージも見つかるのですが、これは別物なので注意してください。
[MAC]間違ったsparkをインストールしてみた
Spark-shellを使ってみる
自動的にPATHが通っているので、下記のコマンドでspark-shellを起動します。
$ spark-shell
すると、なんとScalaのシェルが起動します。
ちなみに、log4jのログが出ますが、デフォルトだと下記の場所にtemplateがあるので、これをlog4j.propertiesという名前で同じディレクトリにコピーすればログレベルなどいじれます。
/usr/local/Cellar/apache-spark/1.4.0/libexec/conf/log4j.properties.template
例えば、下記のようにすればWARNのみ表示します。
log4j.rootCategory=WARN, console
また、spark-shellを起動させた状態であれば、ブラウザで下記にアクセスすればSpark UIをみることができます。
http://localhost:4040/
ちなみに、spark-shellを起動した時の出力結果は下記の通りです。
INFOのログがたくさんあったので、ログレベルをWARNのみにして表示しています。
bash-3.2$ spark-shell
15/08/09 12:17:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_60)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
15/08/09 12:18:05 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/08/09 12:18:05 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/08/09 12:18:09 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa
SQL context available as sqlContext.
気になったこととして。。。
- native-hadoop libraryがloadできていない
hadoop自体はインストールしていたんだけれど。。。
多分confの設定かな。 - Scalaのバージョンは2.10.4
2.11系ではないようです。
最新版の1.4.1のドキュメントを見ると、下記のように書いてあり、ビルドすれば良い様子。
Building for Scala 2.11
ただし、JDBCコンポーネントが2.11に対応していないらしく、まだ2.10を使うのが無難なようです。 - Spark contextなるものをscとして使える
これは複数の計算クラスタに接続するためのオブジェクト。
クラスタへのアクセスはRDDsを介して行うので抽象化されている。 - SQL contextなるものをsqlContextとして使える
使い方は未調査。
それでは実際に、spark-shellを触ってみます。
SparkのQuick Startより、README.mdの行数を調べてみます。
scala> val textFile = sc.textFile("/usr/local/Cellar/apache-spark/1.4.0/README.md")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at textFile at <console>:21
scala> textFile.count()
textFile.count()
res3: Long = 98
scala> textFile.first()
textFile.first()
res4: String = # Apache Spark
ここで、このtextFileがRDDと呼ばれるコレクションにあたります。
Filterも使えます。Sparkのwordが含まれるlineのみを抜き出してcountを表示させる場合は、
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at <console>:23
scala> linesWithSpark.count()
linesWithSpark.count()
res6: Long = 19
scala> textFile.filter(line => line.contains("Spark")).count()
textFile.filter(line => line.contains("Spark")).count()
res7: Long = 19
Filterのよう操作もクラスタにまたがって実行されるようです。
※SparkにはActionとTransitionの関数があり、Transitionは複数のワーカーノードにまたがって実行される(8/15追記)
ちなみに、ScalaではなくPythonのspark-shellもあります。そちらを使いたい場合は下記を実行します。
$ pyspark
Standaloneアプリケーションとして実行してみる
今度はspark-shellでやったことをStandaloneなプログラムで実行してみます。
コードは https://github.com/khiraiwa/spark-learning にあります。
spark-standalone-sampleというプロジェクトです。
ソースコード
下記のようになります。DocumentのQuickStartを元にしています。
package sample
import org.apache.spark.{SparkContext, SparkConf}
object SimpleApp {
val file = "/usr/local/Cellar/apache-spark/1.4.0/README.md"
def main(args: Array[String]) {
// SparkContextの初期化
// localを指定して、ローカルで1 theadで実行
// setMasterは省略可
val conf = new SparkConf().setMaster("local").setAppName("spark-standalone-sample")
val sc = new SparkContext(conf)
// ファイルを読み込みます
// 第二引数は最小パーティション(入力データ分割)数とのこと.省略可能.
// cache()を呼ぶとクラスター間でin-memoryキャシュされます
val fileData = sc.textFile(file, 2).cache()
// カウントします
val numAs = fileData.filter(line => line.contains("spark")).count()
val numBs = fileData.filter(line => line.contains("scala")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}
ほとんどspark-shellでやっていたこと、そのままです。
ビルドと実行
QuickStartではsbtでやっていますが、ここではGradleでやってみました。
実行にはSpark Coreのコンポーネントが必要です。
2.11系のspark-coreライブラリがMaven Centralに上がっているので、build.gradleに下記のように書くだけで2.11系対応のsparkが使えます。
また、manifestのMain-Classを指定しておかないと、実行時に--classオプションでクラス名を渡す必要が出てきます。
dependencies {
compile "org.scala-lang:scala-compiler:2.11.7"
compile "org.scala-lang:scala-library:2.11.7"
compile "org.apache.spark:spark-core_2.11:1.4.1"
}
jar {
manifest {
attributes "Main-Class" : "sample.SimpleApp"
}
from configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
実行は、下記のように、spark-submitコマンドを使います。
$ git clone git@github.com:khiraiwa/spark-learning.git
$ cd cd spark-learning/spark-standalone-sample/
$ gradle shadowJar
:compileJava UP-TO-DATE
:compileScala
:processResources UP-TO-DATE
:classes
:shadowJar
BUILD SUCCESSFUL
Total time: 45.596 secs
$ spark-submit build/libs/spark-standalone-sample-0.1-all.jar
15/08/09 16:47:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Lines with a: 11, Lines with b: 1
正しく結果が表示されました。
ちなみにアプリケーション実行中も、上記のSpark UIにアクセスして進捗など見れるようです。
補足
普通にgradle jarコマンド等で作ったJARファイルを実行すると下記のExceptionが出て終了してしまいます。
bash-3.2$ spark-submit build/libs/spark-standalone-sample-0.1.jar
spark-submit build/libs/spark-standalone-sample-0.1.jar
Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
どうやらMETA-INFの中のsignature fileの問題のようです。
解決方法はJARファイルの中のMETA-INF/ECLIPSEF.RSAを削除して再びJARに戻せば良いので下記のようにすればよいです。
zip -d spark-standalone-sample-0.1.jar META-INF/*.RSA META-INF/*.DSA
しかし、毎回作りなおすのは面倒だし、今回はGradleを使っているので、Gradle Shadowプラグインを使ってJARファイル作成時にExcludeするようにしました。
shadowJar {
mergeServiceFiles {
exclude 'META-INF/ECLIPSEF.RSA'
}
}
今日のところはここまで。