13
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Spark2.0をEMRで試す

Last updated at Posted at 2016-08-04

概要

一週間前くらいにSpark2.0が出ました。そして、今日確認したらEMR5.0が既にSpark2.0対応されていました。さすがAWSさんです!

ということで、Spark2.0でイマドキのSpark実行を簡単にメモしておきます。

ゴール

  • Spark2.0が動く
  • Scala2.11ビルドしたjarが動く
  • Java8で動く
  • YARN分散環境上で動く

EMR設定

Software Configurationにて以下のように設定

  • emr-5.0.0
  • hadoop2.7.2
  • Spark2.0.0
  • configurationに以下のjsonを追加
    • Java8の設定と、sparkに最適化する設定を行っています。
[
  {
    "classification": "spark",
    "properties": {
      "maximizeResourceAllocation": "true"
    }
  },
  {
    "Classification": "hadoop-env",
    "Configurations": [
      {
        "Classification": "export",
        "Configurations": [],
        "Properties": {
          "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
        }
      }
    ],
    "Properties": {}
  },
  {
    "Classification": "spark-env",
    "Configurations": [
      {
        "Classification": "export",
        "Configurations": [],
        "Properties": {
          "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
        }
      }
    ],
    "Properties": {}
  }
]

Sparkアプリ

build.sbt
//nameとかは省略
//sbt-assemblyが入ってる前提

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0" % "provided"
)

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
Main.scala
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val rdd = sc.range(1, 100000, 1, 10)
    println("----Start----")

    //check for using scala-library_2.11
    //if using 2.10, this method cause Exception.
    println("hello" -> "world")

    rdd.map(i => i*2)
      .foreach(i => println(i))
  }
}

こんな感じで書いて、sbt assemblyすればOK

EMRからアプリを実行

assemblyしたjarをS3にアップロードします。

後はEMRクラスタを構築して、起動オプションを以下のように設定すればOK

# この2つは、MasterNode上で任意のshellコマンドを叩くことを示しています。
JAR location: command-runner.jar
Main class: None

# spark-submitコマンド
Arguments: spark-submit --deploy-mode cluster 
--class com.github.uryyyyyyy.Main 
--master yarn 
--num-executors 2 
--driver-memory 1g 
--executor-memory 1g 
--executor-cores 1 
s3://<your-s3-bucket>/jars/<your-assembly-0.1.0.jar> <起動オプション>

# 実行中に失敗したらクラスタをどうするか
Action on failure: Continue

こんな感じで実行できます。

もしくは、Master Nodeにsshで入ってspark-submit ~~~ を実行しても動きます。

まとめ

これだけで上記のゴールが達成できます。
Spark2系はデフォルトでscala2.11ビルドなのが嬉しいですね。

It also updates Spark (an engine for large-scale data processing) from 1.6.2 to 2.0, with a similar move to Scala 2.11.

13
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?