Spark をリモートの Linux サーバで起動しておいて、プログラムはローカル PC(Windows)で書いて実行したいなと思ったのでやりかたを調べましたが、ネットに情報がないので試行錯誤しました。
メモを残しておきます。
もっといいやり方があるという方は教えていただけると幸いです。
まずここらへんを参考に Spark 側の
- spark-env.sh の STANDALONE_SPARK_MASTER_HOST を設定
- /etc/spark/slaves に IP アドレスを設定
- /etc/hosts の設定
とかをやる。ローカル PC がサーバ側から見えないといけないらしいので、/etc/hosts にローカル PC の IP アドレスを書いたりした。
build.sbt で Spark のライブラリを読み込んでおきます。
libraryDependencies += ("org.apache.spark" %% "spark-core" % "1.1.0").
exclude("org.eclipse.jetty.orbit", "javax.servlet").
exclude("org.eclipse.jetty.orbit", "javax.transaction").
exclude("org.eclipse.jetty.orbit", "javax.mail").
exclude("org.eclipse.jetty.orbit", "javax.activation").
exclude("commons-collections", "commons-collections").
exclude("commons-beanutils", "commons-beanutils-core").
exclude("commons-logging", "commons-logging").
exclude("com.esotericsoftware.minlog", "minlog")
もし、sbt 0.13.6 以上を使ってるなら sbt-assembly の最新版を使って
libraryDependencies += ("org.apache.spark" %% "spark-core" % "1.1.0") % "provided"
で OK らしい。
プログラムをこんな感じで書く。
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkTest {
def rangeUntil = (x: Int) => 1 until x
def isOdd = (x: Int) => x % 2 == 1
def add = (x: Int, y: Int) => x + y
def square = (x: Int) => x * x
def main(args: Array[String]) {
val appName = "MySpark"
val master = "spark://***.***.***.***:7077"
val conf = new SparkConf()
.setAppName(appName)
.setMaster(master)
.setJars(Seq("SparkTest.jar"))
// .set("spark.executor.memory", "100m")
val sc = new SparkContext(conf)
val res0 = sc.parallelize(1 to 100).count()
val data1 = sc.parallelize(2 to 4)
val res1 = data1.map(rangeUntil).collect.toList
val res2 = data1.flatMap(rangeUntil).collect.toList
val data2 = sc.parallelize(Seq(1, 1, 2, 2, 2, 3, 3))
val res3 = data2.distinct.collect.toList
val data3 = sc.parallelize(Seq(1, 1, 2, 3, 5, 7, 8))
val res4 = data3.groupBy(isOdd).collect.toList
val data4 = sc.parallelize(1 until 10)
val res5 = data4.sum
val res6 = data4.reduce(add)
val res7 = data4.map(square).reduce(add)
val res8 = data4.fold(0)(add)
println(res0)
println(res1)
println(res2)
println(res3)
println(res4)
println(res5)
println(res6)
println(res7)
println(res8)
}
}
そして、sbt-assembly で SparkTest.jar を作成します。
このときの assembly.sbt
import AssemblyKeys._
assemblySettings
outputPath in assembly := new File(""".\SparkTest.jar""")
この jar を生成する理由は、map とかに渡す関数が無いというエラー(ClassNotFoundException) が出るためです。
ここで生成された jar をプログラム内で読み込んでいます。
jar を生成したらプログラムが実行できます。
100
List(Range(1), Range(1, 2), Range(1, 2, 3))
List(1, 1, 2, 1, 2, 3)
List(1, 2, 3)
List((false,CompactBuffer(2, 8)), (true,CompactBuffer(1, 1, 3, 5, 7)))
45.0
45
285
45
フローとしては
- プログラム中で使う関数を定義
- jar をコンパイル
- 関数を使用した Spark プログラムを書いて実行
- 別の関数が必要になったら関数定義に戻る
という感じでローカル環境で Spark プログラムを実行しながら書けます。
jar をコンパイルするのに時間がかかるのと、map 関数に直接ラムダ式を書けないというのがフラストレーションになっています。
もっといい方法があったら教えてください。
とりあえず今はコンパイル中は Bokete Player でも見て暇つぶししてます。