Spark
YARN
More than 3 years have passed since last update.

SparkとYARNについて書きます。テーマ的にインフラストラクチャについての話が多くなると思います。


SparkとHadoopの関係性

SparkはHadoopクラスタへの依存はしていない。(ただし、ややこしいのだがHDFSやYARNのクライアントライブラリへの依存はある)なのでHadoopなしでも動かすことができる。しかしそれでもHadoopと一緒に動作させることが多いのは以下の理由による。


クラスタマネージャとしてのYARN

Sparkはアプリケーション(厳密にはSparkアプリケーション)ごとに下記のようなクラスタが構築される。Driver Programと呼ばれる、SparkContextオブジェクトを持ち、アプリケーションコードの主要部分を実行するアプリケーションのマスタコンポーネントと、RDDに対するオペレーションを実行するExecutor群。そして、Driver Programからの要求に応じてExecutorをアロケーションするCluster Manager。

components.png

現在のSparkでは、Cluster ManagerとしてStandalone Cluster Manager(Spark自体が用意する仕組み)、MesosそしてYARNを利用することができる。YARNを利用する場合には以下のような構成になる。(Taskの表記がここではタスク、とカタカナになっているのはご愛嬌)

architecture_yarn.png

./bin/spark-submit \

--class org.apache.spark.examples.SparkPi \
--master yarn-client \
--num-executoers 2 \
/path/to/examples.jar

例えば上記のような形でアプリケーションを呼び出した場合、以下のような流れを辿ってアプリケーション用のSparkクラスタが構築される。(アプリケーションごとにこのクラスタが構築されるので、「クラスタ」という言葉は若干違和感がある)


  1. クライアントマシン上でDriver Programが起動

  2. Driver ProgramがYARNのResourceManager(以下RM)に対してYARNアプリケーションとしてSparkのjarをサブミット

  3. 起動してきたアプリケーションマスタがExecutorとして2つのタスクコンテナを要求

  4. 結果として上記のような図の構成になる。

なおmasteryarn-clientを指定した場合、Driver Programはアプリケーション呼び出しを行ったクライアントマシン上で動作する。yarn-clusterを指定した場合はDriver Program自体もYARNのタスクコンテナ上で動作する。

というのがYARNとSparkのざっくりした関係性だ。


データソースとしてのHDFS

SparkはさまざまなストレージシステムをRDDのデータソースとして取り扱うことができる。HDFSもそのひとつであり、以下の様に簡単にHDFS上からRDDを生成することができる。

val rdd = sc.textFile("hdfs://path/to/your/data")

このtextFile()というメソッドはRDDを返すメソッドであり、呼びだされた時点ではHDFSへのReadは発生しない。RDDに対するアクションが呼び出されて各Executor上での処理が行われる際、最初のステージで各タスクからHDFSへのReadが発生することになる。


SparkはYARNで動かすべきか

HDFS上のデータに対する処理を行うような場合はYARNで動かすことのメリットが大きいと感じる。メリットが大きい、というか、HDFSがある場所にはだいたいYARNも動いていることが多いと思うので、既存の資産を活かしやすいという感じだろうか。また、下記のようにYARNがまさにリソースマネージャの役割を果たしてくれるので、アドホックなデータアプリケーションプラットフォーム的にマルチテナント化がしやすい。更にMapReduceやHiveなどの他のHadoopアプリケーションと共存させやすい、というメリットが見込める。

spark_and_yarn.jpg

一方、特定の目的のためにサービスとしてSpark Streamingのアプリケーションを動作させるような場合など、マルチテナントでアプリケーションをアドホックに動かすことがないようなケースはYARNでなくてもよいのではないだろうか。

以下、YARNがSparkにもたらすメリットを紹介する。


Dynamic Resource Allocation

Spark1.2以降では、アプリケーションのリソースを動的に増減させることができるようになっており、具体的にはExecutor単位でのリソースの増減が行なわれる。現在のところYARNをクラスタマネージャとして利用したときのみにこの機能が利用可能。

まずspark.dynamicAllocationをtrueに設定する。更にExternal Shuffle Serviceと呼ばれるサービスを利用する必要がある。これはshuffleファイルをローカルストレージではなく外部に保存し、既存のExecutorを安全に削除するためのサービスだ。 spark.shuffle.service.enabledをtrueにすることによって、アプリケーションにこのサービスの利用を設定します。サービス自体はorg.apache.spark.yarn.network.YarnShuffleServiceというクラスに実装されており、各ワーカーノードのNodeManager内で動作する。起動するためにはspark-<version>-yarn-shuffle.jarをワーカーノードに配布し、さらにyarn-site.xml内のyarn.nodemanager.aux-services配下にyarn.nodemanager.axu-services.spark_shuffleというプロパティを作り、ここに先ほどのクラス名を指定する。その後NodeManagerを再起動してサービスを有効化することで利用可能になる。

この状態でアプリケーション起動後、spark.dynamicAllocation.schedulerBAcklogTimeoutの時間が過ぎるとspark.dynamicAllocationSchedulerBacklogTimeoutのインターバルで新しいExecutorの要求を開始される。初回は1つのExecutorを追加し、その後2、4、8とExponentialに追加数を増やしていく。そしてspark.dynamicAllocation.executorIdleTimeoutで指定された秒数のアイドル時間が続くとそのExecutorは破棄される。


YARNで強化するSparkのセキュリティ対策

Spark自体が提供するセキュリティ対策のほかに、YARNのCapacity Schedulerを使うことによって、RMに対してアプリケーション登録時に認証をかけることができるようになる。ちょっと時間切れになってきたので、詳細はまたあとで追記。

Spark自体のセキュリティについてはこちらも参照するとよさそう。

Secure Spark


まとめ

ということでSparkとYARN、というかHadoopの関係性や、それらがもたらしてくれるメリットの紹介でした!


Disclaimer

本記事はわたしの雇用者の意見を代表するものではありません。