5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

リモートの Spark でローカル PC で書いたプログラムを動かす

Last updated at Posted at 2014-12-26

Spark をリモートの Linux サーバで起動しておいて、プログラムはローカル PC(Windows)で書いて実行したいなと思ったのでやりかたを調べましたが、ネットに情報がないので試行錯誤しました。
メモを残しておきます。
もっといいやり方があるという方は教えていただけると幸いです。

まずここらへんを参考に Spark 側の

  • spark-env.sh の STANDALONE_SPARK_MASTER_HOST を設定
  • /etc/spark/slaves に IP アドレスを設定
  • /etc/hosts の設定

とかをやる。ローカル PC がサーバ側から見えないといけないらしいので、/etc/hosts にローカル PC の IP アドレスを書いたりした。

build.sbt で Spark のライブラリを読み込んでおきます。

build.sbt
libraryDependencies += ("org.apache.spark" %% "spark-core" % "1.1.0").
    exclude("org.eclipse.jetty.orbit", "javax.servlet").
    exclude("org.eclipse.jetty.orbit", "javax.transaction").
    exclude("org.eclipse.jetty.orbit", "javax.mail").
    exclude("org.eclipse.jetty.orbit", "javax.activation").
    exclude("commons-collections", "commons-collections").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-logging", "commons-logging").
    exclude("com.esotericsoftware.minlog", "minlog")

もし、sbt 0.13.6 以上を使ってるなら sbt-assembly の最新版を使って

build.sbt
libraryDependencies += ("org.apache.spark" %% "spark-core" % "1.1.0") % "provided"

で OK らしい。

プログラムをこんな感じで書く。

Scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkTest {
  def rangeUntil = (x: Int) => 1 until x
  def isOdd = (x: Int) => x % 2 == 1
  def add = (x: Int, y: Int) => x + y
  def square = (x: Int) => x * x

  def main(args: Array[String]) {

    val appName = "MySpark"
    val master = "spark://***.***.***.***:7077"
    val conf = new SparkConf()
      .setAppName(appName)
      .setMaster(master)
      .setJars(Seq("SparkTest.jar"))
    //      .set("spark.executor.memory", "100m")

    val sc = new SparkContext(conf)

    val res0 = sc.parallelize(1 to 100).count()

    val data1 = sc.parallelize(2 to 4)
    val res1 = data1.map(rangeUntil).collect.toList
    val res2 = data1.flatMap(rangeUntil).collect.toList

    val data2 = sc.parallelize(Seq(1, 1, 2, 2, 2, 3, 3))
    val res3 = data2.distinct.collect.toList

    val data3 = sc.parallelize(Seq(1, 1, 2, 3, 5, 7, 8))
    val res4 = data3.groupBy(isOdd).collect.toList

    val data4 = sc.parallelize(1 until 10)
    val res5 = data4.sum
    val res6 = data4.reduce(add)
    val res7 = data4.map(square).reduce(add)
    val res8 = data4.fold(0)(add)

    println(res0)
    println(res1)
    println(res2)
    println(res3)
    println(res4)
    println(res5)
    println(res6)
    println(res7)
    println(res8)
  }
}

そして、sbt-assembly で SparkTest.jar を作成します。

このときの assembly.sbt

assembly.sbt
import AssemblyKeys._

assemblySettings

outputPath in assembly := new File(""".\SparkTest.jar""")

この jar を生成する理由は、map とかに渡す関数が無いというエラー(ClassNotFoundException) が出るためです。

ここで生成された jar をプログラム内で読み込んでいます。

jar を生成したらプログラムが実行できます。

結果
100
List(Range(1), Range(1, 2), Range(1, 2, 3))
List(1, 1, 2, 1, 2, 3)
List(1, 2, 3)
List((false,CompactBuffer(2, 8)), (true,CompactBuffer(1, 1, 3, 5, 7)))
45.0
45
285
45

フローとしては

  • プログラム中で使う関数を定義
  • jar をコンパイル
  • 関数を使用した Spark プログラムを書いて実行
  • 別の関数が必要になったら関数定義に戻る

という感じでローカル環境で Spark プログラムを実行しながら書けます。

jar をコンパイルするのに時間がかかるのと、map 関数に直接ラムダ式を書けないというのがフラストレーションになっています。
もっといい方法があったら教えてください。

とりあえず今はコンパイル中は Bokete Player でも見て暇つぶししてます。

5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?