7
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AWS EMRにspark-jobserverを構築してREST経由でJob実行させる

Last updated at Posted at 2016-04-22

Sparkを利用して、ジョブをパシパシ叩きたかったのですが、どうやってジョブ実行をしようかな、と悩むことになりました。

クラウド依存になりたくなかったので、できればKinesisストリームは使いたくないな。。と思いつつも下記のように調べてみました。

利用可否 Streamingか単発ジョブか アドホックな引数設定の可否 懸念
KinesisとSpark Streaming Streaming やや難しい。 GCPとかMS使いたくなったら書き直しが多い
spark-jobserver 単発ジョブ RestAPIのパラメータ設定なので簡単 今後も利用され続けるか疑問。
luigiとか使ってspark-submitを叩く 単発ジョブ ジョブ作りこめば簡単 Sparkのためだけのジョブサーバを構築・運用するかどうするか
Akka Stream 不可 implementされてない(2016/4/21時点) おそらくStreaming おそらく難しい。 いつ出るんだろ。。

どれもあまり乗り気になれませんでしたが、RestAPIで叩けるようにしておけば、開発関係者が気軽にバッチを叩いてMLを試したりできるかも?ということで、spark-jobserverを使ってみることにします。

構築について

sparkの利用できるEMRの起動後に、sshでmasterノードにログインします。

sshログインしたら、下記のMarkdownにしたがって、job-server.tar.gzを作成します。

m1.mediumのインスタンスを利用しましたがビルドは遅かったです。。
assemblyでjob-server.tar.gzを作成するだけなので、CI系サービスか、自前の環境で作成してartifactoryやs3等に保存してもいいかもしれません。(これだけのために毎度EMRを立ち上げる必要はなし。)

利用してみる

job-server.tar.gzを展開します。

mkdir /mnt/lib/spark-jobserver
cd /mnt/lib/spark-jobserver
tar zxf job-server.tar.gz

JAVA_OPTSのserver_start.shの中のjmx周りの設定をコメントアウトします。
これがあると動かないことがありました。

テスト用のアプリケーションを用意してみます。

build.sbt
name := "example-spark-jobs"

organization := "org.triplew.dfree"

version := "1.0-SNAPSHOT"

enablePlugins(JavaAppPackaging, sbtdocker.DockerPlugin)

scalaVersion := "2.10.6"

scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")

libraryDependencies ++= {
  val sparkV      = "1.6.1"
  Seq(
    "org.apache.spark" % "spark-core_2.10" % sparkV,
    "spark.jobserver" %% "job-server-api" % "0.6.1" % "provided",
    "com.github.seratch" %% "awscala" % "0.5.+",
    "com.github.levkhomich" %% "akka-tracing-core" % "0.4",
    "commons-configuration" % "commons-configuration" % "1.10"
  )
}

dependencyOverrides ++= Set(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4"
)

Revolver.settings

resolvers += "Job Server Bintray" at "https://dl.bintray.com/spark-jobserver/maven"

以下割愛

scalaのバージョンを2.11にすると、sparkジョブは動きませんでした。
これ、大丈夫だろうか、という不安を覚えますね。。

setMasterで指定しているmasterについては、spark-shellでいったん、sc.masterで確認しちゃいました。こういうのは外だししたいですね。。

package org.triplew.dfree.example.spark.jobs

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import spark.jobserver.{SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation}

import scala.util.Try

object Hoge extends SparkJob {
  override def runJob(sc:SparkContext, jobConfig: Config): Any = {
    sc.parallelize(jobConfig.getString("input.string").split("-").toSeq).count()
  }

  override def validate(sc:SparkContext, config: Config): SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }

  def main(args: Array[String]): Unit = {
    val appName: String = "dfree-example-spark-jobs"
    val conf = new SparkConf().setAppName(appName).setMaster("local[*]")
    val sc = new SparkContext(conf)

    val config = ConfigFactory.parseString("")
    val result = runJob(sc, config)

    println(result)
  }
}

このファイルを、sbt pacakgeでコンパイルして生成されたjarを、curlコマンドで、jobserverにputします。

また、jarに対しては名前を与えることができるようです。

curl --data-binary @target/scala-2.10/example-spark-jobs_2.10-1.0-SNAPSHOT.jar http://*.*.*.*:8090/jars/my-test

次にcontextを用意します。

curl -d "" 'http://*.*.*.*:8090/contexts/test?num-cpu-cores=1&memory-per-node=512m&spark.executor.instances=1

上記で作成したcontextとjarを指定して、sparkジョブを実行します。

curl -d "input.string = a-b-c" '*.*.*.*:8090/jobs?appName=my-test&classPath=org.triplew.dfree.example.spark.jobs.Hoge&context=test&sync=true'

そしてアウトプットとして以下を取得します。

{
  "result": 3
}

動いてるようです。

所感

  • Restのパラメータに指定するsyncで、非同期リクエストも可能のようなので、重たいバッチや機械学習をasync実行させ、jobidのみを控えておいて、ステータス繰り上がりあとに利用する、みたいな感じでしょうか。
  • 全部そこらへんもScalaでやるならFutureなどを利用するのもいいかもしれません。
  • Akka Streaming早く使ってみたい。。

本日は以上となります。

7
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?