SparkとYARNについて書きます。テーマ的にインフラストラクチャについての話が多くなると思います。
SparkとHadoopの関係性
SparkはHadoopクラスタへの依存はしていない。(ただし、ややこしいのだがHDFSやYARNのクライアントライブラリへの依存はある)なのでHadoopなしでも動かすことができる。しかしそれでもHadoopと一緒に動作させることが多いのは以下の理由による。
クラスタマネージャとしてのYARN
Sparkはアプリケーション(厳密にはSparkアプリケーション)ごとに下記のようなクラスタが構築される。Driver Programと呼ばれる、SparkContextオブジェクトを持ち、アプリケーションコードの主要部分を実行するアプリケーションのマスタコンポーネントと、RDDに対するオペレーションを実行するExecutor群。そして、Driver Programからの要求に応じてExecutorをアロケーションするCluster Manager。
現在のSparkでは、Cluster ManagerとしてStandalone Cluster Manager(Spark自体が用意する仕組み)、MesosそしてYARNを利用することができる。YARNを利用する場合には以下のような構成になる。(Taskの表記がここではタスク、とカタカナになっているのはご愛嬌)
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-client \
--num-executoers 2 \
/path/to/examples.jar
例えば上記のような形でアプリケーションを呼び出した場合、以下のような流れを辿ってアプリケーション用のSparkクラスタが構築される。(アプリケーションごとにこのクラスタが構築されるので、「クラスタ」という言葉は若干違和感がある)
- クライアントマシン上でDriver Programが起動
- Driver ProgramがYARNのResourceManager(以下RM)に対してYARNアプリケーションとしてSparkのjarをサブミット
- 起動してきたアプリケーションマスタがExecutorとして2つのタスクコンテナを要求
- 結果として上記のような図の構成になる。
なおmaster
にyarn-client
を指定した場合、Driver Programはアプリケーション呼び出しを行ったクライアントマシン上で動作する。yarn-cluster
を指定した場合はDriver Program自体もYARNのタスクコンテナ上で動作する。
というのがYARNとSparkのざっくりした関係性だ。
データソースとしてのHDFS
SparkはさまざまなストレージシステムをRDDのデータソースとして取り扱うことができる。HDFSもそのひとつであり、以下の様に簡単にHDFS上からRDDを生成することができる。
val rdd = sc.textFile("hdfs://path/to/your/data")
このtextFile()
というメソッドはRDDを返すメソッドであり、呼びだされた時点ではHDFSへのReadは発生しない。RDDに対するアクションが呼び出されて各Executor上での処理が行われる際、最初のステージで各タスクからHDFSへのReadが発生することになる。
SparkはYARNで動かすべきか
HDFS上のデータに対する処理を行うような場合はYARNで動かすことのメリットが大きいと感じる。メリットが大きい、というか、HDFSがある場所にはだいたいYARNも動いていることが多いと思うので、既存の資産を活かしやすいという感じだろうか。また、下記のようにYARNがまさにリソースマネージャの役割を果たしてくれるので、アドホックなデータアプリケーションプラットフォーム的にマルチテナント化がしやすい。更にMapReduceやHiveなどの他のHadoopアプリケーションと共存させやすい、というメリットが見込める。
一方、特定の目的のためにサービスとしてSpark Streamingのアプリケーションを動作させるような場合など、マルチテナントでアプリケーションをアドホックに動かすことがないようなケースはYARNでなくてもよいのではないだろうか。
以下、YARNがSparkにもたらすメリットを紹介する。
Dynamic Resource Allocation
Spark1.2以降では、アプリケーションのリソースを動的に増減させることができるようになっており、具体的にはExecutor単位でのリソースの増減が行なわれる。現在のところYARNをクラスタマネージャとして利用したときのみにこの機能が利用可能。
まずspark.dynamicAllocation
をtrueに設定する。更にExternal Shuffle Serviceと呼ばれるサービスを利用する必要がある。これはshuffleファイルをローカルストレージではなく外部に保存し、既存のExecutorを安全に削除するためのサービスだ。 spark.shuffle.service.enabled
をtrueにすることによって、アプリケーションにこのサービスの利用を設定します。サービス自体はorg.apache.spark.yarn.network.YarnShuffleService
というクラスに実装されており、各ワーカーノードのNodeManager内で動作する。起動するためにはspark-<version>-yarn-shuffle.jar
をワーカーノードに配布し、さらにyarn-site.xml内のyarn.nodemanager.aux-services
配下にyarn.nodemanager.axu-services.spark_shuffle
というプロパティを作り、ここに先ほどのクラス名を指定する。その後NodeManagerを再起動してサービスを有効化することで利用可能になる。
この状態でアプリケーション起動後、spark.dynamicAllocation.schedulerBAcklogTimeout
の時間が過ぎるとspark.dynamicAllocationSchedulerBacklogTimeout
のインターバルで新しいExecutorの要求を開始される。初回は1つのExecutorを追加し、その後2、4、8とExponentialに追加数を増やしていく。そしてspark.dynamicAllocation.executorIdleTimeout
で指定された秒数のアイドル時間が続くとそのExecutorは破棄される。
YARNで強化するSparkのセキュリティ対策
Spark自体が提供するセキュリティ対策のほかに、YARNのCapacity Schedulerを使うことによって、RMに対してアプリケーション登録時に認証をかけることができるようになる。ちょっと時間切れになってきたので、詳細はまたあとで追記。
Spark自体のセキュリティについてはこちらも参照するとよさそう。
Secure Spark
まとめ
ということでSparkとYARN、というかHadoopの関係性や、それらがもたらしてくれるメリットの紹介でした!
Disclaimer
本記事はわたしの雇用者の意見を代表するものではありません。