はじめに
こんにちは。
前回に引き続き、SparkInternalsを訳していきます。
前回と同じく以後は下記の判例となります。
SparkInternals訳文
コメント
SparkInternals
Physical Plan
本章ではDAGを基にしたPhysicalPlanについて説明する。
PhysicalPlanはStageやTaskといった内容も内包している。
本章ではどのようにLogicalPlanからどうやってStageやTaskといったPhysicalPlanを生成するかについて焦点を当てて進める。
A Complex Logical Plan
上記のアプリケーションのコードは章末を確認すること。
どのようにデータ依存グラフから適切なStageやTaskを決定しているのか?
直観的なアイディアとしてはあるRDDと親RDDの関連からStageを生成する形になる。そう考えると、上の図にある各矢印がTaskになる形になるだろう。例えば、2RDDを1RDDに統合する場合、これらの3RDDを関連付けるStageが必要となるだろう。このアプローチは動作するかもしれないが、効率的ではない。
このアプローチは大量の中間データを保持しなければならないという微妙ではあるものの、致命的な問題を抱えている。
PysicalTaskにとって、結果をローカルディスク、メモリ、またはその両方に保存する必要が出てきてしまう。もし上記の図の矢印毎にTaskを生成するのなら、全てのRDDを保存する必要が出てきてしまうだろう。これは非常に高コストとなる。
LogicalPlanをより精査してみると、実は各RDD中の各Partitionは各々独立していることがわかる。言うなれば、各RDD中の各Partitionは互いに干渉しないのだ。
この観察結果を基に考えてみると、1Stage中の全ての関連は最終的なRDD(FlatMappedValuesRDD)の各Partition毎に1Taskを定義すればいいことがわかるだろう。
その考え方を元に図示してみると以下のようになる。
上記図の太い矢印は最終的なRDDの1Partition目の結果を算出するTask1の関連を示す。
CoGroupedRDDの1Partition目を算出するためにはShuffleDependencyが存在するため、前のRDD(UnionRDD)に属するPartitionを全て算出する必要があることに留意いただきたい。
上記計算の後、CoGroupedRDDの2Partiton目(細い矢印)、3Partition目(点線矢印)を算出する流れは非常にシンプルなものとなる。
しかしながら、この考え方に2個の問題がある。
- 最初のTaskが巨大になる。ShuffleDependencyを関連として保持しているため、最初のTaskを計算するためには前のRDD(UnionRDD)の全Partitionを計算する必要が出てきてしまう。
- 後続TaskのためにどのPartitionをキャッシュする必要があるか算出するために賢いアルゴリズムを設計する必要がある。
このように、この考え方に問題はあるものの、データをパイプライン化するにあたり、データが実際に必要になったタイミングで計算するという点で優れたものを持っている。最初のTaskを例にとると、最終RDD(FlatMappedValuesRDD)の最初のPartitionから遡ってどのRDDとPartitionの計算が必要になるか計算する流れになる。さらに、RDD間の関連がNarrowDependencyとなっている場合、中間データを保持する必要はない。
データのパイプラインをより明確に理解するために、Partition同士の関連からRecordレベルの視点に落としてデータの流れを確認してみる。すると、NarrowDependencyのRDD同士の関連を下記の図のように各パターン図示することができる。
1個目のパターン (pipeline pattern)は下記のコードで示すことできる。
for (record <- records) {
g(f(record))
}
RecordのリストをStreamとして捉えた場合、中間データを保持する必要がないことがわかるだろう。
例えば、最終的な結果であるg(f(record1))を1度算出すれば、元のデータであるrecord1や中間データであるf(record1)の結果については破棄しても問題ない。その次はRecordのリストの続きであるg(f(record2))を算出すればいい。しかしながら、違うパターン(3個目のパターンなど)については下記のように事情が違う。
// The third pattern
def f(records) {
var result
for (record <- records)
result.aggregate(process(record)) // need to store the intermediate result here
result.iterator // return the iterator of newly-generated [record1, record2, record3]
}
val fResult = f(records) // newly-generated [record1, record2, record3]
for (record <- fResult) {
g(record)
}
上記のコードより、関数fの結果はどこかしら(メモリ構造など)に保持しておく必要があることがわかるだろう。
では、元々のStageとTaskの問題に戻る。前述のアイディアの主要な問題点はShuffleDependencyの関連が存在する場合にデータをパイプライン化できないことにある。
ただ、すでにNarrowDependencyについてはパイプライン化はすんでいるため、NarrowDependencyで接続されたデータ連結情報を残したままShuffleDependencyの段階でデータフローを区切る湖とは可能になっている。
実際に例を挙げると、下記のようにStageを区切ることが可能になっている。
Stageを生成するための方針は以降のとおり。最終RDDから遡って関連を確認し、各NarrowDependencyを現在のStageに追加していき、ShuffleDependencyに突き当たった段階で新たなStageを生成する。
各Stageにおいて、Task数は最終RDDのPartition数で決定される。
上記の図中では太矢印が個々のTaskを示している。Stageは遡りながら決定されていくため、最終StageのIDが0、Stage1とStage2はStage0の親Stageとなる。Stageが最終結果を生成する場合、該当StageのTaskはResultTaskとなり、それ以外のShuffleで区切られたケースはShuffleMapTaskとなる。ShuffleMapTaskは該当Taskの結果を次のStageでShuffleして用いる必要があるため、この名称となっており、構造的にはHadoop MapReduceとよく似た構成となっている。ResultTaskはReducer(親StageからShuffleされたデータを取得する場合)、またはMapper(親Stageを持たない場合)として扱われる。
ただし、1個問題は残っている。NarrowDependencyはパイプライン化されるが、実際のところサンプルアプリケーションにおいてはOneToOneDependencyとRangeDependencyしか示されていない。NarrowDependency (M:N)についてはどうなるのだろうか?
前章のCartesianオペレーションを基に複雑なNarrowDependencyの詳細を振り返ってみると下図のようになる。
太矢印は1個目のResultTaskを示している。上記の図のStageがそのまま最終結果を生成するため、6ResultTaskが生成されることになる。OneToOneDependencyと違う点は、各Taskを計算するために3RDD(RDD a、RDD b、CartesianRDD)と2個のデータ値をすべて1Task内で読む必要が出てくること。それ以外に変わりはないため、NarrowDependencyであれば1:1、N:N関係なくNarrowDependencyをパイプライン化することが可能であることがわかる。Task数は最終RDDのPartition数と同じ数となる。
この説明だけを読むと、最終RDDの各Taskを順次実行するとすると各Data blocksを複数回読むように見えてしまうわけですが、それはどうなんでしょうか・・
例えば、最終RDDのPartiton1ではData blocks1の1Partition目&Data blocks2の1Partition目、最終RDDのPartiton2ではData blocks1の1Partition目&Data blocks2の1Partition目と読むため、各Taskを独立して実行すると複数回読むことになってしまう。
それについてはわからないので、とりあえずそれは頭の片隅においておいて、この章は読み進めます。
PhysicalPlanの実行
ここまでで、どのようにStageとTaskが生成されるかはわかったため、次の問題は「どのようにTaskが実行されるのか?」となる。
まずはサンプルアプリケーションのPhysicalPlanを再確認してみる。このアプリケーションをHadoop MapReduceで実行してみると、Taskは下記の流れで実行される。
- map()がMapの結果を生成し、Partition毎に分割した上でローカルディスクに出力
- shuffle-sort-aggregateプロセスがReduce側の入力(構成になったRecord)を生成
- reduce()が最終結果を生成するため実行
このプロセスは下記の図のように図示される。
このHadoop MapReduceのPhysicalPlanはシンプルで固定化された構成であり、パイプライン化もされていないため、この実行プロセスをそのままSparkのPhysicalPlanに適用することはできない。
パイプライン化の重要な考え方として、データが後段の処理で実際に必要になった段階で処理されるというものがある。実際には、最終的な結果からRDDを遡っていき、どのRDDとPartitionが最終結果を算出するために必要になるかを確認していく流れになっている。ほとんどの場合、一番左のRDDのいくつかのPartitionに遡り、それをはじめに算出する流れになる。
親Stageを持たないStageの中で一番左に位置するRDD(つまり依存するRDDを持たないRDD)は直に評価することができ、各RecordをStreamとして評価し、続く処理に流すことでパイプラインを構築する形になる。この処理の連鎖は最終結果から遡る形で推定されるが、実際に実行する際には親RDD側から後のRDDに進める形で処理が行われる。あるRecordは次のRecordの処理が開始される前に該当Stageの処理は最後まで実施される。
親Stageを持つStageにおいては、あらかじめ親Stageの処理を実行しておき、Shuffleを通してデータを取得する必要がある。親Stageの実行とShuffleからのデータ取得を行った後は親を持たないStageと同様の処理となる。
実際のコード上においては、各RDDのgetDependency()
メソッドを実行することでデータ依存性を取得することができる。compute()
メソッドは上流(親RDDか、またはデータソース)からRecordを取得し、記述したロジックを適用することを担当している。RDDのソースコードを見ると、しばしばfirstParent[T].iterator(split, context).map(f)
というコードを見ることができる。これは、firstParent
が1個目の依存するRDDを示しており、iterator()
がRecordを1レコードずつ取得するIteratorを、map(f)
が書く処理に対するロジックfの適用を示している。compute()
メソッドはRDDに対して定義されたロジックを適用し、次の処理に渡すための計算結果を返す。
まとめると、全体の処理の連鎖は最終RDDから依存するRDDを遡る形で生成される。ShuffleDependencyが全体をStageに分割する。各Stageにおいて、各RDDのcompute()
メソッドはparentRDD.iterator()
を実行し、上流からRecordのStreamを取得する。
ただし、留意すべき点としてcompute()
メソッドは実際のロジックを宣言するためのもので、実際の依存RDDはgetDependency()
メソッド、依存Partitionはdependency.getParents()
メソッドにて宣言される。
では、CartesianRDDを例を基に見てみると下記のようになる。
// RDD x is the cartesian product of RDD a and RDD b
// RDD x = (RDD a).cartesian(RDD b)
// Defines how many partitions RDD x should have, and the type of each partition
override def getPartitions: Array[Partition] = {
// Create the cross product split
val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
val idx = s1.index * numPartitionsInRdd2 + s2.index
array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
}
array
}
// Defines the computation logic for each partition of RDD x (the result RDD)
override def compute(split: Partition, context: TaskContext) = {
val currSplit = split.asInstanceOf[CartesianPartition]
// s1 shows that a partition in RDD x depends on one partition in RDD a
// s2 shows that a partition in RDD x depends on one partition in RDD b
for (x <- rdd1.iterator(currSplit.s1, context);
y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}
// Defines which are the dependent partitions and RDDs for partition i in RDD x
//
// RDD x depends on RDD a and RDD b, both are `NarrowDependency`
// For the first dependency, partition i in RDD x depends on the partition with
// index "i / numPartitionsInRDD2" in RDD a
// For the second dependency, partition i in RDD x depends on the partition with
// index "i % numPartitionsInRDD2" in RDD b
override def getDependencies: Seq[Dependency[_]] = List(
new NarrowDependency(rdd1) {
def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
},
new NarrowDependency(rdd2) {
def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
}
)
Job Creation
ここまでで、LogicalPlanとPhysicalPlanについて説明してきた。後残っているのは、「いつ、どのようにSparkジョブが作成されるのか?」となる。そもそも正確に言うとジョブとは何なのか?
下記の表は典型的なaction()
について示している。2カラム目はprocessPartition()
メソッド、各Partition中の各Recordをどのように処理し、最終結果を生成するかを示している。(実際はこの処理の結果、部分的な結果が生成される) 3カラム目はresultHandler()
メソッドで各Partitionの部分的な結果からどうやって最終的な結果を生成するかを示している。
Action | finalRDD(records) => result | compute(results) |
---|---|---|
reduce(func) | (record1, record2) => result, (result, record i) => result | (result1, result 2) => result, (result, result i) => result |
collect() | Array[records] => result | Array[result] |
count() | count(records) => result | sum(result) |
foreach(f) | f(records) => result | Array[result] |
take(n) | record (i<=n) => result | Array[result] |
first() | record 1 => result | Array[result] |
takeSample() | selected records => result | Array[result] |
takeOrdered(n, [ordering]) | TopN(records) => result | TopN(results) |
saveAsHadoopFile(path) | records => write(records) | null |
countByKey() | (K, V) => Map(K, count(K)) | (Map, Map) => Map(K, count(K)) |
ユーザが開発したDriverプログラムにおいてaction()
メソッドが実行されるたびにジョブが生成される。例えば、foreach()
Actionメソッドはsc.runJob(this, (iter: Iterator[T]) => iter.foreach(f)))
を実行し、DAGSchedulderにジョブ登録を行っている。もしDriverプログラムで他のaction()
メソッドが実行されていた場合、別ジョブとして登録が行われる。そのため、Driverプログラム中でaction()
メソッドが複数回実行されていた場合、その数だけジョブが登録されることになる。こういった形でDriverプログラムとジョブの粒度が異なるため、Driverプログラムはジョブではなく、アプリケーションと呼ばれる。
ジョブの最終Stageはジョブの実行結果を生成する。例えば、1章であったGroupByTestの場合、2個のジョブと2組の結果が存在している。ジョブが登録された際に、DAGSchedulerはApplication-LogicalPlan-PhysicalPlanの算出処理をStageを抽出し、親を持たないStageをはじめに登録するために実行する。この算出処理において、Taskの数や型も決定される。実際のStageは親Stageが終了したタイミング以降に実行される。
ジョブ登録詳細
実際にジョブを生成し、登録する箇所のコードの概要を示す。
ここで概要を示し、Architectureの章で再度この概要に立ち戻って確認する。
-
rdd.action()
がDAGScheduler.runJob(rdd, processPartition, resultHandler)
をジョブ生成のために実行 -
runJob()
メソッドがPartition数や最終RDDの型をrdd.getPartitions()
から取得し、Array[Result](partitions.size)
をPartition数を基に結果を保持するために用いる。 - 最終的にDAGScheduler 中の
runJob(rdd, cleanedFunc, partitions, allowLocal, resultHandler)
がジョブ登録のために実行される。cleanedFuncはclosureベースに変換されたPartition毎の処理関数オブジェクトとなる。この方法ではこの関数をシリアライズして各Workerノードに送信して実際の実行を行う。 - DAGSchedulerの
runJob()
は内部で実際にジョブを登録するためにsubmitJob(rdd, func, partitions, allowLocal, resultHandler)
を実行する。 -
submitJob()
からジョブIDを取得し、再度関数をラップし、ジョブ登録成功メッセージをDAGSchedulerEventProcessActorに送信している。このメッセージを受信した結果、ActorはdagScheduler.handleJobSubmitted()
を実行し、実際のジョブのハンドリングを行う。個々からわかるようにイベント駆動プログラミングモデルとして実行を行っている。 -
handleJobSubmitted()
ははじめにfinalStage = newStage()
を実行してStageの生成を行い、submitStage(finalStage)
を実行する。もしfinalStageが親Stageを保持している場合、親Stageを取得した上で先に登録が行われる。その実処理自体はsubmitWaitingStages()
似て行われる。
実際にnewStage()
はどのようにRDDの連鎖をStageに区切っているのか、については下記のようになる。
- このメソッドは最終RDDの
getParentStages()
をStage生成時に実行する。 -
getParentStages()
は最終RDDから開始してLogicalPlanを遡っていく。その過程で関連がNarrowDependencyであるRDDを現在のStageに追加していく。ShuffleDependencyに到達した場合、右側のRDD(Shuffle後に結果を受け取るRDD)を取得し、現在のStageを終了させる。その後、同様のロジックを左側のRDD(右側のRDDが受け取る結果の生成元となるRDD)に適用し、新たなStageを生成する。 - ShuffleMapStageが生成された場合、最終的なRDDを
MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
を実行して登録する。この登録処理はデータの出力先をMapOuputTrackerMasterから取得するために必要となるため、重要な処理となっている。
submitStage(stage)
が実際にどのようにStage、Taskを登録しているかは下記のようになる。
-
getMissingParentStages(stage)
を実行し、現在のStageの未実行親Stageがないか確認する。すでに親Stageがすべて実行されていた場合、このメソッドの実行結果は空となる。 - 未実行親Stageが存在する場合、未実行Stageを再帰的に登録していき、現在のStageをwaitingStagesに追加する。親Stageが完了した後、 waitingStagesに存在するStage画実行される。
- 未実行親Stageが存在しない場合、現在のStageが今実行可能であることを示している。実行時には
submitMissingTasks(stage, jobId)
を用いて実Taskを生成登録している。もしStageがShuffleMapStageだった場合、複数の最終RDDのPartition数分ShuffleMapTaskが生成される。ResultStageだった場合、ResultTaskが生成される。あるStage中のTask一覧はTaskSetとしてまとめられて用いられる。最終的にtaskScheduler.submitTasks(taskSet)
が実行され、TaskSetが投入される。 - 3のtaskSchedulerの型はTaskSchedulerImplとなっている。
submitTasks()
において、各taskSetは管理用変数であるTaskSetManagerにラップされ、schedulableBuilder.addTaskSetManager(manager)
画実行される。schedulableBuilder の実際の型は設定によってFIFOSchedulableBuilder、またはFairSchedulableBuilderが用いられる。submitTasks()
の最後で、実際にTaskを実行するためにbackend.reviveOffers()
に対して通知を行っている。ここでのbackendの型はSchedulerBackendとなっている。クラスタ上で実行する場合、backendの型はSparkDeploySchedulerBackendとなる。 - SparkDeploySchedulerBackendはCoarseGrainedSchedulerBackendのサブクラスとなっており、
backend.reviveOffers()
は実際にはDriverActorに対してReviveOffersメッセージを送信している。SparkDeploySchedulerBackendは起動時にDriverActorを開始している。DriverActorがReviveOffersを受信するとlaunchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
を実行し、Taskを開始する。scheduler.resourceOffers()
はTaskSetManagerをFIFOやFair Schedulerのようなスケジューラを適用し、ソートされたTaskSetManagerを他の情報を集約した状態で取得するようになっている。を取得するようになっている。この取得したTaskSetManagerの情報はTaskDescription中に保持されている。この段階で、データ局所性も考慮された構成となっている。 - DriverActorの
launchTasks()
は各Task情報をシリアライズする。シリアライズ結果がAkkaのフレームサイズを超過していた場合、TaskはexecutorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
という形でバッファリングした上で送信される。
まとめ
ここまでで、下記の内容について話してきた。
- Driverプログラムがどのようにジョブを起動するか?
- LogicalPlanからどのようにPhysicalPlanを生成するか?
- Sparkにおけるパイプライン化とは何か、またそれがどのように使用され、実装されているか?
- ジョブ生成、登録のコードの流れ概要
だが、下記の内容については更に掘り下げる必要がある。
- Shuffleプロセス
- Taskの実行手順と実行場所
次の章ではSparkのShuffleプロセスについて説明する。
個人的な意見としては、LogicalPlanからPhysicalPlanへの変換はまさに傑作だと考えている。
依存性、Stage、Taskの抽象化は上手く練られており、実際にどう実装すればいいかもクリアになっていると考えている。
Source Code of the Example Job
package internals
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner
object complexJob {
def main(args: Array[String]) {
val sc = new SparkContext("local", "ComplexJob test")
val data1 = Array[(Int, Char)](
(1, 'a'), (2, 'b'),
(3, 'c'), (4, 'd'),
(5, 'e'), (3, 'f'),
(2, 'g'), (1, 'h'))
val rangePairs1 = sc.parallelize(data1, 3)
val hashPairs1 = rangePairs1.partitionBy(new HashPartitioner(3))
val data2 = Array[(Int, String)]((1, "A"), (2, "B"),
(3, "C"), (4, "D"))
val pairs2 = sc.parallelize(data2, 2)
val rangePairs2 = pairs2.map(x => (x._1, x._2.charAt(0)))
val data3 = Array[(Int, Char)]((1, 'X'), (2, 'Y'))
val rangePairs3 = sc.parallelize(data3, 2)
val rangePairs = rangePairs2.union(rangePairs3)
val result = hashPairs1.join(rangePairs)
result.foreachWith(i => i)((x, i) => println("[result " + i + "] " + x))
println(result.toDebugString)
}
}
終わりに
ここまでで物理的な実行計画を生成する流れがどうなっているかを読んでみましたが、論理的に生成した処理を後から遡る形で必要なStageを確定させ、必要になった段階で実行するというアプローチは興味深いものだと感じました。
ただ、1点気になるCartesian(直積)の実行時に複数回データ元を読んでいるように読める記述については実際のコードをどこかで確認しておきたいところではありますね。