Edited at

Zeppelin + Spark + Hive LLAP

More than 1 year has passed since last update.


この文章について

このポストは、先日HadoopとS3というタイトルで書いた下記のようなアーキテクチャを作るにあたってのZeppelin + Spark + LLAPまわりの調査と検証状況を自分のためのメモとしてまとめたものです。

architecture


Motivation

SparkでS3上のデータを分析するにあたってレイテンシをできるだけ下げ、かつS3のスロットリングを避けるためにできるだけアクセス頻度を下げたいという背景からspark-llapを試している。

https://github.com/hortonworks-spark/spark-llap

spark-llapは単純にHiveserver2経由でHive LLAPの実行結果を受け取るわけではなく、以下のようなアーキテクチャでデータを取り出す。

Kobito.NPnFd7.png

*画像はHortonworksのHive開発者、Sergey ShelulkhinのSlideshareから拝借。

この方法のメリットは、Hiveserver2に集約されてThriftにシリアライズされた結果を受け取るのではなく、Sparkの各Executorが直接LLAP Daemonからデータを取り出すことになるので、ボトルネックがないということになる。


Build

READMEにはbuild/sbt packageしろと書いてあるがこれじゃだめで、下記のようにbuild/sbt assemblyしなければならない。(まあそりゃそうか)

https://github.com/hortonworks-spark/spark-llap/issues/9

masterブランチはSpark1.6用なのでこんな感じにビルドできる。

build/sbt -Dspark.version=1.6.2.2.5.3.0-37 -Dhadoop.version=2.7.3.2.5.3.0-37 -Dhive.version=2.1.0.2.5.3.0-37 -Dtez.version=0.8.4.2.5.3.0-37 -Drepourl=http://nexus-private.hortonworks.com:8081/nexus/content/groups/public/ clean compile assembly

Spark2.0用のbranch-2.0も、下記のような感じでビルド。

build/sbt -Dspark.version=2.0.0.2.5.3.0-37 -Dhadoop.version=2.7.3.2.5.3.0-37 -Dhive.version=2.1.0.2.5.3.0-37 -Dtez.version=0.8.4.2.5.3.0-37 -Drepourl=http://nexus-private.hortonworks.com:8081/nexus/content/groups/public/ clean compile assembly

どちらもうまくいった。Hortonworksのrepositoryを使わなきゃいけないかどうかは・・・ためしていない。


動かしてみる

READMEに従って下記をspark-default.confに設定

spark.sql.hive.hiveserver2.url=jdbc:hive2://hiveserver2:10500

spark.hadoop.hive.llap.daemon.service.hosts=@llap0
spark.hadoop.hive.zookeeper.quorum=zookeeper:2181

Spark Thrift Server使うなら他にも設定が必要だがここではスキップ。この上でspark-shell --jars PATH/TO/ASSEMBRY_JARでspark-shellを起動。下記のコマンドで動いてくれた。

import org.apache.spark.sql.hive.llap.LlapContext

var llapContext = new LlapContext(sc)

val sql = "SOME HIVE QL"

llapContext.sql(sql).show

Spark2だと下記だけで動くらしい。クエリは通るけど、実際にLLAPでジョブが走っているかどうかを

spark.sql("SOME HIVE QL").show


ちゃんと動いているか確認

Spark1での実行はいろいろデバッグメッセージが出るので動いているっぽいが、Spark2ののほうはいまいちデバッグ方法がわからない。

Sparkから動かしているからなのか(たぶんそう)、Tez Viewにはログは出てこないし、LLAP Web Service(http://llapnode:10502)にもなんの情報も出てこない


Zeppelinで動かしてみる

ZeppelinでSpark Interpreterに対してspark-llapのassembly jarを足してやったら以下のエラー。


import org.apache.spark.sql.hive.llap.LlapContext
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:204)
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:238)
at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:225)
at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:215)
at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:458)
at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:272)
at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:271)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:271)
at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:90)
at org.apache.spark.sql.hive.llap.LlapContext.<init>(LlapContext.scala:40)
at org.apache.spark.sql.hive.llap.LlapContext.<init>(LlapContext.scala:60)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC.<init>(<console>:46)
at $iwC.<init>(<console>:48)
at <init>(<console>:50)
at .<init>(<console>:54)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.zeppelin.spark.Utils.invokeMethod(Utils.java:38)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:717)
at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:928)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:871)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:864)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:94)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:341)
at org.apache.zeppelin.scheduler.Job.run(Job.java:176)
at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
... 58 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
... 64 more
Caused by: javax.jdo.JDOFatalInternalException: Unexpected exception caught.
NestedThrowables:
java.lang.reflect.InvocationTargetException
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1193)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:365)
at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:394)
at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:291)
at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:258)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:57)
at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:66)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:593)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:571)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:624)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
... 69 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
... 88 more
Caused by: org.datanucleus.exceptions.NucleusException: Error creating the MetaDataManager for API "JDO" :
at org.datanucleus.NucleusContext.getMetaDataManager(NucleusContext.java:1001)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.initialiseMetaData(JDOPersistenceManagerFactory.java:703)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.<init>(JDOPersistenceManagerFactory.java:511)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:301)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
... 96 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:325)
at org.datanucleus.NucleusContext.getMetaDataManager(NucleusContext.java:995)
... 100 more
Caused by: org.datanucleus.metadata.InvalidClassMetaDataException: Class "org.apache.hadoop.hive.metastore.model.MConstraint" has MetaData yet the class cant be found. Please check your CLASSPATH specifications.
at org.datanucleus.metadata.AbstractClassMetaData.loadClass(AbstractClassMetaData.java:579)
at org.datanucleus.metadata.ClassMetaData.populate(ClassMetaData.java:166)
at org.datanucleus.metadata.MetaDataManager$1.run(MetaDataManager.java:2918)
at java.security.AccessController.doPrivileged(Native Method)
at org.datanucleus.metadata.MetaDataManager.populateAbstractClassMetaData(MetaDataManager.java:2912)
at org.datanucleus.metadata.MetaDataManager.populateFileMetaData(MetaDataManager.java:2735)
at org.datanucleus.api.jdo.metadata.JDOMetaDataManager.loadXMLMetaDataForClass(JDOMetaDataManager.java:786)
at org.datanucleus.api.jdo.metadata.JDOMetaDataManager.getMetaDataForClassInternal(JDOMetaDataManager.java:383)
at org.datanucleus.api.jdo.metadata.JDOMetaDataManager$MetaDataRegisterClassListener.registerClass(JDOMetaDataManager.java:207)
at javax.jdo.spi.JDOImplHelper.addRegisterClassListener(JDOImplHelper.java:462)
at org.datanucleus.api.jdo.metadata.JDOMetaDataManager.<init>(JDOMetaDataManager.java:194)
... 107 more
Caused by: org.datanucleus.exceptions.ClassNotResolvedException: Class "org.apache.hadoop.hive.metastore.model.MConstraint" was not found in the CLASSPATH. Please check your specification and your CLASSPATH.
at org.datanucleus.ClassLoaderResolverImpl.classForName(ClassLoaderResolverImpl.java:216)
at org.datanucleus.ClassLoaderResolverImpl.classForName(ClassLoaderResolverImpl.java:368)
at org.datanucleus.metadata.AbstractClassMetaData.loadClass(AbstractClassMetaData.java:569)
... 117 more


いまのところの結論

Zeppelinのエラーはちゃんと追えばシュートできるかもだけど、そこまで時間は掛けられそうにないので、一旦次のHDPのリリースを待つのがいいかな。LLAPをGAにして、Zeppelinから(できればSpark2で)呼べるようにしてくれるところまで整っていたら最高。

とはいいつつ、また時間を見つけてためしてみようと思う。

EMRに取り込まれてくれると更にうれしいなとも思うけど、これにはspark-llapが本家Sparkに取り込まれるか、Hortonworksからspark packageとしてリリースされるかしてからでないとできないので、まだまだ時間はかかりそう。