本稿では,YARN 上における分散処理基盤のリソース管理の仕組みと,問題となる状況,および Spark の解決方法について,Spark の例をまじえて説明します.
YARN の基礎
MapReduce v1 では,TaskTracker が MapSlot/ReduceSlot という単位でリソースを管理していましたが,YARN では,"コンテナ"という単位でリソースを確保し,その中で処理を行います.
コンテナには,CPU/メモリ/ディスク帯域幅/ネットワーク帯域幅などを割り当てることが可能です.2014/12時点では,CPU/メモリのリソース管理サポートが入っています.ディスクIO/ネットワークIOの制御も来年には入るかもしれません.
Spark on YARN におけるリソース管理の例
Apache Spark は,オンメモリ用上のデータ処理を容易に行うことができる分散処理フレームワークです.Apache Spark の githubを見ると,Mar 28, 2010 – Dec 20, 2014 の間で述べ 378名の貢献者が参加している大規模なプロジェクトです.
Spark も YARN 対応がされており,YARN 上で動作します.Spark on YARN には,2つの動作モードがあります.1つは client モード,1つは cluster モードです.client モードは,Spark Master がクライアント側で動作し,executor (Spark の用語.master-worker 型の workerと同義.) が YARN クラスタ上で動作します.一方,cluster モードでは,YARN クラスタ上で Spark の master,executor の両方を立ち上げます.
問題点
Spark 1.1 の時点では,マルチクエリを投げる環境において性能が不利であることが指摘されていました.この問題について少し詳しく見てみましょう.Spark の特性として,ジョブの実行中に生成された中間データ(Spark 用語でいうところの RDD)は全て executor が保持している必要があります.なぜなら,一旦 executor を終了してしまうと,中間データが消失してしまうからです.そのため,耐故障性を担保するためには,YARN のコンテナは Spark の master が確保し続ける必要があります.生データを処理するジョブの終盤においては,処理が IO ネックになるためリソースを全て使い切ることが難しく,計算リソースが "使われていない" にも関わらず "確保し続ける" 必要があります.これを端的に示した図が Hortonworks のブログ記事であるSpark Native YARN support に掲載されているベンチマーク結果です.
1.2 の新機能 Dynamic Resource Allocation
この問題に対処するため,リソースを動的に確保する仕組みが Cloudera の Sandy Ryza 氏により提案されました(SPARK-3174).この機能は,以下の2点から構成されます:
- リソースが一定よりも利用されていない状態において,リソースを動的に解放する仕組み.
- リソースが一定よりも利用されている状態において,リソースを動的に確保する仕組み.
- 中間データの管理を YARN 側に委譲する仕組み.
1,2 は Spark 側の設定ですので spark-default.conf に設定を書けば反映できます.spark.dynamicAllocation.enabled
は,Dynamic Resource Allocation を有効・無効を設定するフラグです.
spark.dynamicAllocation.maxExecutors
は Executor の最大数を,spark.dynamicAllocation.minExecutors
は Executor の最小数を指定します.
spark.shuffle.service.enabled
は External Shuffle Service の有効・無効を設定するフラグです.
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.shuffle.manager SORT
spark.shuffle.consolidateFiles true
spark.shuffle.spill true
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.maxExecutors 4
spark.dynamicAllocation.minExecutors 2
spark.shuffle.service.enabled true
- に関しては,Spark をコンパイルして出てきた shuffle-service の jar ファイル(spark--yarn-shuffle.jar)を YARN のクラスパス(例えば,$HADOO_HOME/share/hadoop/yarn など)に配置した後で,yarn-site.xml を更新して再起動する必要があります.
# Hadoop 2.5 以降を利用する場合には Spark のバイナリがないのでコンパイルが必須
$ cd spark
$ mvn
# コンパイルして出てきた external shuffle service plugin を YARN のクラスパスにコピー
$ cp ./network/yarn/target/scala-2.10/spark-1.2.1-SNAPSHOT-yarn-shuffle.jar $HADOOP_HOME/share/hadoop/yarn/
# YARN が Spark の external shuffle service plugin を使うように変更
$ vim etc/hadoop/yarn-site.xml
<property>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle,mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
その他設定上の注意として,Dynamic Resource Allocationを利用する場合には Executor の数などを明示的に設定することができません.
動作確認
では,spark-sql を起動して動作させてみましょう.この後は日目の advent calender とほぼ内容は同じです.
# Spark 1.2 から --maseter には yarn-client の代わりに yarn を指定するようになりました.
$ bin/spark-sql --master yarn
spark-sql > create external table if NOT EXISTS text100GB(text string) location 'hdfs:///user/ozawa/text';
(...中略...)
14/12/22 07:36:05 INFO cluster.YarnClientSchedulerBackend: Requesting to kill executor(s) 3
14/12/22 07:36:05 INFO spark.ExecutorAllocationManager: Removing executor 3 because it has been idle for 600 seconds (new desired total will be 3)
14/12/22 07:36:06 INFO cluster.YarnClientSchedulerBackend: Requesting to kill executor(s) 1
14/12/22 07:36:06 INFO spark.ExecutorAllocationManager: Removing executor 1 because it has been idle for 600 seconds (new desired total will be 2)
14/12/22 07:36:09 INFO spark.ExecutorAllocationManager: Not removing idle executor 2 because there are only 2 executor(s) left (limit 2)
実行ログから,Dynamic Resource Allocation が有効になっていることが分かります.
まとめ
本稿では,Spark on YARN を使うにあたって知っておくべき YARN におけるリソース管理の基礎について説明し,その問題点であるリソース利用率の低下について述べました.そして,リソース利用率を改善するための機能として提案された Spark 1.2 からの新機能である Dynamic Resource Allocation の概要と設定方法について述べました.