概要
とあるSparkアプリで、外部リソースにアクセスすることになったのだが、負荷を減らすために少しずつ処理したいというニーズが出てきた。
そこで、Sparkアプリ内の各Jobの並列実行を試してみました。
(このようにmap/reduce内で外部リソースに頻繁にアクセスするのは一般的ではないですが、重い処理に全リソースを割かれないで別の処理も並列で走らせたい、という需要はあるのではと思っています。)
やりかた
https://spark.apache.org/docs/1.6.1/job-scheduling.html
(この情報は不十分なので調べるのが面倒でした。。。)
schedulerPoolの設定
まずfairScheduler.xmlを用意して、どこかのパスに置いておく。
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
次に、SparkContextに、modeがFAIRであることを伝える。
(これをしないとxmlを呼んでもちゃんとpoolが作られない。デフォルトではこの値はFIFOになっている。)
conf.set("spark.scheduler.mode", "FAIR")
そして、先ほど用意したscheduler.xmlを読ませる。
conf.set("spark.scheduler.allocation.file", "/your/path/spark-fairScheduler.xml")
※ここでxmlをresourcesとしてJarの中に含めてしまうと、使うときにはjarから一度別のパスに取り出してからそのパスを指定しないといけない。
val xml = getClass.getClassLoader.getResourceAsStream("pool.xml")
val homeDir = System.getProperty("user.home")
val f = new File(homeDir + "/.pool.xml")
val in = scala.io.Source.fromInputStream(xml)
val out = new java.io.PrintWriter(f)
try { in.getLines().foreach(out.print) }
finally { out.close() }
conf.set("spark.scheduler.allocation.file", homeDir + "/.pool.xml")
それぞれのpoolにJobを投入してみる。
ただJobを投入するだけだと、Jobが生成された順番でFIFOで処理されてしまうので、Futureなどを用いて別スレッドからpoolへJob投入してあげる必要があるようです。
サンプルはこんな形です。
val f1 = Future {
//backend logic
sc.setLocalProperty("spark.scheduler.pool", "backend")
println(sc.getSchedulingMode)
rdd.saveAsTextFile(file2)
}
//main logic
sc.setLocalProperty("spark.scheduler.pool", "main")
println(sc.getSchedulingMode)
rdd.saveAsTextFile(file1)
Await.ready(f1, Duration.Inf)
ここで大事なのは、saveAsTextFile
のような、実際にRDDの処理が走りだすような記述を書いてあげる必要があることです。ただのmapやreduceだけだと、まだRDDの処理は始まらないのでpoolに貯められることも無いと思います。
動かしてみる
こんな感じになれば成功
[Stage 0:=> (9 + 1) / 100][Stage 1:==> (15 + 3) / 100]
この例だと、Stage0は1つのExecutorが、Stage1では3つのExecutorが処理をしていることになります。
また、SparkのWebUIからも対象のJobがどのpoolで実行されているかは見ることができます。