経緯
最近Apache Sparkを1.4系
から1.5.1
へアップグレードしました。
ところが、ジョブを作ってjarにしてspark-submit
から実行したところ、下記のようなエラーが出てfailするようになってしまいました。
15/10/21 15:22:12 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, spark003.example.com): java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:312)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:178)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:665)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:601)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:656)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:436)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:409)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:1016)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:1016)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
at scala.Option.map(Option.scala:145)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:176)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:220)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.UnknownHostException: nameservice1
... 41 more
nameservice1
というのはHDFSのHAクラスタにつけた論理サービス名です。
(私の環境ではHDFSのHighAvailability
を有効にしています。)
やったこと
-
同じ内容のジョブを
Spark1.4系
で実行。
→ 特に問題なくジョブが完了。
(ジョブ作成の際、Sparkへの依存は1.5.1
から1.4.1
へ変更しています) -
ジョブと同じ内容を
spark-shell
から実行
→ 問題なく実行可能。 -
HDFS HA
のサービス名が解決できていないようだったのでHDFS HA
を無効に。
→ 特に問題なくジョブが完了。 -
SparkのJIRAへissueを上げる。
https://issues.apache.org/jira/browse/SPARK-11227
→HDFS
の設定ファイルを見直せと。解決済みにされてしまいました
◇ 実行コマンド
/opt/spark/bin/spark-submit \
--class com.example.Job /jobs/job-assembly-1.0.0.jar
◇ ジョブの内容
import org.apache.spark.sql.{SaveMode, SQLContext}
object Job {
val sparkConfig = ...
def main( args: Array[String] ): Unit = {
val sc = new SparkContext( sparkConfig )
implicit val sqlContext = new SQLContext( sc )
import sqlContext.implicits._
val df = sqlContext.read.format( "com.databricks.spark.csv" ).option( "header", "true" ).load( input )
df.write.format( "json" ).mode( SaveMode.Overwrite ).save( output )
}
}
CSVファイルを読んで書き出しているだけです
◇ 環境
実行環境は下記のとおりです。
- OS ・・・ CentOS6.6
- HDFS ・・・ CDH5.4.0(
ClouderaManager
で構築) - Cluster化 ・・・ Zookeeper + Mesos 0.22.0
結論
spark-shell
との差異を探したところ、 HiveContext
を使っているかどうかの違いがありましたので、試しにspark-shell
で implicit val sqlContext = new SQLContext( sc )
を実行してみました。
すると同じエラーが出るようになったので、まあこれかなと。
ジョブ内でデフォルトの SQLContext
ではなく HiveContext
を使うようにしたらエラーは解消しました
◇ 変更後のジョブ
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
object Job {
val sparkConfig = ...
def main( args: Array[String] ): Unit = {
val sc = new SparkContext( sparkConfig )
implicit val sqlContext = new HiveContext( sc )
import sqlContext.implicits._
val df = sqlContext.read.format( "com.databricks.spark.csv" ).option( "header", "true" ).load( input )
df.write.format( "json" ).mode( SaveMode.Overwrite ).save( output )
}
}
SQLContext
にバグがあるとおもうんだけど。。。