はじめに
こんにちは。
前回に引き続き、SparkInternalsを訳していきます。
前回と同じく以後は下記の判例となります。
SparkInternals訳文
コメント
SparkInternals
Architecture
既に3章でSparkジョブについて記述しているが、本章では全体のアーキテクチャと、Master、Worker、Driver、Executorといった要素が如何に協調してジョブを終了させるかについて記述する。
図を用いて理解したい場合はコードの部分は読み飛ばしてもらって構わない。
Deployment diagram
概要章にて、下記の図を用いて概要説明を行った。
次はこの図の詳細についてみていく。
Job submission
この図はどのようにMasterノード上に存在するDriverプログラムがジョブを生成し、Workerノードに登録するかを示している。
Driver側の振る舞いは下記のコードの流れとなっている。
finalRDD.action()
=> sc.runJob()
// generate job, stages and tasks
=> dagScheduler.runJob()
=> dagScheduler.submitJob()
=> dagSchedulerEventProcessActor ! JobSubmitted
=> dagSchedulerEventProcessActor.JobSubmitted()
=> dagScheduler.handleJobSubmitted()
=> finalStage = newStage()
=> mapOutputTracker.registerShuffle(shuffleId, rdd.partitions.size)
=> dagScheduler.submitStage()
=> missingStages = dagScheduler.getMissingParentStages()
=> dagScheduler.subMissingTasks(readyStage)
// add tasks to the taskScheduler
=> taskScheduler.submitTasks(new TaskSet(tasks))
=> fifoSchedulableBuilder.addTaskSetManager(taskSet)
// send tasks
=> sparkDeploySchedulerBackend.reviveOffers()
=> driverActor ! ReviveOffers
=> sparkDeploySchedulerBackend.makeOffers()
=> sparkDeploySchedulerBackend.launchTasks()
=> foreach task
CoarseGrainedExecutorBackend(executorId) ! LaunchTask(serializedTask)
解説:
下記のコードが評価されたタイミングで、プログラムはDriver用の通信を開始する。(例:ジョブ用のExecutor、スレッド、Actor等)
val sc = new SparkContext(sparkConf)
Job logical plan
Driverプログラムにおいてtransformation()
を実行することで処理の連鎖(つまりはRDD
の系列)が構築される。
各RDDに対して下記のことが実行される。
-
compute()
関数はPartition中のレコード群に対する処理を定義する。 -
getDependencies()
関数はRDD Partitionを跨いで依存関係を定義する。
Job physical plan
各action()
はジョブを起動する。
-
dagScheduler.runJob()
において、異なるStage群が定義される。 - During
submitStage()
において、Stageの実行に必要となるResultTasks
とShuffleMapTasks
群が生成され、TaskSet
にまとめられた上でTaskScheduler
に送信される。TaskSet
が実行されたタイミングで実Task群がsparkDeploySchedulerBackend
に登録され、実際のTaskの分散実行が行われる。
Task distribution
sparkDeploySchedulerBackend
がTaskSetを受け取ったらDriver Actorはシリアライズ化したTaskをWorkerノードに存在するCoarseGrainedExecutorBackend Actorに送信する。
Job reception
Task群を受信後、Workerは下記のことを行う。
coarseGrainedExecutorBackend ! LaunchTask(serializedTask)
=> executor.launchTask()
=> executor.threadPool.execute(new TaskRunner(taskId, serializedTask))
Executorは各TaskをtaskRunner
にまとめ、空いているスレッドをTask実行のために取得する。CoarseGrainedExecutorBackendプロセスは1Executorと対応している。
Task execution
下図はWorkerノードに受信されたタスクがどう実行されるかと、どのようにDriverプロセスがそのTaskの実行結果を処理するかを示している。
Executerはシリアライズ化されたTaskを受信したら復元を行い、Taskを実行することでdirectResult
というDriverに対して結果送信用オブジェクトを取得する。但し、Actorに対して送信可能なデータはそんなに大きく出来ないということに留意する必要がある。
- もし結果が
groupByKey
の結果のように大きすぎた場合、blockManager
を用いてメモリとディスク上に保存を行う。その結果、DriverプログラムはindirectResult
という結果格納場所を示したオブジェクトを取得するのみとなる。もしその実行結果が必要になったタイミングでDriverはHTTPリクエストで実際の結果の取得を行う。 - もし結果が大きすぎなかった場合(
spark.akka.frameSize = 10MB
未満の場合)、直接結果をDriverプログラムに送信する。
blockManager
に対しての詳細は下記の通り。
directResult
のサイズがakka.frameSize
より大きかった場合、BlockManagerのmemoryStoreは結果を保持するためのLinkedHashMapを生成し、そこに結果を保持する。ただし、その全体サイズはRuntime.getRuntime.maxMemory * 設定**spark.storage.memoryFraction(デフォルト0.6)**に収まっている必要がある。もしLinkedHashMapに入力データを保存するだけの領域が存在しない場合、データはdiskStoreに送信され、ディスクに保存される。(但し、storageLevelにdiskが含まれている必要がある)
In TaskRunner.run()
// deserialize task, run it and then send the result to
=> coarseGrainedExecutorBackend.statusUpdate()
=> task = ser.deserialize(serializedTask)
=> value = task.run(taskId)
=> directResult = new DirectTaskResult(ser.serialize(value))
=> if( directResult.size() > akkaFrameSize() )
indirectResult = blockManager.putBytes(taskId, directResult, MEMORY+DISK+SER)
else
return directResult
=> coarseGrainedExecutorBackend.statusUpdate(result)
=> driver ! StatusUpdate(executorId, taskId, result)
ShuffleMapTask
とResultTask
で出力される結果は下記のように違う。
-
ShuffleMapTask
は下記の2要素を含むMapStatus
を出力- Taskの
BlockManager
のBlockManagerId
:(executorId + host, port, nettyPort)
- Task出力結果の各
FileSegment
のサイズ
- Taskの
-
ResultTask
はあるPartitionに対するfunction
の実行結果を出力-
count()
ならPartition中のレコード数
-
ShuffleMapTask
はファイル出力にFileSegment
を必要としている関係上、OutputStream
Writerオブジェクトがっ必要となる。これらのwriterオブジェクトはshuffleBlockManager
によって生成、管理される。
In task.run(taskId)
// if the task is ShuffleMapTask
=> shuffleMapTask.runTask(context)
=> shuffleWriterGroup = shuffleBlockManager.forMapTask(shuffleId, partitionId, numOutputSplits)
=> shuffleWriterGroup.writers(bucketId).write(rdd.iterator(split, context))
=> return MapStatus(blockManager.blockManagerId, Array[compressedSize(fileSegment)])
//If the task is ResultTask
=> return func(context, rdd.iterator(split, context))
処理自体はTaskの実行結果をDriverプログラムが取得後行われる。
TaskScheduler
がTaskが終了したことを受信した後、その結果の処理が行われる。
- もし結果が
indirectResult
だった場合、BlockManager.getRemotedBytes()
を実行して実際の結果を取得する。 will be invoked to fetch actual results.- もし結果が
ResultTask
だった場合、ResultHandler()
がDriverプログラム側の処理を行う(例えば、count()
の場合、ResultTask
の値を合計する) - もし結果が
ShuffleMapTask
のMapStatus
だった場合、MapStatus
はmapOutputTrackerMaster
のmapStatuses
に投入され、ReduceフェーズのShuffleで利用しやすいようにハンドリングする。
- もし結果が
- もしDriverプログラムが受信したTaskの結果がStageでの最後のTaskのものだった場合、次のStageの登録が行われる。もし終了したStageが既に最後のStageだった場合、
dagScheduler
がジョブ完了したことを通知する。
After driver receives StatusUpdate(result)
=> taskScheduler.statusUpdate(taskId, state, result.value)
=> taskResultGetter.enqueueSuccessfulTask(taskSet, tid, result)
=> if result is IndirectResult
serializedTaskResult = blockManager.getRemoteBytes(IndirectResult.blockId)
=> scheduler.handleSuccessfulTask(taskSetManager, tid, result)
=> taskSetManager.handleSuccessfulTask(tid, taskResult)
=> dagScheduler.taskEnded(result.value, result.accumUpdates)
=> dagSchedulerEventProcessActor ! CompletionEvent(result, accumUpdates)
=> dagScheduler.handleTaskCompletion(completion)
=> Accumulators.add(event.accumUpdates)
// If the finished task is ResultTask
=> if (job.numFinished == job.numPartitions)
listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
=> job.listener.taskSucceeded(outputId, result)
=> jobWaiter.taskSucceeded(index, result)
=> resultHandler(index, result)
// If the finished task is ShuffleMapTask
=> stage.addOutputLoc(smt.partitionId, status)
=> if (all tasks in current stage have finished)
mapOutputTrackerMaster.registerMapOutputs(shuffleId, Array[MapStatus])
mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
=> submitStage(stage)
Shuffle read
既に前の説において、Taskの実行と結果の処理について記述したが、本節ではReducerがどのようにしてShuffle時の入力データを取得するかを記述する。前章のShuffle Read節において、既にReducerがどのように入力データを処理するかについては記述されているため、ここで記述するのは入力データの取得方法のみとなる。
Reducerは取得データの場所をどのように知るのか?
Reducerは親StageのShuffleMapTask
が生成したFileSegments
がどのノードに存在するかを知る必要がある。
それらの情報はShuffleMapTask
終了時にDriverプログラムのmapOutputTrackerMaster
に通知される。それらの情報はmapStatuses
としてHashMap[stageId, Array[MapStatus]]
の形で保持される。stageId
を基に、ShuffleMapTasks
が生成したFileSegments
についての情報を保持するArray[MapStatus]
が取得可能となっている。 Array[MapStatus]
はblockManagerId
と各FileSegment
のサイズを保持している。
Reducerがデータの取得が必要になったタイミングで、入力データのFileSegments
の場所を取得するためにblockStoreShuffleFetcher
を実行する。blockStoreShuffleFetcher
は実際はローカルに存在しているMapOutputTrackerWorker
へ問い合わせを行う。MapOutputTrackerWorker
はmapOutputTrackerMasterActor
からMapStatus
を取得するためにmapOutputTrackerMasterActorRef
を用いて通信を行う。blockStoreShuffleFetcher
はMapStatus
を処理し、Reducerがどうやって取得対象のFileSegment
を取得すればいいかの情報である、 blocksByAddress
を取得する。
rdd.iterator()
=> rdd(e.g., ShuffledRDD/CoGroupedRDD).compute()
=> SparkEnv.get.shuffleFetcher.fetch(shuffledId, split.index, context, ser)
=> blockStoreShuffleFetcher.fetch(shuffleId, reduceId, context, serializer)
=> statuses = MapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId)
=> blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = compute(statuses)
=> basicBlockFetcherIterator = blockManager.getMultiple(blocksByAddress, serializer)
=> itr = basicBlockFetcherIterator.flatMap(unpackBlock)
basicBlockFetcherIterator
がTaskからのデータを受信後、いくつかのfetchRequest
が生成される。上記の図からわかるように、reducer-2
は3個のWorkerノードからFileSegment
(白)の取得が必要となる。データ取得時はblockByAddress
という形で表される値を用いてデータの取得を行う。Node 1からは4ブロック、Node 2からは3ブロック、Node 3からは4ブロックの取得を行う。
データ取得プロセスを高速化するためにデータ取得Taskはtasks(fetchRequest
)という形でSubTaskに分割され、各SubTaskは別々の取得スレッドを用いて取得を行う。Sparkは各Reducerに対して5個の並列スレッドを起動してデータの取得を行う(Hadoop MapReduceと同じ構成)。取得したデータはメモリ上に保持されるため、そんなに大きなデータを一度に取得することはできない。(設定spark.reducer.maxMbInFlight、デフォルト48MB) ここで注意するべきなのは、この48MBのバッファは**5個の取得スレッドで共有されるということ。**そのため、各SubTaskは1回の取得量を48MB / 5 = 9.6MB
以内に抑える必要がある。図中のNode 1においては、図中のNode 1からのデータ取得において、size(FS0-2) + size(FS1-2)
は9.6MB以内に収まっているが、 size(FS2-2)
が加わると9.6MBを超過してしまう。そのため、t1-r2
とt2-r2
の間で取得処理を分割する必要が出てくる。結果として、Node 1においては2個のデータ取得リクエストを用いることとなる。fetchRequest
のサイズが9.6MBを超過することはあるのか?答えはもちろんYesで、 FileSegment
のサイズが非常に大きい場合、1取得で埋め尽くされてしまう場合もある。他にも、ReducerがFileSegment
を取得する際に既にそれがローカルにデータが存在している場合、ローカルからデータを読み込むという対処が行われている。Shuffle Read終了時、取得したFileSegment
のデシリアライズが行われ、RDD.compute()
にレコード用のIteratorが渡される。
In basicBlockFetcherIterator:
// generate the fetch requests
=> basicBlockFetcherIterator.initialize()
=> remoteRequests = splitLocalRemoteBlocks()
=> fetchRequests ++= Utils.randomize(remoteRequests)
// fetch remote blocks
=> sendRequest(fetchRequests.dequeue()) until Size(fetchRequests) > maxBytesInFlight
=> blockManager.connectionManager.sendMessageReliably(cmId,
blockMessageArray.toBufferMessage)
=> fetchResults.put(new FetchResult(blockId, sizeMap(blockId)))
=> dataDeserialize(blockId, blockMessage.getData, serializer)
// fetch local blocks
=> getLocalBlocks()
=> fetchResults.put(new FetchResult(id, 0, () => iter))
もう少し詳細に見てみる。
どのようにReducerは対象となるデータが存在しているノードに対してfetchRequest
を送信しているのか?どのように対象となるデータを保持するノードは受信したfetchRequest
を受信し、結果を返しているのか?
もしRDD.iterator()
がShuffleDependency
に到達した場合、BasicBlockFetcherIterator
はFileSegment
を取得する。BasicBlockFetcherIterator
はblockManger
のconnectionManager
を用いて他ノード上のconnectionManager
に対してfetchRequest
を送信する。connectionManager
間の通信はNIOで行われる。他ノードにおいて、例えばNode 2のconnectionManager
がメッセージを受信した場合、blockManager
に対してメッセージを転送する。blockManager
はfetchRequest
に指定されたFileSegment
群をdiskStore
を用いて読み込み、結果をconnectionManager
に返す。もし設定spark.shuffle.consolidateFilesによってファイル集約機能が有効化されている場合、diskStore
はshuffleBolockManager
によって与えられるblockId
を用いて読み込みを行う。読み込み時、FileSegment
が設定spark.storage.memoryMapThreshold(デフォルト8KB)のサイズ内に収まっていた場合、diskStoreはメモリ上に保持する。そうでない場合、RandomAccessFile
のFileChannel
を取得する。このようにして巨大なFileSegment
をメモリ上にロードする。
BasicBlockFetcherIterator
がシリアライズ化されたFileSegments
を別ノードから受信したらデシリアライズし、 fetchResults.Queue
に投入する。ここで、fetchResults.Queue
がShuffle detials
章におけるsoftBuffer
と似ていると気づくかもしれない。BasicBlockFetcherIterator
が必要とするFileSegment
がローカルに存在した場合、ローカルのdiskStore
から取得可能であるため、そこから取得してfetchResults
に追加する。最終的にReducerは結果に保存されたFileSegment
からRecordを取得して処理を行う。
After the blockManager receives the fetch request
=> connectionManager.receiveMessage(bufferMessage)
=> handleMessage(connectionManagerId, message, connection)
// invoke blockManagerWorker to read the block (FileSegment)
=> blockManagerWorker.onBlockMessageReceive()
=> blockManagerWorker.processBlockMessage(blockMessage)
=> buffer = blockManager.getLocalBytes(blockId)
=> buffer = diskStore.getBytes(blockId)
=> fileSegment = diskManager.getBlockLocation(blockId)
=> shuffleManager.getBlockLocation()
=> if(fileSegment < minMemoryMapBytes)
buffer = ByteBuffer.allocate(fileSegment)
else
channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
全ReducerはBasicBlockFetcherIterator
を保持しており、各BasicBlockFetcherIterator
毎に48MBのfetchResults
を保持する。fetchResults
からFileSegment
が取得されたタイミングで次のFileSegment
群の取得が行われ、48MBのサイズを満たすように動作する。
BasicBlockFetcherIterator.next()
=> result = results.task()
=> while (!fetchRequests.isEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
=> result.deserialize()
まとめ
アーキテクチャ設計の章より、機能やモジュールが各々かなり独立した構成になっていることがわかると思う。BlockManager
はいい設計だとは思うが、いささか多くのものを管理しすぎているようにも思われる。(データブロック、メモリ、ローカルディスクと他ノードとのネットワーク通信)
この章ではどのようにSparkにおけるモジュールが協調し、ジョブを完了させているか、について記述してきた。(生成、登録、実行、結果集約、結果処理、シャッフル等)多くのコードと図を用いて記述してきたが、より詳細が知りたければソースコードを見て欲しい。
最後に
この章で実際に前章までの流れをモジュール同士がどのように協調して実現しているか概略が見えてきた感じですね。
章でも記述されている通り、詳細見たければソースコードを見るしかないわけですが、その足掛かりも多分に書かれているのはそれはそれでありがたい内容でした。