Spark
EMR

Spark on EMRの基礎をおさらいする

この記事はOpt Technologies Advent Calendar 2017の11日目です。

概要

EMR、Hadoop、Sparkそれぞれの関係を、試しに動かしてみたりドキュメントに当たることで理解する。
それによって、何が起きているのかをログやメトリクスから把握できるようになることを目指します。

(社内勉強会にて話した内容に修正を加えたものになります。)

まず動かしてみる

何はともあれ動かせる状態にします。これをベースに色々実装を変えてSparkに慣れましょう。

最小構成

以前書いた記事はこちら

Scala版のSpark Jarの最小限の構成

今回試してみるコードはこちら

https://github.com/uryyyyyyy/sparkSample/blob/master/build.sbt

Localで動かす

  1. Sparkのバイナリをダウンロードしてくる
  2. アプリのassembly-jarを作る
    • sbt batch_helloWorld/assembly
  3. spark-submitで実行する
    • spark-2.2.0-bin-hadoop2.7/bin/spark-submit --master local[2] --class com.github.uryyyyyyy.spark.batch.helloworld.Hello batch_helloWorld/target/scala-2.11/batch_helloWorld-assembly-1.0.jar

EMRで動かす

EMRクラスタを用意する

特筆することはないのですが、Spark(やメトリクス取得のためのGanglia)を各種バージョンを確認しつつ入れておき、

Screenshot 2017-12-24 22:17:34.png

Web UIやSSHログインするためのSecurity Groupが空いているとなお良いです。

Screenshot 2017-12-24 22:18:17.png

EMR上で動かしてみる

以前書いた記事はこちら。

Spark2.0をEMRで試す

先ほど、ローカルでの動作確認に使用したJarをS3にアップして、Main Classを指定してあげると良いでしょう。

ローカルでのUnitTest

特に変わったことはなく、普通にSpark起動するだけです。
https://github.com/uryyyyyyy/sparkSample/blob/master/batch_helloWorld/src/test/scala/HelloSpec.scala

RDDのままだと分散していて比較出来ないので、最終的にはファイルに吐き出すか collect処理をしてからassertする事になると思います。

...

ここまでで、とりあえず動かしてみることは出来ました。

EMRでの全体像を理解する

ここから、EMRとSparkの関係について述べていこうと思います。座学です。

全体の関係性を一言でまとめると、「EMR管理下のHadoopクラスタ(YARN)上で、Sparkアプリケーションを実行している」という形になります。

EMR

HadoopクラスタのSaaSです。どんなHadoop系アプリ(Spark、Ganglia)、クラスタのサイズはどうするか、Jobの投入や成功失敗はどうか、などの機能を提供してくれます。

Hadoop

Hadoopは分散処理フレームワークです。
分散処理をするためには、「並列で実行するためのコンピューティングリソース」だけでは不十分で、入力・出力が効率良く出来るファイルシステムが必要になります。

  • コンピューティングリソースを管理するミドルウェア→YARN
  • 分散ファイルシステム→HDFS
  • (YARNの前身もありますが今や使われてないので割愛)

という関係性になります。
なお、クラウド環境であればS3、GCSという分散ファイルシステムが既にあるので、HDFSは使わないでそちらを使うのが一般的です。

(EMRはHA構成は出来ないので、Masterが死んだらクラスタごと死にます。この点でも永続化したいデータはS3に置き、クラスタはなるべくステートレスにしましょう。)

Spark

「分散コンピューティング環境」上で動作する分散処理フレームワークです。
分散コンピューティング環境には、mesosやYARN、StandAlone(Spark自体のクラスタ構成)が含まれますが、EMRにおいては「YARN上で実行される」ということになります。

もう少し詳しく

YARN

仕組みはこんな感じです。

from: https://jp.hortonworks.com/blog/apache-hadoop-yarn-concepts-and-applications/

まず、Resource Manager(Master)とNode Manager(Node)というプロセスがそれぞれ立っています。
MasterがJobのリクエストを受け取ったら、クラスタ上のどこかにApplication Masterを置いて、そのJobを実行します。
Application Masterは必要に応じてResource Managerへリソースを要求し、それがContainerとしてあてがわれるので、処理の一部をそこにやらせたりします。

クライアントからリクエストされてApplication Masterに割り当てられるタスクにはApplicationIDが振られ、それぞれのcontainerもContainerIDが振られます。

例)

  • ApplicationID: application_10000032000_0001
  • ContainerID: container_10000032000_0001_02_000001
    • IDの中にApplicationIDを含んでいることが分かる。
    • ContainerIDは、ApplicationIDによらずクラスタ内でユニークになります。

ログは、${Container ID}/${stdout/stderr} のような形でログとして保存されていきます。

詳しくはこちら → EMRのログをS3/Localから追う

YARNのWeb UIを見ると、全体像としては以下のようになっていて、

Screenshot 2017-09-14 01:05:12.png

ApplicationIDを選ぶと以下のように

Screenshot 2017-09-14 01:20:14.png

さらにログを見ると以下のようなUIになります。URLを見ると、どのContainerIDのログを見ているのかがわかりますね。

スクリーンショット 2017-12-25 0.12.09.png

Spark

Sparkのみの全体像

image.png

from: https://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/

Driver Program(Master)、Executor Program(Node)という構成になっています。
すごくザックリ言うと、map的な記述の中はExecutor Program、その他全て(データを用意したり、処理の結果を待ったり)はDriver Program、という感じです。

サンプルプログラムを例に取って追ってみましょう。
(コメントにDriver, Executorのあるのが、それぞれ該当行のコードがどちらで実行されているかを示しています。)

object Hello {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Simple Application") // Driver
    val sc = new SparkContext(conf) // Driver
    val rdd = sc.range(0, 100, 1, 10) // DriverからExecutorに配布
    val value = rdd.map(_ * 2) // Executor
      .reduce(_ + _) // Executorから集約されていってDriverに返ってくる
    value mustBe 110 // Driver
  }
}
object Hello {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Simple Application") // Driver
    val sc = new SparkContext(conf) // Driver
    val rdd = sc.textFile("s3://opt-uryyyyyyy/spark/data/hello", 10) // DriverがExecutorに「ファイル取ってこい」と指示
    println("----Start----") // Driver
    rdd.map(_ * 2) // Executor
      .saveAsTextFile("s3://opt-uryyyyyyy/spark/data/hello") // Executor
  }
}

なお生ログについては、EMRでは実体がYARNなので、普通のYARNと同じようにContainer(Executor/Driver Node)毎に保存されます。(例えば、Driver側で吐かれたログはDriverが動いているコンテナ上に吐かれます。)

以下の図では、driverと書かれているところの右に stdout , stderr とありますが、ここをクリックすると、先ほど見たようなYARNのログに画面に飛ぶことが出来ます。

スクリーンショット 2017-12-25 0.15.06.png

Spark on YARN

Sparkのclusterモードにおいては、Application MasterでDriver Programが動き、他Container上でExecutor Programが動きます。

from: https://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/

同じ構成ですが、AWSの方で提供されている図も合わせて見てみましょう。

image.png

from: https://aws.amazon.com/blogs/big-data/submitting-user-applications-with-spark-submit/

まず、YARNのResource Managerにjobがsubmitされます。
Application Masterがjobを実行するので、そこがDriver Programになります。
Driver Programは、RDDを作るときにリソースをResource Managerに要求して、Containerが割り当てられたらそれをExecutor Programとして分散処理をします。
結果は分散ファイルシステム(S3とか)に置いても良いし、Driverに集約して残りの処理をしても良いです。

以下はSparkのHistoryServerのWeb UIですが、App IDがあるのが見えるでしょうか?これはEMRではYARNのApplicationIDです。

Screenshot 2017-09-14 01:25:04.png

Tips

Web UIへのアクセス

Web UIはpublicアクセスを許可しないのが一般的です。(ログなども見れてしまうため)
ただ、ブラウザから見たほうが都合が良いことが多いです。
そこでプロキシを経由します。やり方は以下の通り。

http://docs.aws.amazon.com/ja_jp/emr/latest/ManagementGuide/emr-connect-master-node-proxy.html

ファイルの入出力

Hadoop上で動くアプリは、基本的にはクラスタ内部のHDFSに出力されることを想定しています。
それ以外に吐き出す場合は、それ用のアダプタと、その設定(もちろん認証情報も)が必要です。

S3は、EMRではデフォルトでアダプタとその設定が入っています。権限もEC2のものを使うことは出来ます。

アダプタとしては公開されてなかったはずですがこちら。 /usr/lib/hadoop-mapreduce/hadoop-aws-2.7.3-amzn-5.jar

設定としては、EMRクラスタの各ノードにある /etc/hadoop/conf/core-site.xml にある

<property>
  <name>fs.s3.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>

このあたりの記述です。

GCSに繋ぐためには、アダプタをjarに同梱 or 配布する必要があるのと、設定を記述する必要があります。また、credentialのjsonを各ノードに配置も必要です。

僕が書いたものではないですが、こちらの資料が詳しいです。
Spark on EMRからGoogle Cloud Storageに接続する

ローカル/テスト時とクラスタ上での違い

  • ローカルでは1JVMプロセス、クラスタでは複数JVMプロセス
    • データのシリアライズ周りで挙動が違ったりする
    • ネットワークなど色んな要因でエラーになるので、リトライ可能の徹底を
  • ローカルとクラスタで、各種jarが異なったりする
    • S3接続のモジュールなどはEMRでは微妙に挙動が違った
  • データサイズ
    • データサイズが増えたりデータの偏りなどで、本番のみトラブるとかよくある

クラスタのメトリクス可視化

Gangliaは、Hadoopクラスタの物理的なメトリクスを収集します。
アプリケーション内部のメトリクスはSparkが蓄積しますが、Spark側で検知出来ないトラブルが起きていると、だいたいこちらで追えます。また、ボトルネックの特定にも役立ちます。

よくある使い方

  • なんか処理が遅い
    • CPUが振り切っている場合
    • SparkのロジックでCPUバウンドである
    • GCの処理が支配的になっている
    • ネットワークのスループットがやたら高い
    • shuffleするデータが大きい/回数が多い可能性。
    • データサイズを小さくしてからreduceなど出来ないか検討
    • 一部のノードだけ負荷が高く、他が遊んでいる
    • データの分散が上手くいってない
    • Driver Nodeだけ負荷が高い場合、分散処理以外で詰まってる
  • 急にYARNにkillされててアプリのログがほぼない
    • だいたいメモリ食い過ぎでYARNに殺されてる。メモリ使用量を見る

Web UIはこのような形です。物理Nodeごとに各種メトリクスを追うことが出来ます。

スクリーンショット 2017-12-25 0.22.05.png