LoginSignup
34

More than 5 years have passed since last update.

Apache Sparkの概要調査とQuickStartやってみた

Last updated at Posted at 2015-08-09

Apache SparkがHadoopよりいけてそうなので調べて使ってみました。

Apache Sparkについて

概要

SparkとはHadoop(MapReduce)を拡張した次世代のビックデータ解析エンジン。

  • リッチなAPIがあるので、使うのがより簡単
  • より高速
  • 対話的クエリ、ストリーミング、マシンラーニング、グラフ処理などをサポート

HadoopのMapReduceの時はdisk上で計算が走っていたけれど、メモリ上で走らせることが高速化の要因の一つのようです。

コンポーネント

下記の代表的なコンポーネントがあるようです。

  • Spark Core
    Sparkの基本機能を含むコンポーネント
  • Spark SQL
    Apache HiveのようにSQLを通してデータにアクセス
  • Spark Streaming
    データのライブストリーミングを可能にするコンポーネント
  • MLlib
    マシンラーニングのアルゴリズムを提供するコンポーネント
  • GraphX
    グラフデータ処理のためのコンポーネント
  • Cluster Managers
    クラスタ管理

APIで使える言語

下記の言語のものが使用できるようです。

  • Python
  • Java
  • Scala
  • SQL

Spark自身はScalaで書かれています。

RDDsについて

RDDs(resilient distributed data‐ sets)とは複数のクラスタにまたがって並列実行されるアイテムのコレクションのこと。
Sparkはそれを構築、操作するAPIを使うことでプログラミングを抽象化しているとのことです。

Apache Sparkを試してみる

環境

試した環境は以下のとおりです。

環境 バージョンなど
OS Mac OS X Yosemite(10.10.4)
Java 1.7.0
Scala 2.11.7
Gradle 2.0

※Sparkはscalaを内包しているようなので、spark-shellの実行だけならScalaは必要ありません。(Gradleも)

インストール

インストールはMacであればhomebrewで。

$ brew install apache-spark

実行すると下記にインストールされました。

現在の最新は1.4.1なので、それよりは古いです。

/usr/local/Cellar/apache-spark/1.4.0

ちなみにhomebrewだとsparkというパッケージも見つかるのですが、これは別物なので注意してください。
[MAC]間違ったsparkをインストールしてみた

Spark-shellを使ってみる

自動的にPATHが通っているので、下記のコマンドでspark-shellを起動します。

$ spark-shell

すると、なんとScalaのシェルが起動します。

ちなみに、log4jのログが出ますが、デフォルトだと下記の場所にtemplateがあるので、これをlog4j.propertiesという名前で同じディレクトリにコピーすればログレベルなどいじれます。

/usr/local/Cellar/apache-spark/1.4.0/libexec/conf/log4j.properties.template

例えば、下記のようにすればWARNのみ表示します。

log4j.properties.template
log4j.rootCategory=WARN, console

また、spark-shellを起動させた状態であれば、ブラウザで下記にアクセスすればSpark UIをみることができます。

http://localhost:4040/

SparkUI.png

ちなみに、spark-shellを起動した時の出力結果は下記の通りです。

INFOのログがたくさんあったので、ログレベルをWARNのみにして表示しています。

bash-3.2$ spark-shell
15/08/09 12:17:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_60)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
15/08/09 12:18:05 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/08/09 12:18:05 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/08/09 12:18:09 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa
SQL context available as sqlContext.

気になったこととして。。。

  • native-hadoop libraryがloadできていない
    hadoop自体はインストールしていたんだけれど。。。
    多分confの設定かな。
  • Scalaのバージョンは2.10.4
    2.11系ではないようです。
    最新版の1.4.1のドキュメントを見ると、下記のように書いてあり、ビルドすれば良い様子。
    Building for Scala 2.11
    ただし、JDBCコンポーネントが2.11に対応していないらしく、まだ2.10を使うのが無難なようです。
  • Spark contextなるものをscとして使える
    これは複数の計算クラスタに接続するためのオブジェクト。
    クラスタへのアクセスはRDDsを介して行うので抽象化されている。
  • SQL contextなるものをsqlContextとして使える
    使い方は未調査。

それでは実際に、spark-shellを触ってみます。

SparkのQuick Startより、README.mdの行数を調べてみます。

scala> val textFile = sc.textFile("/usr/local/Cellar/apache-spark/1.4.0/README.md")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at textFile at <console>:21

scala> textFile.count()
textFile.count()
res3: Long = 98

scala> textFile.first()
textFile.first()
res4: String = # Apache Spark

ここで、このtextFileがRDDと呼ばれるコレクションにあたります。

Filterも使えます。Sparkのwordが含まれるlineのみを抜き出してcountを表示させる場合は、

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at <console>:23

scala> linesWithSpark.count()
linesWithSpark.count()
res6: Long = 19

scala> textFile.filter(line => line.contains("Spark")).count()
textFile.filter(line => line.contains("Spark")).count()
res7: Long = 19

Filterのよう操作もクラスタにまたがって実行されるようです。
※SparkにはActionとTransitionの関数があり、Transitionは複数のワーカーノードにまたがって実行される(8/15追記)

ちなみに、ScalaではなくPythonのspark-shellもあります。そちらを使いたい場合は下記を実行します。

$ pyspark

Standaloneアプリケーションとして実行してみる

今度はspark-shellでやったことをStandaloneなプログラムで実行してみます。

コードは https://github.com/khiraiwa/spark-learning にあります。

spark-standalone-sampleというプロジェクトです。

ソースコード

下記のようになります。DocumentのQuickStartを元にしています。

package sample

import org.apache.spark.{SparkContext, SparkConf}

object SimpleApp {
  val file = "/usr/local/Cellar/apache-spark/1.4.0/README.md"

  def main(args: Array[String]) {

    // SparkContextの初期化
    // localを指定して、ローカルで1 theadで実行
    // setMasterは省略可
    val conf = new SparkConf().setMaster("local").setAppName("spark-standalone-sample")
    val sc = new SparkContext(conf)

    // ファイルを読み込みます
    // 第二引数は最小パーティション(入力データ分割)数とのこと.省略可能.
    // cache()を呼ぶとクラスター間でin-memoryキャシュされます
    val fileData = sc.textFile(file, 2).cache()

    // カウントします
    val numAs = fileData.filter(line => line.contains("spark")).count()
    val numBs = fileData.filter(line => line.contains("scala")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

ほとんどspark-shellでやっていたこと、そのままです。

ビルドと実行

QuickStartではsbtでやっていますが、ここではGradleでやってみました。

実行にはSpark Coreのコンポーネントが必要です。

2.11系のspark-coreライブラリがMaven Centralに上がっているので、build.gradleに下記のように書くだけで2.11系対応のsparkが使えます。

また、manifestのMain-Classを指定しておかないと、実行時に--classオプションでクラス名を渡す必要が出てきます。

spark-standalone-sample/build.gradle
dependencies {
  compile "org.scala-lang:scala-compiler:2.11.7"
  compile "org.scala-lang:scala-library:2.11.7"
  compile "org.apache.spark:spark-core_2.11:1.4.1"
}

jar {
  manifest {
    attributes "Main-Class" : "sample.SimpleApp"
  }
  from configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}

実行は、下記のように、spark-submitコマンドを使います。

$ git clone git@github.com:khiraiwa/spark-learning.git
$ cd cd spark-learning/spark-standalone-sample/
$ gradle shadowJar
:compileJava UP-TO-DATE
:compileScala
:processResources UP-TO-DATE
:classes
:shadowJar

BUILD SUCCESSFUL

Total time: 45.596 secs
$ spark-submit build/libs/spark-standalone-sample-0.1-all.jar 
15/08/09 16:47:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Lines with a: 11, Lines with b: 1

正しく結果が表示されました。

ちなみにアプリケーション実行中も、上記のSpark UIにアクセスして進捗など見れるようです。

補足

普通にgradle jarコマンド等で作ったJARファイルを実行すると下記のExceptionが出て終了してしまいます。

bash-3.2$ spark-submit build/libs/spark-standalone-sample-0.1.jar 
spark-submit build/libs/spark-standalone-sample-0.1.jar 
Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes

どうやらMETA-INFの中のsignature fileの問題のようです。

解決方法はJARファイルの中のMETA-INF/ECLIPSEF.RSAを削除して再びJARに戻せば良いので下記のようにすればよいです。

zip -d spark-standalone-sample-0.1.jar META-INF/*.RSA META-INF/*.DSA

しかし、毎回作りなおすのは面倒だし、今回はGradleを使っているので、Gradle Shadowプラグインを使ってJARファイル作成時にExcludeするようにしました。

spark-standalone-sample/build.gradle
shadowJar {
  mergeServiceFiles {
    exclude 'META-INF/ECLIPSEF.RSA'
  }
}

今日のところはここまで。

参考

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
34