はじめに
こんにちは。
前回に引き続き、SparkInternalsを訳していきます。
前回と同じく以後は下記の判例となります。
SparkInternals訳文
コメント
SparkInternals
Shuffle Process
ここまででSparkのPhysicalPlanと、それをどう実行するかの詳細を書いてきた。だが、ShuffleDependencyを通して次のStageがどのようにデータを取得するかについては触れられていなかったため、本章でこれを記述する。
Shuffle Comparison between Hadoop and Spark
Hadoop MapReduceとSparkのShuffleプロセスには共通点と相違点が存在する。
抽象化したレベルで見ると、両者は共通している。
両者は共にMapper(Sparkの場合、ShuffleMapTask)の出力を分割し、対応するReducer(Sparkの場合、次のStageのShuffleMapTaskか、ResultTask)に送信するという構造を持っている。Reducer側はそのデータをメモリ上に保持し、Shuffle&Aggregateを行ったうえで実際のreduce()
を適用することでデータの統合を行う。
より実装レベルに近づいた視点で見ると、両者には違いがある。
Hadoop MapReduceにおけるShuffleはSortベースになっており、combine()
、reduce()
実行前にあらかじめソートが完了している必要がある。そのソート処理は外部ソートアルゴリズムで実施されている。
対して、現状SparkのShuffleのデフォルトの動作はHashベースとなっている。通常はその結合処理はHashMapを用いて実施され、ソートが事前に行われるということはない。もしデータをソートしてから利用する必要がある場合、明示的にsortByKey()
を実行する必要がある。ただし、Spark1.1からSortベースのShuffleプロセスも実行可能になっており、Spark1.2以降はデフォルトのShuffleプロセスのアルゴリズムもSortベースとなっている。
Spark1.2以降のShuffleはSort版となっています。Sort版Shuffleの詳細はSpark Architecture:Shuffleで説明されています。
実装面で見た場合、同様に違いが存在する。
既にわかっているように、Hadoop MapReduceにおいては明確なmap() > spill > merge > shuffle > sort > reduce()
という流れが存在しており、各Stepは事前定義されており、手続き型のプログラミングと相性のいいものとなっている。
しかしながら、Sparkにおいてはそのような固定化されたStepは存在せず、代わりにStage群と一連のTransformationが存在している。そのため、spill、merge、aggregateといった処理は一連のTransformationに内包される形になっている。
Mapper側のPartition分割、およびデータ保持プロセスを"Shuffle Write"、Reducer側のデータ読み込み、集計プロセスを"Shuffle Read"と名づける。すると、Shuffle Write/Shuffle ReadをどのようにSparkのLogicalPlan/PhysicalPlanと統合すればいいのか、という問題が生じる。Shuffle Write/Shuffle Readを効率的に実行するにはどのように実装すればいいのだろうか?
Shuffle Write
Shuffle WriteはSortされた出力が必要ない場合、相対的なシンプルな構成のTaskとなる。Shuffle Writeはデータを分割し、出力するのみとなる。このようなデータ永続化は「ヒープ領域使用量削減」「耐障害性」の2個の利点を持つ。
本機能の実装はシンプルとなる。ShuffleMapStage(のShuffleMapTask)の最後にShuffle Writeを行うロジックを追加すればいい。最終RDDに含まれる各出力レコードは下図のように分割され出力される。
上記の図においては4個のShuffleMapTasksが2個のCoreを保持する同一Workerノード上で実行されている。Task実行結果(Stageの最終RDD中のRecordリスト)はローカルディスクに出力される。各TaskはR個(=次Stageに存在するReducer Taskの数)のバッファを保持する。このバッファ群はSparkではbucketsと呼ばれる。デフォルトでは各bucketのサイズは32KB(Spark1.1以前は100KB)となっており、このサイズは設定spark.shuffle.file.buffer.kbで設定可能となっている。
bucketはSparkにおけるShuffleMapTaskのデータ分割結果の出力先を示す共通的な概念となっている。ただ、ここでは単純化のためにbucketはメモリ上のバッファとして扱う。
ShuffleMapTaskは最終RDDの結果を算出するためにパイプライン化技術を使用している。各Recordはpartitioner.partition(record.getKey())
で決定される所属bucketに対して送信される。これらのbucket中のデータは継続的にローカルファイル上にShuffleBlockFile、または略してFileSegmentとして呼ばれるファイル形態として継続的に出力される。ReducersはShuffle Readフェーズにおいて、これらのFileSegmentからデータを読み込む。
このような実装は非常にシンプルに済むが、いくつかの検討事項も持つ。
- 非常に数多くのFileSegmentを出力する危険性があるということ。各ShuffleMapTaskはR(=次Stageに存在するReducer Taskの数)個のFileSegmentを出力する、つまりM個のShuffleMapTaskはM * R個のファイルを出力する。大きなデータセットに対して処理を行う場合にはMもRも巨大な値となり、それだけ大量の中間データファイルを保持することになってしまう。
- これらのバッファは大量のスペースを確保するケースがあるということ。Workerノードにおいて、Sparkに用いることが出来るCore毎にR * M個のbucketを確保される計算となる。SparkではShuffleMapTask実行後にこのバッファを再利用するのだが、R * n 個のbucketはメモリ上に残ってしまう。例えば8Coreを保持して1000ReducerJobが存在するケースを考えると、bucketの総容量は256MBにも達する。(R * cores * 32KB)
現在は2個目の問題に対してあまりよろしくない解決策が適用されている。とにかくバッファに書き込む必要がある関係上、バッファが小さすぎる場合、IO速度に影響が出る。1個目の問題に対してはファイルを集約する方式がすでに実装済みとなっている。その方式について見てみよう。
上記の図から明白だとは思うが、ShuffleMapTasksが1Core上で連続して実行される場合、Shuffle用のファイルを共有する方式となっている。各Taskは出力データを前Taskが出力したShuffleBlock iに対してShuffleBlock i'として追記する。ここでShuffleBlockはFileSegmentと呼ばれる。この方式においては次StageのReducer群はこの出力されたファイルを丸々読み込むだけでよく、各Workerノードで出力されるファイル数をCores * Rまで削減することができる。ファイル集約機能は 設定spark.shuffle.consolidateFilesにて有効化可能。
Shuffle Read
まずはShuffleDependencyを含むreduceBykeyのPhysicalPlanを見てみよう。
直感的にShuffleRDDを構成するためにMapPartitionRDDのデータを取得する必要があることはわかる。
ただ、下記のような問題がある。
いつデータを取得するのか?各ShuffleMapTaskが実行される毎に取得するのか?それともすべてのShuffleMapTasksの実行完了後にまとめて取得するのか?
取得と取得したレコードの処理は同時に実行するのか、それとも別々のタイミングで行うのか?
取得したデータはどこに保持するのか?
次StageのTaskはどのように取得されたデータの場所を特定するのか?
Sparkでの方式は以下のとおりとなる。
いつデータを取得するのか?
全ShuffleMapTasksの終了を待ち、その後データの取得を行う。既にわかっているとおり、Stageは親Stageがすべて実行された後に実行される。そのため、データ取得処理が前Stageの全ShuffleMapTasks実行後に始まることは直感的にわかるだろう。取得したFileSegmentsはメモリ上に保持する必要があるが、それだけだと取得したデータをディスクに出力する前に大量のデータを取得することはできないことになる。Sparkでは設定spark.reducer.maxMbInFlightによってこの取得用バッファのサイズを設定している。デフォルトは48MBとなっている。このバッファ(SoftBuffer)は普段は複数の取得されたFileSegmentsを保持するが、データサイズ次第では1個のFileSegmentsで占有されてしまうケースもある。
取得と取得したレコードの処理は同時に実行するのか、それとも別々のタイミングで行うのか?
Recordの取得と処理は同じタイミングで行われる。MapReduceの場合、Shuffle Stageはデータの取得と、combine()
ロジックの適用が同じタイミングで行われる。しかし、MapReduceにおいてはReducerの入力データはあらかじめソートされている必要があるため、reduce()
ロジックはshuffle-sortプロセスの後に適用される。SparkではReducerの入力データがソートされている必要はないため、データの取得の完了を処理開始まで待つ必要がない。
このような機構を実現するために、Sparkにおいてはshuffleと、処理の適用がどのように実装されているのか?実際のSparkにおいては、ジョブ実行用のデータ構造としてHashMapのような構造を用いることで解決している。Shuffleプロセスから取得された各のペアはHashMapに順次追加される。既にKeyに対応するデータが存在する場合、func(hashMap.get(Key), Value)
を適用し、データを統合していく。前述のWordCountサンプルの場合、この関数はhashMap.get(Key) + Value
で表すことができ、この結果がHashMapの値として更新される。この統合関数はMapReduceにおけるreduce()
と似た立ち位置だが、詳細は異なる。違いについてを下記のコードコードスニペットにて示している。
// MapReduce
reduce(K key, Iterable<V> values) {
result = process(key, values)
return result
}
// Spark
reduce(K key, Iterable<V> values) {
result = null
for (V value : values)
result = func(result, value)
return result
}
Hadoop MapReduceにおいては、Process関数内で任意のデータ構造を定義することが可能で、パラメータとしてはIterableをとる。データを先の処理のためにキャッシュすることも可能。
Sparkにおいては、高階関数に似た機構をProcess関数適用に用いている。
差の例として、Hadoop MapReduceにおいては全体の値の平均値や合計値、個数を出すことが非常に容易になっているが、Sparkはそうではない。詳細は後述する。
取得したデータはどこに保持するのか?
取得したFileSegmentsはsoftBufferに保持される。データが処理されたタイミングで、結果は設定で定義された箇所に出力される。もし設定spark.shuffle.spillがfalseの場合、この出力先はメモリ上のみに制約される。メモリ上のデータ保持のためにAppendOnlyMapというデータ構造が用いられる。メモリ上のみでない保持の場合、メモリとディスク上に出力されるが、その場合はExternalAppendOnlyMapというデータ構造が用いられる。このデータ構造を用いることにより、メモリ上に収まらない場合にソートされたKeyとValueのペアをディスク上に出力することができる。メモリとディスクを両方使用する場合の主要な問題点として、どのようにこの両者のバランスを取るか、がある。Hadoop MapReduceにおいては、デフォルトでメモリの70%がShuffle用の領域として確保されている。メモリの使用量が66%に達したタイミングでmerge-combine-spillプロセスを開始する。Sparkでも類似の方式をとっているが、詳細については本章にて後述する。
次StageのTaskはどのように取得されたデータの場所を特定するのか?
前章にて、ShuffleMapStageの中の重要なステップとして、MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size)
を実行して最終的なRDDを登録するというものがある。そのため、Shuffleのプロセスにおいて、Reducerはデータの場所をDriverプロセスのMapOutputTrackerMasterに問い合わせることで取得している。ShuffleMapTaskが終了した際に、FileSegmentの場所をMapOutputTrackerMasterに登録している。
ここまででShuffle Write/Shuffle Readで用いられるメインの方式について実装例を基に説明してきた。以後は他の興味深い事例について詳細を見てみよう。
典型的なTransformationにおけるShuffle Read詳細
reduceByKey(func)
ここまでで簡単にreduceByKey()
のデータ取得、Reduceプロセスについて記述してきた。ここで注意するべきは、あるタイミングにおいて、RDD中の全データがメモリ上に存在するわけではないということである。このロジック適用処理はRecordベースで行われ、既に処理が完了した元Recordはメモリ上から除去される。Recordレベルの視点においてはreduce()
の処理は下記の図のように示される。
この図からわかるように、取得されたRecordはHashMapを用いて集計される。全Recordを集計したタイミングで結果が取得できる。そのため、funcは可換律を満たす必要がある。
上記の図におけるMapPartitionsWithContextオペレーションはShuffledRDDをMapPartitionsRDDに変換するために用いられる。
ノード間のネットワーク通信を削減するため、Hadoop MapReduceにおいてはMapSideCombineを行うことができる。これはSparkでも実行可能。そのために必要なことは、ShuffleMapStageにおいてmapPartitionsWithContext
を実行するのみ。reduceByKeyの一例として、 ParallelCollectionRDDからMapPartitionsRDDへの変換はMapSideCombineと同等の処理を行っている。
Hadoop MapReduceにおけるmap()->reduce()とSparkにおける reduceByKeyの比較
MapSide:
両者に違いはない。combine()
ロジックにおいて、Hadoop MapReduceはあらかじめデータがSortされていることを課している。Spark側はHashMapを用いてcombine()
を行う。
Reduce side:
Hadoop MapReduceのReduceプロセスはShuffle用のデータをすべて取得した後combine()
を適用する。その後、データのマージソートをreduce()
の前準備として実行する。
Sparkにおいてはデータの取得とreduce()
はHashMapを用いて同時に実行される。その際、reduce関数は可換律を満たす必要がある。
メモリ使用における比較
MapSide:
Hadoop MapReduceはmap()
の出力データを保持してソートを行うために大量の循環バッファが必要になる。しかし、combine()
は追加の領域は必要としない。
Sparkはcombine()
実行にはHashMapを必要にし、Recordをローカルディスクに出力するためにバッファ(buckets)が必要となる。
Rereduce side:
Hadoop MapReduceはShuffleされたデータを保持する領域がメモリ上に必要となる。combine()
とreduce()
については既にデータがソート、グループ化済みのため、追加の領域は必要としない構成となっている。
SparkにおいてはsoftBuffer用の領域がデータ取得のために必要となる。HashMapはcombine()
とreduce()
の結果保持のためにメモリのみ使用という設定がされている場合メモリ上に確保される。しかしながら、データの一部は設定でメモリとディスクの使用が有効になっている場合はディスク上にも保持される構成となっている。
groupByKey(numPartitions)
実行過程はreduceByKey()
と似ている。集計のための関数がresult = result ++ result.value
となり、各キーに対応する値が集計されること無くグループ化される点が違いとなっている。
distinct(numPartitions)
こちらも実行過程はreduceByKey()
と似ている。集計のための関数がresult = result == null ? record.value : result
となりKeyに対応する値がHashMap上に存在するかをチェックするためだけの処理となっている。もともとKeyに対応する値が存在する場合、Recordは破棄され、存在しない場合はHashMap上に保持される。reduceByKey()
と同様に、MapSideCombineも適用されている。
cogroup(otherRDD, numPartitions)
cogroupを実行するにあたり、0個、1個、またはそれ以上のShuffleDependencyによる関連が存在しえる。しかし、Shuffleプロセスにおいて各ShuffleDependencyに対してHashMapを生成することはせず、1個のHashMapをすべてのShuffle実行に用いる。reduceByKey()
との違いとしては、HashMapはmapPartitionsWithContextでなく、RDDのcompute()
実行のタイミングで生成される点がある。
RDDに対するTask実行時にArray[ArrayBuffer]
を生成する。このArray[ArrayBuffer]
は入力RDDと同じ数の空ArrayBufferを保持する。そのため、図示された例においてはRDD a、RDD b二対応した2個のArrayBufferが各Taskに対して生成される。RDD aからのKey-Valueのペアが1個目のArrayBufferに追加され、RDD bからのKey-Valueのペアが2個目のArrayBufferに追加される。最終的にmapValues()
オペレーションにおいてArray[ArrayBuffer]
が(ArrayBuffer, ArrayBuffer) => (Iterable[V], Iterable[W])
の変換で最終的な型に変換される。
intersection(otherRDD) and join(otherRDD, numPartitions)
この2個のオペレーションは内部的にcogroupを使用しており、内部のShuffleプロセスはcogroupと同一の構成となっている。
sortByKey(ascending, numPartition)
sortByKey()
の処理プロセスはreduceByKey()
と若干異なり、取得した入力Recordを扱うためにHashMapを使用しない。代わりに、Key-Valueの値をRangeベースで分割する。同一Partition中の値はKeyでソートされた状態で出力される。
coalesce(numPartitions, shuffle = true)
coalesce()
はShuffleDependencyを生成するが、実際は取得したデータの集計は必要ないため、HashMapの使用もされない。
HashMap in Shuffle Read
ここまででで述べられてきたように、HashMapはSparkのShuffleプロセスにおいてしばしば使用されるデータ構造となっている。Sparkは2バージョンの特別なHashMapを保持しており、メモリ上にのみデータを保持する場合はAppendOnlyMapが、メモリとディスクの両方を用いてデータを保持する場合はExternalAppendOnlyMapが使用される。
以後ではこの2個のHashMap実装について詳細を述べる。
AppendOnlyMap
Sparkのドキュメントにおいて、AppendOnlyMapは「シンプルなHashTableで、追加専用のユースケースに最適化されている。キーは追加された後は削除されず、各キーに対応する値は更新されることを想定して最適化されている。」と記述されている。実装方式はシンプルで、下図のようにオブジェクトの巨大な配列を確保し、Keyが青い領域に保持され、Valueが白い領域に保持される。
put(K, V)
が実行された場合、hash(K)の値を用いて配列の中のスロットを割り当てる。既に該当の場所が使用されていた場合、二次プロービング技術を用いて次のスロットを見つける。図中に書かれた追加処理を例とすれば、Key=K6、Value=V6の値を追加する場合、3番目のプロービングで空きスロット(K4の次のスロット)を発見し、そこにV6(K6の次のスロット)と共に保存する。get(K6)
実行時、同様のフローで値を保持するスロットを発見する。Keyに対応した新たな値を保存する場合、V6をK6の次のスロットから取得し、新たな値を算出した上で算出結果をV6が保存されていたスロットに保存する。
AppendOnlyMapをIteratorで走査する場合、単に配列をスキャンするだけですむ。
もし割り振られたオブジェクト配列領域が70%使用された場合、倍のサイズのオブジェクト配列を作成し、そこに元々のオブジェクト配列中の値を再構築しながら保存する。
AppendOnlyMapにおいてはdestructiveSortedIterator()
というIterator[(K, V)]を返すメソッドが存在する。このメソッドはソートされた状態のKey-Valueペアを返す。実装方式はまずはじめに全てのKey-Valueペアを各Key-Valueペアが1スロットに保持された配列を作成し、Array.sort()
メソッドで要素を整列した上で用いている。メソッド名が示しているように、個のメソッドはオブジェクト配列の構造を壊す攻勢となっている。
ExternalAppendOnlyMap
AppendOnlyMapと比べて、ExternalAppendOnlyMapはより複雑な構成となっている。実装コンセプトはHadoop MapReduceのshuffle-merge-combine-sortプロセスと似たものとなっている。
ExternalAppendOnlyMapはAppendOnlyMapを保持しており、追加されたKey-ValueペアはAppendOnlyMapに追加される。AppendOnlyMapのサイズが増大した場合、まずはメモリ上に確保可能な領域があるか確認を行う。もし十分な領域がメモリ上に存在する場合、AppendOnlyMapのサイズを2倍にすることで対応する。そうでない場合、AppendOnlyMapに保持される全てのKey-Valueペアはソートされた上でディスクに出力される。(その際、destructiveSortedIterator()
を用いて行われる) 図中において、対象Mapの4回のファイル出力が行われている。各ディスク出力において、spillMap fileが生成され、空のAppendOnlyMapがその後追加されるKey-Valueペアに対応するために初期化される。ExternalAppendOnlyMapにおいて、Key-Valueペアが追加された場合、メモリ上に保持されたAppendOnlyMapのみを用いて集計が行われる。そのため、最終的な結果算出フェーズにおいて、ディスクに出力されたMapとメモリ上に存在するAppendOnlyMapをマージするglobal merge-aggregate処理を実行する必要が出てくる。
Global merge-aggregateは以下のようにして実行される。
まずはじめに、メモリ上に保持されたAppendOnlyMapをソートし、sortedMapを生成する。その後、DestructiveSortedIterator (sortedMap用)か、またはDiskMapIterator (ディスク上に出力されたDisk Spill Map用for on disk spillMap)がKey-Valueペアの一部分をStreamBufferに読み込むために実行される。StreamBufferはmergeHeapに追加され、各StreamBufferに保持されるKey-Valueペアは全てKeyに対するハッシュ値(hash(key)
)が同一の値となっている。図の構成を例に挙げるとすると、hash(K1) == hash(K2) == hash(K3) < hash(K4) < hash(K5)
が満たされている。
結果として、はじめに出力された1st spillのはじめの3Recordは同じStreamBufferに読み込まれる。マージのプロセスはシンプルで、同一Hash値を持つキーを保持するStreamBufferをヒープ上に確保し、それらの値をマージするためにArrayBuffer[StreamBuffer](mergedBuffers)に追加する。はじめに追加されたStreamBufferをminBufferと呼び、minBufferに保持されているはじめのKey-ValueペアのKey値をminKeyと呼ぶ。あるマージ処理はmergedBuffer中に保持されたminKeyをKeyとして保持するKey-Valueペアを集計し、結果として出力することを行う。mergedBuffer中のマージ処理が完了した時に残っているKey-ValueペアをmergeHeapに返し、空になったStreamBufferをメモリ、またはディスク出力されたMapから
ここで、3つまだ述べられていない点がある。
- メモリ利用可能チェック:Hadoop MapReduceはReducerのメモリ領域の70%をShuffle-Sort用に割り振っている。同じように、Sparkは設定spark.shuffle.memoryFraction * spark.shuffle.safetyFraction (デフォルトは0.3 * 0.8)をExternalAppendOnlyMap用に割り振っている。この値を見るに、Sparkはより保守的と思われる。また、この24%のメモリ領域は同一Executorで実行される全てのReducerで共有されている。ExecutorはShuffleMemoryMap: HashMap[threadId, occupiedMemory]を保持しており、保持している各Reducer中のExternalAppendOnlyMapsのメモリ使用量を監視している。AppendOnlyMapのサイズを増大させる前に、増大後の総メモリ使用量をShuffleMemoryMapの情報を基に算出し、十分な領域が確保可能かどうかの確認を行っている。ただし、はじめの1000Recordを保持する間にはディスク出力チェックが行われないことに留意すること。
- AppendOnlyMapのサイズ算出:AppendOnlyMapのサイズはAppendOnlyMapのサイズを増大させる際にAppendOnlyMapから参照されている全オブジェクトのメモリ量を計算することで可能となる。しかし、それは計算時間がかかりすぎる。Sparkは計算量O(1)で実行可能なサイズ算出アルゴリズムを保持している。この方式のコンセプトはAppendOnlyMapに対してデータの追加や保持データの集計を行う際にMapサイズの変動をRecordのデータ構造サイズを算出することで行うというもの。詳細はSizeTrackingAppendOnlyMapやSizeEstimatorにて確認可能。
- ディスク出力プロセス:Shuffle Writeプロセスのように、SparkはRecordをディスク出力する際にはBufferの生成を行う。Bufferのサイズは設定spark.shuffle.file.buffer.kbで設定可能で、デフォルトは32KBとなっている。シリアライザも同様にディスク出力のためにバッファを割り振っているが、大量のRecordを同時にディスク出力しようとした際に問題が発生しえる。そのため、Sparkでは同時にディスク出力可能なRecord数を設定spark.shuffle.spill.batchSizeで制限している。デフォルト値は10000となっている。
まとめ
本章のここまでで、SparkがHadoop MapReduceの固定されたshuffle-combine-merge-reduceモデルと比べてより柔軟なShuffleプロセスを持っていることがわかったはず。このShuffleプロセスにより、Sparkは実際のデータ変換の意味に基づいて異なるデータ構造と異なるShuffle戦略を組み合わせることが可能になっている。
ここまでで、SparkのShuffleプロセスのSortを行わないケースを基にどのように実際のRDDの連鎖の中に組み込まれるかを記述した。併せて、メモリやディスクの使用方式についてHadoop MapReduceと比較しながら記述した。次の章ではプロセス間通信の視点から、どのようにジョブが実行されるかを記述する。その中で、Shuffle時のデータ保存先の問題についても触れる。
終わりに
この章でShuffleをどのように実現するかが触れられましたが、前のStageが全て完了しないと次のStageが開始できず、オーバーラップした実行ができないなどの性能的に問題が発生しそうな要素についても見えてきました。
この章まででデータがどのように移動し、処理されるかの流れは見えてきたため、後はその流れをどのように実現しているか、が以後の章で語られているようです。