spark
Download Spark and build it.
cd spark-1.5.0
./make-distribution.sh
spark-cassandra-connector
git clone spark-cassandra-connector and build it. (Note: for me, the build failed at this step.)
git clone https://github.com/datastax/spark-cassandra-connector
cd spark-cassandra-connector
git checkout b1.5
sbt package
sbt assembly
This generates the assembly jar under spark-cassandra-connector/target/scala-2.10.
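A small shell helper can locate the jar so you don't have to type its full name for `--jars`. This is a sketch. The function name `find_assembly_jar` is hypothetical. It assumes the jar name starts with `spark-cassandra-connector-assembly-`, as in the spark-shell command further down. It also assumes the default directory is relative to your current working directory.

```shell
#!/bin/sh
# Hypothetical helper: print the first spark-cassandra-connector assembly jar
# found in the given directory (default: spark-cassandra-connector/target/scala-2.10,
# relative to the current directory). Returns non-zero if no such jar exists.
find_assembly_jar() {
  local dir="${1:-spark-cassandra-connector/target/scala-2.10}"
  local jar
  for jar in "$dir"/spark-cassandra-connector-assembly-*.jar; do
    # If the glob matches nothing, the shell keeps the literal pattern, so check -f.
    if [ -f "$jar" ]; then
      printf '%s\n' "$jar"
      return 0
    fi
  done
  echo "no assembly jar found in $dir" >&2
  return 1
}

# Example (/path/to is a placeholder for wherever the connector was cloned):
#   bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \
#     --jars "$(find_assembly_jar /path/to/spark-cassandra-connector/target/scala-2.10)"
```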
If the spark-cassandra-connector build fails, download the jars individually from Maven Central and use those instead.
The jars I downloaded:
- guava-18.0.jar
- spark-cassandra-connector_2.10-1.5.0-M2.jar
- cassandra-driver-core-2.2.0-rc2.jar
I added guava-18.0 because, for some reason, I got errors without it.
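The manual download can be scripted using Maven Central's standard repository layout (`<groupId as path>/<artifactId>/<version>/<artifactId>-<version>.jar`). The script can also check that the Guava jar actually provides `com.google.common.util.concurrent`, the package the Cassandra driver relies on. This is a sketch under assumptions:
- The function names are hypothetical.
- The Maven coordinates are inferred from the jar file names above, so double-check them on Maven Central.

```shell
#!/bin/sh
# Build a Maven Central download URL from groupId, artifactId and version:
#   https://repo1.maven.org/maven2/<groupId as path>/<artifactId>/<version>/<artifactId>-<version>.jar
maven_central_url() {
  local group_path
  group_path=$(printf '%s' "$1" | tr . /)
  printf 'https://repo1.maven.org/maven2/%s/%s/%s/%s-%s.jar\n' \
    "$group_path" "$2" "$3" "$2" "$3"
}

# Download the three jars into the current directory.
# Coordinates are assumed from the jar names; verify them before relying on this.
download_connector_jars() {
  local coords
  for coords in \
    "com.datastax.spark spark-cassandra-connector_2.10 1.5.0-M2" \
    "com.datastax.cassandra cassandra-driver-core 2.2.0-rc2" \
    "com.google.guava guava 18.0"
  do
    # $coords is intentionally unquoted: it splits into group, artifact, version.
    curl -fLO "$(maven_central_url $coords)" || return 1
  done
}

# Succeed if a jar listing on stdin (one entry per line, e.g. from `jar tf x.jar`)
# contains entries under the given Java package.
listing_has_package() {
  local prefix
  prefix=$(printf '%s' "$1" | tr . /)/
  grep -q "^$prefix"
}

# Example:
#   download_connector_jars
#   jar tf guava-18.0.jar | listing_has_package com.google.common.util.concurrent && echo "guava OK"
```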
Starting spark-shell
If the spark-cassandra-connector build succeeded:
bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \
  --jars spark-cassandra-connector-assembly-1.5.0-M2-SNAPSHOT.jar
If the spark-cassandra-connector build failed:
bin/spark-shell --jars spark-cassandra-connector_2.10-1.5.0-M2.jar,cassandra-driver-core-2.2.0-rc2.jar,guava-18.0.jar --conf spark.cassandra.connection.host=127.0.0.1
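`--jars` takes a single comma-separated list with no spaces, which is easy to mistype when several jars are involved. The minimal sketch below builds that list from ordinary arguments. `join_jars` is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: join all arguments with commas, as spark-shell's --jars expects.
join_jars() {
  local IFS=,
  # "$*" joins the positional parameters using the first character of IFS.
  printf '%s\n' "$*"
}

# Example: the same command as above, built from a plain list of file names.
#   bin/spark-shell \
#     --jars "$(join_jars spark-cassandra-connector_2.10-1.5.0-M2.jar cassandra-driver-core-2.2.0-rc2.jar guava-18.0.jar)" \
#     --conf spark.cassandra.connection.host=127.0.0.1
```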
On Mac and Linux
- Mac, testing against a local Cassandra: set the host to 127.0.0.1, and list 127.0.0.1 under seeds in cassandra.yaml.
- Linux, testing against Cassandra in distributed mode: set the host to the host's IP address, i.e. the IP listed under seeds in cassandra.yaml.
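The host value can be read from the `seeds` entry of cassandra.yaml so that the two stay in sync. This is a sketch. `first_seed` is a hypothetical helper. It assumes the stock layout, where seeds is a quoted, comma-separated list on a single `- seeds:` line. The cassandra.yaml path depends on how Cassandra was installed.

```shell
#!/bin/sh
# Hypothetical helper: print the first address in the "seeds" entry of a cassandra.yaml,
# assuming lines such as
#     - seeds: "127.0.0.1"
#     - seeds: "192.168.11.2,192.168.11.3"
first_seed() {
  sed -nE 's/^[[:space:]]*-[[:space:]]*seeds:[[:space:]]*"[[:space:]]*([^",[:space:]]+).*/\1/p' "$1" | head -n 1
}

# Example (/path/to is a placeholder):
#   bin/spark-shell --conf spark.cassandra.connection.host="$(first_seed /path/to/cassandra.yaml)" ...
```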
Verifying keyspace and table creation in Cassandra
Run the following in spark-shell:
import com.datastax.spark.connector._ //Imports basic rdd functions
import com.datastax.spark.connector.cql._ //(Optional) Imports java driver helper functions
val c = CassandraConnector(sc.getConf)
c.withSessionDo ( session => session.execute("CREATE KEYSPACE test_from_spark WITH replication={'class':'SimpleStrategy', 'replication_factor':1}"))
c.withSessionDo ( session => session.execute("CREATE TABLE test_from_spark.fun (k int PRIMARY KEY, v int)"))
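You can create the same keyspace and table from cqlsh, without Spark. This helps tell whether a failure is on the Spark side or the Cassandra side. This is a sketch. `schema_cql` is a hypothetical helper. `IF NOT EXISTS` is an addition that the Scala version does not use, so the statements can be re-run safely.

```shell
#!/bin/sh
# Hypothetical helper: print the same CREATE statements as the Scala code above,
# with IF NOT EXISTS added so re-running does not fail once the objects exist.
schema_cql() {
  cat <<'EOF'
CREATE KEYSPACE IF NOT EXISTS test_from_spark
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE IF NOT EXISTS test_from_spark.fun (k int PRIMARY KEY, v int);
EOF
}

# Example (use the same host as spark.cassandra.connection.host):
#   cqlsh 127.0.0.1 -e "$(schema_cql)"
```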
Check in cqlsh that the keyspace and table were created:
cqlsh:mykeyspace> describe keyspaces;
system_auth mykeyspace test test_from_spark
system system_distributed system_traces
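The `describe keyspaces` output above is just whitespace-separated names, so the check can be scripted instead of read by eye. This is a sketch. `has_keyspace` is a hypothetical helper that matches whole names only, so `test` does not count as a match for `test_from_spark`.

```shell
#!/bin/sh
# Hypothetical helper: succeed if the keyspace name in $1 appears as a whole word
# in `describe keyspaces` output read from stdin.
has_keyspace() {
  # One name per line, then an exact whole-line match.
  tr -s '[:space:]' '\n' | grep -qx -- "$1"
}

# Example:
#   cqlsh 127.0.0.1 -e "DESCRIBE KEYSPACES" | has_keyspace test_from_spark && echo "created"
```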
Verifying data insertion
scala> c.withSessionDo ( session => session.execute("insert into test_from_spark.fun (k,v) values(1, 10)"))
res6: com.datastax.driver.core.ResultSet = ResultSet[ exhausted: true, Columns[]]
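Check in cqlsh that the data made it into the table. (The second row, (2, 20), was inserted separately; that command is not shown.)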
cqlsh:test_from_spark> select * from fun;
 k | v
---+----
 1 | 10
 2 | 20
(2 rows)
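The `(N rows)` footer that cqlsh prints can be extracted and compared with `d.count` from the spark-shell check that follows. This is a sketch. `cqlsh_row_count` is a hypothetical helper, and it accepts both `row` and `rows` in the footer.

```shell
#!/bin/sh
# Hypothetical helper: print N from the "(N rows)" footer in cqlsh output on stdin.
cqlsh_row_count() {
  sed -nE 's/^[[:space:]]*\(([0-9]+) rows?\)[[:space:]]*$/\1/p'
}

# Example:
#   cqlsh 127.0.0.1 -e "SELECT * FROM test_from_spark.fun" | cqlsh_row_count
```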
Verifying that cassandraTable returns the table as an RDD
scala> val d = sc.cassandraTable("test_from_spark", "fun");
d: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
scala> d.count
res4: Long = 2
scala> d
res7: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
scala> d.collect
res9: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{k: 1, v: 10}, CassandraRow{k: 2, v: 20})
scala> d.collect.foreach(println)
CassandraRow{k: 1, v: 10}
CassandraRow{k: 2, v: 20}
Errors encountered along the way
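- Error saying com.google.common.util.concurrent.** is not on the classpath => fixed by adding guava-18.0.jar to --jars myself.
- The following error occurred while checking the Cassandra connection. (It seemed to go away after I changed the Cassandra configuration, but I'm not sure.)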
scala> c.withSessionDo ( session => session.execute("CREATE KEYSPACE test WITH replication={'class':'SimpleStrategy', 'replication_factor':1}"))
15/11/14 11:21:10 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead.
java.io.IOException: Failed to open native connection to Cassandra at {192.168.11.2}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC.<init>(<console>:53)
at $iwC.<init>(<console>:55)
at <init>(<console>:57)
at .<init>(<console>:61)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:680)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.11.2:9042 (com.datastax.driver.core.TransportException: [/192.168.11.2:9042] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:229)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:84)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1264)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:338)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
... 56 more
scala>
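The root cause `TransportException: [/192.168.11.2:9042] Cannot connect` typically means the driver could not open a TCP connection to that address at all. The failure happens before any CQL is sent. Checking the port outside Spark separates a network or Cassandra-configuration problem from a Spark one. This is a diagnostic sketch, not what was done here. Notes on the assumptions:
- The helper names are hypothetical.
- 9042 is Cassandra's default native-protocol port.
- `port_open` assumes bash (for `/dev/tcp`) and GNU coreutils `timeout` are installed. On macOS, `nc -z host port` is an alternative.

```shell
#!/bin/sh
# Hypothetical helper: print "host port" from the connector's error message, e.g.
#   java.io.IOException: Failed to open native connection to Cassandra at {192.168.11.2}:9042
cassandra_addr_from_error() {
  sed -nE 's/.*Failed to open native connection to Cassandra at \{([^}]*)\}:([0-9]+).*/\1 \2/p' | head -n 1
}

# Succeed if a TCP connection to host:port opens within 3 seconds.
# Relies on bash's /dev/tcp redirection and the `timeout` command.
port_open() {
  timeout 3 bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$1" "$2" 2>/dev/null
}

# Example: save the error line to error.log, then
#   set -- $(cassandra_addr_from_error < error.log)
#   port_open "$1" "$2" || echo "nothing accepted a connection on $1:$2"
```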