この文章について
このポストは、先日HadoopとS3というタイトルで書いた下記のようなアーキテクチャを作るにあたってのZeppelin + Spark + LLAPまわりの調査と検証状況を自分のためのメモとしてまとめたものです。
Motivation
SparkでS3上のデータを分析するにあたってレイテンシをできるだけ下げ、かつS3のスロットリングを避けるためにできるだけアクセス頻度を下げたいという背景からspark-llapを試している。
spark-llapは単純にHiveserver2経由でHive LLAPの実行結果を受け取るわけではなく、以下のようなアーキテクチャでデータを取り出す。
*画像はHortonworksのHive開発者、Sergey ShelulkhinのSlideshareから拝借。
この方法のメリットは、Hiveserver2に集約されてThriftにシリアライズされた結果を受け取るのではなく、Sparkの各Executorが直接LLAP Daemonからデータを取り出すことになるので、ボトルネックがないということになる。
Build
READMEにはbuild/sbt package
しろと書いてあるがこれじゃだめで、下記のようにbuild/sbt assembly
しなければならない。(まあそりゃそうか)
masterブランチはSpark1.6用なのでこんな感じにビルドできる。
build/sbt -Dspark.version=1.6.2.2.5.3.0-37 -Dhadoop.version=2.7.3.2.5.3.0-37 -Dhive.version=2.1.0.2.5.3.0-37 -Dtez.version=0.8.4.2.5.3.0-37 -Drepourl=http://nexus-private.hortonworks.com:8081/nexus/content/groups/public/ clean compile assembly
Spark2.0用のbranch-2.0も、下記のような感じでビルド。
build/sbt -Dspark.version=2.0.0.2.5.3.0-37 -Dhadoop.version=2.7.3.2.5.3.0-37 -Dhive.version=2.1.0.2.5.3.0-37 -Dtez.version=0.8.4.2.5.3.0-37 -Drepourl=http://nexus-private.hortonworks.com:8081/nexus/content/groups/public/ clean compile assembly
どちらもうまくいった。Hortonworksのrepositoryを使わなきゃいけないかどうかは・・・ためしていない。
動かしてみる
READMEに従って下記をspark-default.confに設定
spark.sql.hive.hiveserver2.url=jdbc:hive2://hiveserver2:10500
spark.hadoop.hive.llap.daemon.service.hosts=@llap0
spark.hadoop.hive.zookeeper.quorum=zookeeper:2181
Spark Thrift Server使うなら他にも設定が必要だがここではスキップ。この上でspark-shell --jars PATH/TO/ASSEMBRY_JAR
でspark-shellを起動。下記のコマンドで動いてくれた。
import org.apache.spark.sql.hive.llap.LlapContext
var llapContext = new LlapContext(sc)
val sql = "SOME HIVE QL"
llapContext.sql(sql).show
Spark2だと下記だけで動くらしい。クエリは通るけど、実際にLLAPでジョブが走っているかどうかを
spark.sql("SOME HIVE QL").show
ちゃんと動いているか確認
Spark1での実行はいろいろデバッグメッセージが出るので動いているっぽいが、Spark2ののほうはいまいちデバッグ方法がわからない。
Sparkから動かしているからなのか(たぶんそう)、Tez Viewにはログは出てこないし、LLAP Web Service(http://llapnode:10502)にもなんの情報も出てこない
Zeppelinで動かしてみる
ZeppelinでSpark Interpreterに対してspark-llapのassembly jarを足してやったら以下のエラー。
import org.apache.spark.sql.hive.llap.LlapContext
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:204)
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:238)
at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:225)
at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:215)
at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:458)
at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:272)
at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:271)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:271)
at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:90)
at org.apache.spark.sql.hive.llap.LlapContext.<init>(LlapContext.scala:40)
at org.apache.spark.sql.hive.llap.LlapContext.<init>(LlapContext.scala:60)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC.<init>(<console>:46)
at $iwC.<init>(<console>:48)
at <init>(<console>:50)
at .<init>(<console>:54)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.zeppelin.spark.Utils.invokeMethod(Utils.java:38)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:717)
at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:928)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:871)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:864)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:94)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:341)
at org.apache.zeppelin.scheduler.Job.run(Job.java:176)
at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
... 58 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
... 64 more
Caused by: javax.jdo.JDOFatalInternalException: Unexpected exception caught.
NestedThrowables:
java.lang.reflect.InvocationTargetException
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1193)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:365)
at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:394)
at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:291)
at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:258)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:57)
at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:66)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:593)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:571)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:624)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
... 69 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
... 88 more
Caused by: org.datanucleus.exceptions.NucleusException: Error creating the MetaDataManager for API "JDO" :
at org.datanucleus.NucleusContext.getMetaDataManager(NucleusContext.java:1001)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.initialiseMetaData(JDOPersistenceManagerFactory.java:703)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.<init>(JDOPersistenceManagerFactory.java:511)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:301)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
... 96 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:325)
at org.datanucleus.NucleusContext.getMetaDataManager(NucleusContext.java:995)
... 100 more
Caused by: org.datanucleus.metadata.InvalidClassMetaDataException: Class "org.apache.hadoop.hive.metastore.model.MConstraint" has MetaData yet the class cant be found. Please check your CLASSPATH specifications.
at org.datanucleus.metadata.AbstractClassMetaData.loadClass(AbstractClassMetaData.java:579)
at org.datanucleus.metadata.ClassMetaData.populate(ClassMetaData.java:166)
at org.datanucleus.metadata.MetaDataManager$1.run(MetaDataManager.java:2918)
at java.security.AccessController.doPrivileged(Native Method)
at org.datanucleus.metadata.MetaDataManager.populateAbstractClassMetaData(MetaDataManager.java:2912)
at org.datanucleus.metadata.MetaDataManager.populateFileMetaData(MetaDataManager.java:2735)
at org.datanucleus.api.jdo.metadata.JDOMetaDataManager.loadXMLMetaDataForClass(JDOMetaDataManager.java:786)
at org.datanucleus.api.jdo.metadata.JDOMetaDataManager.getMetaDataForClassInternal(JDOMetaDataManager.java:383)
at org.datanucleus.api.jdo.metadata.JDOMetaDataManager$MetaDataRegisterClassListener.registerClass(JDOMetaDataManager.java:207)
at javax.jdo.spi.JDOImplHelper.addRegisterClassListener(JDOImplHelper.java:462)
at org.datanucleus.api.jdo.metadata.JDOMetaDataManager.<init>(JDOMetaDataManager.java:194)
... 107 more
Caused by: org.datanucleus.exceptions.ClassNotResolvedException: Class "org.apache.hadoop.hive.metastore.model.MConstraint" was not found in the CLASSPATH. Please check your specification and your CLASSPATH.
at org.datanucleus.ClassLoaderResolverImpl.classForName(ClassLoaderResolverImpl.java:216)
at org.datanucleus.ClassLoaderResolverImpl.classForName(ClassLoaderResolverImpl.java:368)
at org.datanucleus.metadata.AbstractClassMetaData.loadClass(AbstractClassMetaData.java:569)
... 117 more
いまのところの結論
Zeppelinのエラーはちゃんと追えばシュートできるかもだけど、そこまで時間は掛けられそうにないので、一旦次のHDPのリリースを待つのがいいかな。LLAPをGAにして、Zeppelinから(できればSpark2で)呼べるようにしてくれるところまで整っていたら最高。
とはいいつつ、また時間を見つけてためしてみようと思う。
EMRに取り込まれてくれると更にうれしいなとも思うけど、これにはspark-llapが本家Sparkに取り込まれるか、Hortonworksからspark packageとしてリリースされるかしてからでないとできないので、まだまだ時間はかかりそう。