はじめに
こんにちは。
前回に引き続き、SparkInternalsを訳していきます。
前回と同じく以後は下記の判例となります。
SparkInternals訳文
コメント
SparkInternals
cache and checkpoint
cache
(またはpersist
)はHadoop MapReduceには存在しない、Spark固有の重要な要素となる。この機能によって、SparkはDataの再利用が可能になり、インタラクティブな機械学習アルゴリズム、インタラクティブなデータ解析といったユースケースにおいて大きく高速化に貢献している。Hadoop MapReduceのジョブと異なり、SparkのLogicalPlan/PhysicalPlanは巨大化し、処理の連鎖も大きく、RDDに対する計算時間も長くなる。もし、不幸にもエラーや例外がTask実行中に発生した場合、処理の連鎖の全体を再実行する必要が出た場合、計算ロスのコストはかなり大きいものとなる。従って、時間がかかるRDDに対する処理についてはcheckpoint
が必要となる。この対応によって、仮に下流のRDDで何かしら問題が発生した場合でも、チェックポイントとして保存したRDDからデータを復元することで処理を継続することが可能となる。
cache()
まずはOverview章でのGroupByTest
を例として見てみると、FlatMappedRDD
がキャッシュされているため、Job 1(second count())はFlatMappedRDD
から再開してデータを処理することが出来る。そのため、cache()
が同一アプリケーションにおいて同一データが異なるJobが取得する際に再取得を可能としていることがわかる。
Q: どのような種類のRDDをキャッシュする必要があるのか?
繰り返し使用され、かつそこまで大きくないRDD
Q: どうすればRDDをキャッシュするのか?
Driverプログラムでrdd.cache()
を実行するのみでユーザがアクセス可能なRDD(transformation()
で生成されるRDD)についてはキャッシュ可能。但し、reduceByKey()
中のShuffledRDD
, MapPartitionsRDD
のように、Sparkが内部的に生成するRDDについてはキャッシュすることが出来ない。
Q: どのようにSparkはRDDをキャッシュするのか?
推測を行うことが出来るだけだが、直観的に思いつくものとして、TaskがRDDの最初のRecordを取得したタイミングで該当のRDDをキャッシュすべきかどうかの確認を行う、もしキャッシュするべきRDDであった場合は最初のRecordと以後のRecordをblockManager
のmemoryStore
に送信する、というものがある。もしmemoryStore
が全レコードを保持することが出来ない場合は、diskStore
が代わりに使用される。
**実際の実装は推測した内容と似ているが、違う点として以下がある。
SparkはRDDをキャッシュすべきか否かを最初のPartitionを計算する前に判定する。もしRDDがキャッシュするべきRDDであった場合、Partitionを計算後、メモリ上に保持される。cache
はメモリ上に保持する場合のみ使用され、checkpoint
はディスク上にも保持する動作となる。
rdd.cache()
を実行後、rdd
はpersistRDD
で、storageLevel
としてMEMORY_ONLY
を保持するRDDとなる。persistRDD
であること自体がDriverプログラムに対して該当RDDを保存する必要がある旨の通知となる。
上記の流れは下記のソースコードとなっている。
rdd.iterator()
=> SparkEnv.get.cacheManager.getOrCompute(thisRDD, split, context, storageLevel)
=> key = RDDBlockId(rdd.id, split.index)
=> blockManager.get(key)
=> computedValues = rdd.computeOrReadCheckpoint(split, context)
if (isCheckpointed) firstParent[T].iterator(split, context)
else compute(split, context)
=> elements = new ArrayBuffer[Any]
=> elements ++= computedValues
=> updatedBlocks = blockManager.put(key, elements, tellMaster = true)
rdd
において、rdd.iterator()
がPartitionを計算するために呼ばれたタイミングでblockId
が該当Partitionをキャッシュするか示すために使用され、blockId
がRDDBlockId
の型だった場合、memoryStore
においてTaskの実行結果を保持するblockId
とは別のものとして扱われる。加えて、blockManger
中のPartitionは既にチェックポイントとして保持されていないかの確認も行われる。もし保持されていた場合、既に該当Taskが実行されている旨の通知が行われ、該当Partitionに対して必要ないことがわかる。ArrayBuffer
のelements
として該当チェックポイントのPartitionが保持する各Recordを取得可能だが、取得できない場合は該当Partitionが初回計算であることがわかり、結果をelements
として保持する。最終的に、elements
はblockManager
にキャッシュ化のために登録される。
blockManager
はelements
(partition)をmemoryStore
にLinkedHashMap[BlockId, Entry]
として保持する。もし該当PartitionがmemoryStore
の容量(ヒープの60%)より大きい場合、データを保持できない旨の応答のみを返す。サイズ的に保存可能であった場合、memoryStore
は事前にキャッシュされていたRDD Partitionを新規Partitionの保存領域を確保するために削除する。もし削除された結果生成された領域が十分なサイズを保持していた場合、新規PartitionはLinkedHashMap
に登録される。もしサイズが不足していた場合、最大サイズでそもそも足りなかった場合と同様にデータを保持できない旨の応答のみを返す。ここで留意すべきは、新規Partitionが所属するRDDのPartitionは予め保持されていたとしても削除されないこと。理想的にははじめにキャッシュされたRDDははじめに削除されるという動作となる。
Q: どのようにキャッシュされたRDDを読むのか?
キャッシュされたRDDが以降のJobにおいて再計算されたタイミングで、TaskはblockManager
のmemoryStore
からデータを取得して使用する。具体的にはRDD Partitionの計算タイミング(rdd.iterator()
実行時)において、blockManager
に対して該当Partitionがキャッシュされているかの確認を行う。もしローカルにキャッシュされていた場合、blockManager.getLocal()
を用いてmemoryStore
からデータの取得を行う。もし別ノードにキャッシュされていた場合、blockManager.getRemote()
が実行され、下記のような動作となる。
キャッシュされたPartitionの保存場所
各ノードのblockManager
がPartitionをキャッシュしたタイミングでMasterノードのblockManagerMasterActorに対してキャッシュした旨の通知を行う。通知された情報は
blockMangerMasterActorの保持する
blockLocations: HashMapに保持される。TaskがキャッシュされたRDDを必要とした場合、
blockManagerMaster.getLocations(blockId)リクエストをDriverプログラムに対して送信し、Partitionの保存場所を取得する。その際、Driverプログラムは
blockLocations`から保存場所を取得し、返す。
古いPartitionが破棄された場合場所が取得できないことになりますが、これは破棄したタイミングで同様にBlockManagerMasterに対して通知しているのでしょうか?
別ノードにキャッシュされたParititionの読み込み
キャッシュされたPartitionの場所情報を受信後、getBlock(blockId)
リクエストをconnectionManager
を介して対象ノードに送信する。対象ノードはローカルのblockManager
中のmemoryStore
からPartitionを取得し、返す。
ここで取得されるPartitionのサイズは相応に大きくなるはずですので、相応のサイズの応答が返される形になりますね。
Checkpoint
Q: どのような種類のRDDをチェックポイント化する必要があるのか?
- 処理に長時間がかかる場合
- 処理の連鎖が非常に長い場合
- 多くのRDDから依存されている場合
実際の所、ShuffleMapTask
の出力をローカルディスクに保存することはcheckpoint
でもある。但し、これはPartition単位の保存であって、RDD単位の保存ではない。
Q: どのタイミングでチェックポイント化が行われるのか?
既に記述されたように、全Partitionは算出されたタイミングでキャッシュする必要がある場合はメモリ上にキャッシュされる。だが、checkpoint
は異なる原理によって扱われる。代わりにcheckpoint
は該当Jobの終了まで待ち、checkpoint
化を行うためのJobを生成する。もし異なる原理によって動作するなら、rdd.checkpoint()
の前にrdd.cache()
を実行していた場合、チェックポイント化するRDDは2回算出されるのかという疑問が生じる。このようなケースにおいては、後段のrdd.checkpoint()
においてはRDDの再算出は行われない。代わりに、キャッシュからデータを読み込む。実際、Sparkはrdd.persist(StorageLevel.DISK_ONLY)
メソッドを提供しており、初回計算時にディスク上に保持することが可能になっている。しかしながら、このpersist
とcheckpoint
は異なる動作となる。それについては後述する。
Q: どのようにチェックポイント化の処理は実装されているのか?
下記のような流れで行われる。
RDDは[ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]という流れで状態遷移を行い、最終的にcheckpointedという状態になる。
Initialized
Driverプログラムにおいてrdd.checkpoint()
実行後、、RDDはRDDCheckpointData
で管理されるようになる。そのタイミングでユーザはチェックポイント化されたRDDを保持するためのHDFS上のパスを指定する必要がある。
marked for checkpointing
初期化後、RDDCheckpointData
はRDDに対してMarkedForCheckpoint
とマークする。
checkpointing in progress
Job実行が終了したタイミングで、finalRdd.doCheckpoint()
が実行される。finalRDD
は処理の連鎖を遡っていき、チェックポイント化が必要なRDDを見つけたタイミングでRDDはCheckpointingInProgress
としてマークされる。そのタイミングでHDFSへの書込み設定を保持するore-site.xml等の設定が他ノードのblockManager
に対してもブロードキャストされ、チェックポイント化のための下記のJobが起動される。
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf))
checkpointed
チェックポイント化Job終了後、該当RDDの依存要素を削除し、該当RDDのステータスをcheckpointed
とする。その後、該当のcheckpointRDD
を親とするRDDに対して補助依存に追加し、親がcheckpointRDD
であることを設定する。The checkpointRDD
がそれ以後に使用された場合、チェックポイントとして保存されたファイルからRDD Partitionを生成する。
尚、下記のような興味深い事象もある。
2個のRDDがDriverプログラムにおいてはチェックポイント化する指定が行われている。だが、実際にはresultのみがチェックポイント化に成功する。バグなのか、意図的に下流のRDDのみチェックポイント化されているかはわからない。
これは以降で使用されるかということを考えると、プログラム中で再利用されているわけではないため、下流のRDDのみをチェックポイント化すれば途中経過はいらない、という動作になっているようには見えますね。
val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'),
(4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)
val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)
pairs2.checkpoint
val result = pairs1.join(pairs2)
result.checkpoint
Q: どのようにチェックポイント化されたRDDは読み込まれるのか?
runJob()
はfinalRDD.partitions()
をいくつのTaskが必要になるかを確定させるために実行する。その際、rdd.partitions()
はRDDが既にチェックポイント化されているかどうかをRDDCheckpointData
の管理下にあるかどうかで判定する。もしチェックポイント化されていた場合、RDD中のPartitionを(Array[Partition]
)として返す。rdd.iterator()
が該当RDDのPartitionを算出するために呼ばれたタイミングで、 computeOrReadCheckpoint(split: Partition)
も実行され、該当RDDがチェックポイント化されていないかを確認する。もしチェックポイント化されていた場合、親RDDのiterator()
はCheckpointRDD.iterator()
として実行される。CheckpointRDD
はRDD Partitionを生成するためにファイルを読み込む。これがCheckpointRDD
がチェックポイント化されたRDDとして補助的な依存として追加される理由となる。
このあたりはCheckpointRDDがどのように同一性の判定が行われるかも気になりますが、あまりチェックポイント化について情報がないのも事実ですね。
Q: cache
とcheckpoint
の違いはどこにあるのか?
Tathagata Das氏からの回答は下記の通り。
cache
とcheckpoint
の重要な違いとして、cache
はRDDをメモリ(またはDisk)に保持しているものの、RDD処理の連鎖が再実行されたタイミングで、ノード障害が発生してキャッシュしたRDDが失われていた場合、再算出が必要になるというものがある。checkpoint
はRDDをHDFS上にファイルとして保存し、それまでの処理の連鎖を破棄することが可能となる。これは長い処理の連鎖を途中で切り捨て、信頼性のあるHDFS上にデータを保持することでレプリケーションによる耐障害性確保が可能となることを示している。
さらに、rdd.persist(StorageLevel.DISK_ONLY)
についてもcheckpoint
とは異なる。これまで記述してきたように、rdd.persist(StorageLevel.DISK_ONLY)
はRDD Partitionをディスク上に保存し、保存されたPartitionはblockManager
によって管理される。Driverプログラムが終了したタイミングで、CoarseGrainedExecutorBackend
の停止に伴いblockManager
も停止する。そのため、ディスク上に保存されたRDDも削除される。(blockManager
で管理されたローカルファイルは終了時に削除される)しかし、checkpoint
はファイルをHDFS、またはローカルディスクに永続化する。手動で明確に削除されない限り、ディスク上に保持され、次のDriverプログラムで使用することも可能になる。
おそらく、Driverプログラムを跨いでRDDを共有可能になる、いうのが最大の違いになるのだと思いますが、当然ながら次のDriverプログラムでのチェックポイントファイルの読み方が気になる所ですね。
まとめ
Hadoop MapReduceにおいてJobを実行する際、データを各Task、Job終了時にHDFSに確保する。Task実行時、それらのデータはメモリとディスクの間でスワップし続ける。Hadoop MapReduceにおける問題点はエラー発生時にデータの算出が必要になることで、Shuffleが途中で停止し、ディスク上に半分しか保存できなかったケースにおいて、保存されたデータについても再算出が必要になってしまう。Sparkの利点としては、エラー発生時にチェックポイントから処理を再開できるということであるが、チェックポイント化をすることによってJobを追加で実行する必要が出てくるというデメリットもある。
Example
package internals
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object groupByKeyTest {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("GroupByKey").setMaster("local")
val sc = new SparkContext(conf)
sc.setCheckpointDir("/Users/xulijie/Documents/data/checkpoint")
val data = Array[(Int, Char)]((1, 'a'), (2, 'b'),
(3, 'c'), (4, 'd'),
(5, 'e'), (3, 'f'),
(2, 'g'), (1, 'h')
)
val pairs = sc.parallelize(data, 3)
pairs.checkpoint
pairs.count
val result = pairs.groupByKey(2)
result.foreachWith(i => i)((x, i) => println("[PartitionIndex " + i + "] " + x))
println(result.toDebugString)
}
}
最後に
キャッシュと、チェックポイントの違いについて書かれた章でしたが、チェックポイントは相当長いアプリケーションになるか、またはアプリケーションを跨いで使用しない限り利点はそれほど活かせない、ということになりそうですね。
後は、アプリケーションを跨いでチェックポイントを使用するための具体的な実装方法ですが、こちらは後で調べてみるとします。
今回で残りは1章となったため、次回がSparkInternalsとしては最終回となりそうですね。