Posted at

SparkInternalsで知る、Sparkの内部構造概要(broadcast)

More than 3 years have passed since last update.


はじめに

こんにちは。

前回に引き続き、SparkInternalsを訳していきます。

前回と同じく以後は下記の判例となります。

SparkInternals訳文

コメント


SparkInternals


Broadcast

名前が示す通り、Broadcastの機能はクラスタ中のあるノードから他の全ノードに対してデータを送信することを示す。個の機能は多くの状況において有用で、例えばDriverにおいてテーブルを保持しており、他のノードでもルックアップ用として必要になったケースが挙げられる。Broadcastを用いれば、該当テーブルを他の全ノードに送信し、各Taskがローカルでルックアップが可能となる。実際、Broadcastメカニズムを可用性と効率性を両立して達成するのはチャレンジングな実装となる。Sparkのドキュメントにおいては下記のように記述されている。


Broadcastされた変数はSparkプログラマにとって、Taskと同様の機構で持ち運ばれるコピーではなく、各ノードで利用可能な読み取り専用の変数、という認識の方が正しい。この変数は有用で、例えば全ノードにおいて巨大な入力データセットを効率的に利用するには必須となる。SparkはこのBroadcast変数を効率的なBroadcastアルゴリズムを用い、通信コストを削減する試行を行ってきた。



何故読み取り専用なのか?

一貫性の問題からくるもの。もしBroadcastされた変数が更新可能だった場合、あるノードにおいて更新されたタイミングで他ノードのコピーも併せて更新する必要が出てくるのだろうか?もし複数のノードが各々自分が保持するコピーを更新した場合、どのようにその独立したコピーに対する更新を同期させればいいのだろうか?加えて、耐障害性の問題も関わってくる。このような一貫性に関する困難な問題を回避するため、Sparkは読み取りBroadcast変数のみサポートしている。


何故各ノードに情報をコピーする際、TaskでなくBroadcastするのか?

各Taskはスレッド中で動作しており、1プロセス中で動作している複数のTaskは同一のSparkアプリケーションに属する。そのため、各Executorに対して1個の読み取り専用コピーを分配すれば、全Taskで共有可能となるため。


どのようにBroadcast機能を利用するのか?

Driverプログラム中での例は下記の通り。

val data = List(1, 2, 3, 4, 5, 6)

val bdata = sc.broadcast(data)

val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)

Driverプログラムにおいてsc.broadcast()を対象のデータをBroadcastする旨の宣言するために使用する。

ここでbdataの型はBroadcastとなり、rdd.transformation(func)の関数中でbdataをローカル変数のように使用することが出来る。


どのようにBroadcast機能は実装されているのか?

Broadcast機能の実装は興味深いものとなっている。


Broadcast変数用のメタデータ分散

DriverプログラムはローカルディレクトリにBroadcastしたデータを保存し、ローカルディレクトリにアクセスを可能とするためのHttpServerを起動する。Broadcastしたデータが実際にローカルディレクトリに出力されるのはbroadcastメソッドを実行したタイミング(val bdata = sc.broadcast(data))となる。同時に、BroadcastしたデータはDriverのblockMangerStorageLevelがmemory + diskとして書き込まれる。BlockManagerはそのデータに対してBroadcastBlockId型のblockIdを割り振る。transformation関数においてBroadcastされた変数が使用された場合、DriverのsubmitTask()がその変数のメタデータをシリアライズしてシリアライズされた関数と共に各ノードに送信する。Akkaのメッセージサイズ制限の関係上、メタデータでなく実データをBroadcastすることは出来ない。


何故DriverはBroadcastしたデータをローカルディレクトリとBlockManager両方に登録するのか?それは、ローカルディレクトリ上のコピーはHttpServerのためで、BlockManager上のコピーはDriver中でデータを使用するためのものとなっている。


どのタイミングで実際のデータはBroadcastされるのか? Executorが受信したTaskをデシリアライズしたタイミングで、Broadcast変数のメタデータもBroadcastオブジェクトの形式で取得される。その後、メタデータオブジェクト(bdata変数)のreadObject()メソッドが実行される。readObject()メソッドははじめにローカルのBlockManagerに既にコピーが存在しないかを確認する。もし存在しない場合、データをDriverから取得する。一度データを取得したら、取得結果はローカルのBlockManagerに保持され、以後そこから使用される。

Sparkはデータ取得において2つの異なる実装方式を持っている。


HttpBroadcast

本方式はExecutor/Driver間のHTTP接続を介してデータを取得する方式。

DriverはHttpBroadcastオブジェクトを生成し、そのBroadcast用データをDriverのBlockManagerに保持する。同タイミングで前述のとおり、同一データがローカルディスク上、例として /var/folders/87/grpn1_fn4xq5wdqmxk31v0l00000gp/T/spark-6233b09c-3c72-4a4d-832b-6c0791d0eb9c/broadcast_0という名称のディレクトリ配下に出力される。


DriverとExecutorはbroadcastMangerを初期化プロセスの中で初期化する。ローカルディレクトリはHttpBroadcast.initialize()メソッド実行によって生成され、該当メソッド実行タイミングでHttpServerの起動も行われる。


実際のデータ取得は2ノード間のHTTP接続を介して行われる。

HttpBroadcast方式の問題点は、全Workerノードが同一タイミングにDriverからデータを取得する関係上、Driverノードのネットワーク帯域がボトルネックとなること。


TorrentBroadcast

Driverのネットワーク帯域がボトルネックになるというHttpBroadcastにおける問題を解決するために、SparkはTorrentBroadcastと呼ばれるBitTorrentに触発されて開発された新たなBroadcast実装を考案した。本方式の基本コンセプトは各ブロックのBroadcastを削減することであり、既にデータを取得したExecutor自体もデータの取得元とすることにある。

HttpBroadcastにおけるデータ転送と異なり、TorrentBroadcastにおいてはblockManager.getRemote() => NIO ConnectionManagerを取得のために使用する。実際の送信受信プロセスは前章で記述したキャッシュRDDに似通っている。(前章のCacheAndCheckpoint)参照。

詳細をTorrentBroadcastの詳細を確認する。


Driver

DriverはBroadcastするデータをByteArrayにシリアライズし、BLOCK_SIZE(設定spark.broadcast.blockSizeで定義され、デフォルト4MB)サイズのブロックに分割する。分割後、分割データを収集することで元のByteArrayとして利用可能になるが、一次的にメモリ上に二重にコピーが存在するタイミングも生じる。

分割後、分割されたBlockの情報(メタデータ)はDriverのBlockManagerにStorageLevelがmemory + diskとして保存する。そのタイミングで、DriverのblockManagerMasterはメタデータの保存が成功した旨の通知も受け取る。この通知は重要な手順で、何故ならblockManagerMasterは全Executorからアクセスされる関係上、この通知はメタデータがクラスタ内のグローバルなデータになったことを示すからである。

Driver側の動作はこのBlockManagerの管理下にデータを物理的に保存することによって終了する。


Executor

シリアライズされたTaskを受信した際、Executorははじめにデシリアライズを行う。このデシリアライズ処理によってTorrentBroadcast型であるBroadcastメタデータも復元される。その後、メタデータのTorrentBroadcast.readObject()メソッドが実行される。上記の図で示した一般的な手順と同様に、ローカルのBlockManagerは該当のブロックが既に取得していないかの確認を行う。もし取得していない場合、ExecutorはDriverの blockManagerMasterに対して該当ブロックのメタデータを用いて問い合わせを行う。ここから実際のデータを取得するためのBitTorrentプロセスが開始する。

BitTorrentプロセス

データを取得後、ローカルにはarrayOfBlocks = new Array[TorrentBlock](totalBlocks)という形で配置される。TorrentBlockはBlockのラッパーとなっている。実際の取得を行う順序はランダムとなっている。例えば、全部で5個の取得する必要があるブロックが存在した場合、取得順序は3-1-2-4-5、というような順序となる。Executorは対象のデータ取得を1Blockずつ実施し、下記の流れでデータを取得する。

- ローカルblockManager

- ローカルconnectionManager

- Driver、または他ExexuorのblockManager

- 実データ

各Executorがデータの取得に成功し、ローカルのBlockManagerに保持したタイミングでDriverのblockManagerMasterは取得に成功した旨の通知を受ける。 わかる通り、これはクラスタ中の全ノードが該当Block用の新たなデータ取得元が出来たと認識するため、重要な手順となっている。もし他ノードが同一Blockのデータ取得を行う場合、取得可能なデータ取得元からランダムで選んで取得が行われる。Blockが取得されれば取得されるほど、多くのデータ取得元が存在することとなり、全Broadcastは加速する。BitTorrentについての詳細はwikipedia参照。

全てのBlockがローカルに取得されたタイミングで、Array[Byte]が元のBroadcast変数を復元するために割り振られる。最終的にこのArrayがデシリアライズされ、ローカルのBlockManagerに保持される。わかる通り、Bloadcast変数がローカルBlockManagerに存在するなら、取得したDataBlockは削除可能となる。(既にローカルBlockManagerに存在しているため。)

ここでもう一つの疑問が生じる。RDDをBroadcastしてはどうだろうか?実際にこれで悪いことは何も生じないように思える。このRDDは各Executorで算出され、各ノードでコピーが算出結果のコピーが保持されるのだから。

最後の質問がいまいち何を示しているかはわからないですね・・・RDDは各ノードでPartitionが処理される関係上、異なるデータとなるはずですので。共通的に利用するRDDをそのままコピーしたい、ならわかりますが。


まとめ

Broadcast変数を共有する機能は非常に便利な機能である。Hadoop MapReduceにおいてもDistributedCacheが存在し、様々な状況で使用される。例えば、-libjarsパラメータは各ノードに対してDistributedCacheを用いて送信される。しかし、Hadoop MapReduceにおいてはBloadcast変数を用いるにはHDFS上にまずアップロードする必要があり、同一ノード上に存在する異なるTask間でデータを共有する機構も有していない。もしあるノードにおいて、あるデータを必要とするMapperが同一Jobで起動された場合、Broadcast変数はあるノードで4回保存されることとなってしまう。(各Mapperのワーキングディレクトリ配下にコピーされる) 尚、このHDFSを用いる方式の利点としては、HDFSがDataBlockをクラスタ全体に分散する関係上、ボトルネックが発生しないということがある。

Sparkにおいて、Broadcastは全てのノードに対してデータを送信するだけでなく、同一ノード上で動作するTask間でデータの共有を可能にしている。SparkのBlockManagerが同一ノードにおいて、Task間のデータ共有の課題を解決している。ローカルのBlockManagerに保存された共有データはStorageLevelがmemory + diskとして保存されるため全てのローカルTaskが共有データにアクセス可能であることを保証し、複数のコピーが発生することを防止している。Sparkは2個のBloadcast実装を有している。伝統的なHttpBroadcastにはDriverノードのネットワーク帯域がボトルネックとなる問題がある。TorrentBroadcastはこの問題を解消したものの、それなりのBlockがExecutorによって取得されることでクラスタ内に流通しないと加速しないため、立ち上がりが遅い。加えて、元のデータの再構成にあたって余計なデータ領域が必要となる。

現在、Sparkでは更に別のTreeBroadcastと呼ばれるBloadcast方式を検討中である。もし興味があるなら、技術レポートを確認してほしい。Performance and Scalability of Broadcast in Spark.

著者の意見としては、Bloadcast機能はマルチキャストプロトコルを用いて実装されるべきだと考えている。だが、マルチキャストはUDPベースとなるため、アプリケーションレイヤで信頼性を担保するための機構が必要になってしまうのが悩みどころである。


最後に

Bloadcast機能についても一通り読んでみました。

HTTPBloadcastの段階であれば非常にシンプルでわかりやすいのですが、大規模クラスタにおいてはボトルネックが深刻な状態になるため、他の方式も考案されているとのことでした。

データの用途によってはキャッシュ等に保持して取得・・という手段もありだとは思うのですが、そうすると各Transformationで1Record処理する度に実行され、そこがボトルネックになりえます。

そのあたりはアプリケーションの性質で使い分けるしかないのだとは思いますが。

で、これでSparkInternalsの訳も最後です。

もしはじめから読んでくださっている奇特な方がもしいらっしゃれば、ありがとうございました。