目的
EMR上でSparkを使いたい場合、デフォルトだとSparkはScala2.10でビルドされているし、JavaもVer.7と大変イケてない。
そこで、現在自分が開発している環境であるScala2.11&Java8に対応したSparkBatchを実行できるようにEMRをいじってみます。
構築手順
Sparkのビルド
Sparkは公式にはScala2.10でビルドされたバイナリしか配布してくれていません。2.11版を使いたい場合は自分でビルドする必要があります。(1.4から2.11も正式対応していたはずなので、いい加減2.11版も出してくれないものか。。)
このへんを参考にしてパッケージ一式を用意します。
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.6.1.tgz
tar -xzvf spark-1.6.1.tgz
cd spark-1.6.1
./dev/change-scala-version.sh 2.11
./make-distribution.sh --name custom-sparkh --tgz -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests
- 僕の環境だとJAVA_HOMEをちゃんと設定してって怒られた。。
- メモリとCPUをかなり使います。(MacProで30分強くらいずっとコンパイルしてた)
- sparkソースの配置場所によっては、pathが長すぎるエラーが出るかも。
- Spark1.6.1だと、Hadoop2.6までしか指定できなかったが、2.7系でも今のところ問題なく動いています。
これでScala2.11、Hadoop2.6バージョンのSparkが用意出来たはずです。パッケージ化されたものの中から、lib/spark-assembly.jar
的なものを取り出してS3にUploadしましょう。
また、固められたパッケージ自体もuploadしておきましょう。後述するlocalでの動作確認に使えます。
EMR準備
デフォルトではJava7が入ってしまっていて、Java8でビルドしたアプリを動かそうとするJobが実行されなかったり失敗したりする。
こちらの方がEMRをJava8化するBootstrapActionを書いているので拝借する。
emr_bootstrap_java_8.shをS3にuploadしておく
EMR構築手順としては以下のとおりです。
- アプリケーションには、Spark1.6とHadoop2.7.2を入れる(emr-4.4.0)
- ここでインストールされるSparkはScala2.10バージョンのものだが、S3などとの連携に都合がいいので置いておく。
- BootstrapでJava8対応のshellを実行する。(s3:///emr_bootstrap_java_8.sh)
- おそらく踏み台を経由すると思うので、踏み台サーバと同じsubnetにする。
- Web画面を見たいのでpublicなsubnetにおく。(sshトンネル経由)
- sshするので、pemを用意しておく。
- logFolderはご自由に(EMRクラスタの各種ログが吐かれる)
- IAMは、とりあえずS3やDynamoにアクセスできれば。(デフォルトでOK)
- SGは、sshができればいいかな(デフォルトでOK)
EMR起動後、MasterNodeにsshしてみてjava -version
でJava8が使えることを確認。
実行サンプルの準備
sparkにはデフォルトで円周率計算をするexampleがありますが、EMRを使う場合は普通は入出力にS3を使うので、それが使えることを確かめるためのサンプルが必要です。
(S3に繋がるまでは実は色々罠があります!Hadoop/EMRのややこしさを舐めちゃいけません。)
とりあえず手っ取り早いものをここに準備しました。
(ただfileを受け取ってそのまま吐き出すだけのサンプルです。入出力の確認に持ってこいですね!)
hadoopSample/spark/scala_2.11の階層でactivator batch_hdfs/assembly
を実行するとspark上で実行可能なjarが生成されます。
jarが出来たら、同じくS3にuploadしてください。
Sparkの実行前の準備
まず、なぜかマスターノードでJAVA_HOMEがおかしなことになってるので直します。
export JAVA_HOME=/usr/java/default
ローカルモードで試す場合はsparkパッケージを落としてくる必要があるので、S3から2.11用にビルドしたパッケージを落としてくる必要があります。
aws s3 cp s3://<bucket>/spark-1.6.1-bin-scala-2.11.tar.gz ~/
tar -zxvf spark-1.6.1-bin-scala-2.11.tar.gz
Sparkの実行
localモードでの実行
(※この時、S3へのアクセスはできませんでした。。ローカルのファイルを参照してください。)
./spark-1.6.1-bin-scala-2.11/bin/spark-submit --class com.github.uryyyyyyy.hadoop.spark.batch.fileIO.Hello --master local --conf spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.DirectOutputCommitter ./spark2.11_batch_hdfs-assembly-1.0.jar file:///home/hadoop/dummy_in.txt file:///home/hadoop/dummy_out.txt
yarn-clusterモードでの実行
spark-submit --class com.github.uryyyyyyy.hadoop.spark.batch.fileIO.Hello --master yarn-cluster --conf spark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.DirectOutputCommitter --conf spark.yarn.jar="s3://<bucket>/spark-assembly-1.6.1-hadoop2.7.2.jar" s3://<bucket>/spark2.11_batch_hdfs-assembly-1.0.jar s3://<bucket>/in.txt s3://<bucket>/out.txt
yarn-clientモードでの実行
後述しますが、submitを実行しているDriverNodeのspark-assemblyとExecutorNodeのspark-assemblyが異なってしまい、上手く行きません。
どうやって実現しているのか?
clusterモード実行時の仕組み
spark-submit(EMR組み込みのscala2.10ビルド版)で実行します。これだけだとScala2.10がクラスパスに入ってしまうのですが、そこでspark.yarn.jar="s3://<bucket>/spark-assembly-1.6.1-hadoop2.7.2.jar"
を使うことで、YARN上に置かれるDriverとExecutorにはscala2.11のassemblyが配布されて、Scala2.11で実行することが出来ます。
なぜscala2.11ビルド版のsparkパッケージでsubmitしないかについては、それをした場合はS3へのアクセスが上手くいかず、spark.yarn.jar="s3://<bucket>/spark-assembly-1.6.1-hadoop2.7.2.jar"
で落ちてしまいます。そのため、S3へアクセスするところまではEMR組み込み版を利用する必要があるのです。
ちなみに、これでScala2.11での実行はできたのですが、Driver/ExecutorからS3へ出力しようとすると、よくわからんエラーが出るので、以下を参考にパッチを組み込みます。
これで、実行時にspark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.DirectOutputCommitter
を指定することで、うまくS3へ出力することが出来ます。
まとめ
以上でScala2.11、Java8のアプリケーションをEMR上で動かすことが出来ました。
yarn-clientモードで実行できないのはモヤモヤ感が残りますが、ローカルデバッグもできるし、cluster上での実行もできているのでまぁいいのかなと。