9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SparkでJobを並列に実行する

Posted at

概要

とあるSparkアプリで、外部リソースにアクセスすることになったのだが、負荷を減らすために少しずつ処理したいというニーズが出てきた。
そこで、Sparkアプリ内の各Jobの並列実行を試してみました。

(このようにmap/reduce内で外部リソースに頻繁にアクセスするのは一般的ではないですが、重い処理に全リソースを割かれないで別の処理も並列で走らせたい、という需要はあるのではと思っています。)

やりかた

https://spark.apache.org/docs/1.6.1/job-scheduling.html
(この情報は不十分なので調べるのが面倒でした。。。)

schedulerPoolの設定

まずfairScheduler.xmlを用意して、どこかのパスに置いておく。

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

次に、SparkContextに、modeがFAIRであることを伝える。
(これをしないとxmlを呼んでもちゃんとpoolが作られない。デフォルトではこの値はFIFOになっている。)

conf.set("spark.scheduler.mode", "FAIR")

そして、先ほど用意したscheduler.xmlを読ませる。

conf.set("spark.scheduler.allocation.file", "/your/path/spark-fairScheduler.xml")

※ここでxmlをresourcesとしてJarの中に含めてしまうと、使うときにはjarから一度別のパスに取り出してからそのパスを指定しないといけない。

    val xml = getClass.getClassLoader.getResourceAsStream("pool.xml")
    val homeDir = System.getProperty("user.home")
    val f = new File(homeDir + "/.pool.xml")

    val in = scala.io.Source.fromInputStream(xml)
    val out = new java.io.PrintWriter(f)
    try { in.getLines().foreach(out.print) }
    finally { out.close() }
    conf.set("spark.scheduler.allocation.file", homeDir + "/.pool.xml")

それぞれのpoolにJobを投入してみる。

ただJobを投入するだけだと、Jobが生成された順番でFIFOで処理されてしまうので、Futureなどを用いて別スレッドからpoolへJob投入してあげる必要があるようです。
サンプルはこんな形です。

    val f1 = Future {
      //backend logic
      sc.setLocalProperty("spark.scheduler.pool", "backend")
      println(sc.getSchedulingMode)
      rdd.saveAsTextFile(file2)

    }
    //main logic
    sc.setLocalProperty("spark.scheduler.pool", "main")
    println(sc.getSchedulingMode)
    rdd.saveAsTextFile(file1)

    Await.ready(f1, Duration.Inf)

ここで大事なのは、saveAsTextFileのような、実際にRDDの処理が走りだすような記述を書いてあげる必要があることです。ただのmapやreduceだけだと、まだRDDの処理は始まらないのでpoolに貯められることも無いと思います。

動かしてみる

こんな感じになれば成功

[Stage 0:=>               (9 + 1) / 100][Stage 1:==>             (15 + 3) / 100]

この例だと、Stage0は1つのExecutorが、Stage1では3つのExecutorが処理をしていることになります。

また、SparkのWebUIからも対象のJobがどのpoolで実行されているかは見ることができます。

9
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?