概要
一週間前くらいにSpark2.0が出ました。そして、今日確認したらEMR5.0が既にSpark2.0対応されていました。さすがAWSさんです!
ということで、Spark2.0でイマドキのSpark実行を簡単にメモしておきます。
ゴール
- Spark2.0が動く
- Scala2.11ビルドしたjarが動く
- Java8で動く
- YARN分散環境上で動く
EMR設定
Software Configurationにて以下のように設定
- emr-5.0.0
- hadoop2.7.2
- Spark2.0.0
- configurationに以下のjsonを追加
- Java8の設定と、sparkに最適化する設定を行っています。
[
{
"classification": "spark",
"properties": {
"maximizeResourceAllocation": "true"
}
},
{
"Classification": "hadoop-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
}
}
],
"Properties": {}
},
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {
"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
}
}
],
"Properties": {}
}
]
Sparkアプリ
build.sbt
//nameとかは省略
//sbt-assemblyが入ってる前提
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.0.0" % "provided"
)
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
Main.scala
import org.apache.spark.{SparkConf, SparkContext}
object Main {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val rdd = sc.range(1, 100000, 1, 10)
println("----Start----")
//check for using scala-library_2.11
//if using 2.10, this method cause Exception.
println("hello" -> "world")
rdd.map(i => i*2)
.foreach(i => println(i))
}
}
こんな感じで書いて、sbt assemblyすればOK
EMRからアプリを実行
assemblyしたjarをS3にアップロードします。
後はEMRクラスタを構築して、起動オプションを以下のように設定すればOK
# この2つは、MasterNode上で任意のshellコマンドを叩くことを示しています。
JAR location: command-runner.jar
Main class: None
# spark-submitコマンド
Arguments: spark-submit --deploy-mode cluster
--class com.github.uryyyyyyy.Main
--master yarn
--num-executors 2
--driver-memory 1g
--executor-memory 1g
--executor-cores 1
s3://<your-s3-bucket>/jars/<your-assembly-0.1.0.jar> <起動オプション>
# 実行中に失敗したらクラスタをどうするか
Action on failure: Continue
こんな感じで実行できます。
もしくは、Master Nodeにsshで入ってspark-submit ~~~
を実行しても動きます。
まとめ
これだけで上記のゴールが達成できます。
Spark2系はデフォルトでscala2.11ビルドなのが嬉しいですね。
It also updates Spark (an engine for large-scale data processing) from 1.6.2 to 2.0, with a similar move to Scala 2.11.