Getting started with Spark + Cassandra

Posted at 2015-11-14

Spark

Download the Spark source and build a distribution:

cd spark-1.5.0
./make-distribution.sh

spark-cassandra-connector

Clone spark-cassandra-connector with git and build it. (Note: the build failed for me at this step.)

git clone https://github.com/datastax/spark-cassandra-connector
cd spark-cassandra-connector
git checkout b1.5
sbt package
sbt assembly

This produces the assembly jar under spark-cassandra-connector/target/scala-2.10.

If the spark-cassandra-connector build fails, download the jars individually from Maven Central and use those instead.

The jars I downloaded:

  • guava-18.0.jar
  • spark-cassandra-connector_2.10-1.5.0-M2.jar
  • cassandra-driver-core-2.2.0-rc2.jar

guava-18.0 is required for some reason (the shell complains if it's missing), so I added it.

Launching spark-shell

If the spark-cassandra-connector build succeeded:

bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \
  --jars spark-cassandra-connector-assembly-1.5.0-M2-SNAPSHOT.jar

If the spark-cassandra-connector build failed:

bin/spark-shell --jars spark-cassandra-connector_2.10-1.5.0-M2.jar,cassandra-driver-core-2.2.0-rc2.jar,guava-18.0.jar --conf spark.cassandra.connection.host=127.0.0.1

On Mac and Linux

  • To verify against a local Cassandra on a Mac, use host=127.0.0.1, and list 127.0.0.1 under seeds in cassandra.yaml.
  • To verify against Cassandra in distributed mode on Linux, use host=<ip address of the host>; this IP must be one listed under seeds in cassandra.yaml. (The host can also be set in code; see the sketch below.)
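Both bullets come down to the spark.cassandra.connection.host setting. A minimal sketch of setting it in application code instead of on the spark-shell command line (the app name is a placeholder; inside spark-shell itself, sc already exists, so this only applies to a compiled job):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone setup; point the host at an address Cassandra actually binds to.
val conf = new SparkConf()
  .setAppName("cassandra-test")                         // placeholder app name
  .set("spark.cassandra.connection.host", "127.0.0.1")  // or a seed node's IP on Linux
val sc = new SparkContext(conf)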

Verifying Cassandra keyspace and table creation

Run the following in spark-shell:

import com.datastax.spark.connector._     // basic RDD functions
import com.datastax.spark.connector.cql._ // (optional) Java driver helper functions

val c = CassandraConnector(sc.getConf)
c.withSessionDo(session => session.execute("CREATE KEYSPACE test_from_spark WITH replication={'class':'SimpleStrategy', 'replication_factor':1}"))
c.withSessionDo(session => session.execute("CREATE TABLE test_from_spark.fun (k int PRIMARY KEY, v int)"))

Check in cqlsh that the keyspace and table were created:

cqlsh:mykeyspace> describe keyspaces;

system_auth  mykeyspace          test           test_from_spark
system       system_distributed  system_traces

Verifying data insert

scala> c.withSessionDo ( session => session.execute("insert into test_from_spark.fun (k,v) values(1, 10)"))
res6: com.datastax.driver.core.ResultSet = ResultSet[ exhausted: true, Columns[]]
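The connector can also write RDDs directly, without hand-written CQL. A minimal sketch using saveToCassandra and SomeColumns (both come from com.datastax.spark.connector._, already imported above; the sample data mirrors the rows shown in the next check):

// Build a small RDD of (k, v) pairs and write it to the fun table
// (re-inserting an existing key is just an upsert in Cassandra)
val rows = sc.parallelize(Seq((1, 10), (2, 20)))
rows.saveToCassandra("test_from_spark", "fun", SomeColumns("k", "v"))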

Check in cqlsh that the data is in the table (a second row, (2, 20), had also been inserted by this point):

cqlsh:test_from_spark> select * from fun;

 k | v
---+----
 1 | 10
 2 | 20

(2 rows)

Verifying reading the table as an RDD with cassandraTable

scala> val d = sc.cassandraTable("test_from_spark", "fun");
d: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> d.count
res4: Long = 2

scala> d
res7: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> d.collect
res9: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{k: 1, v: 10}, CassandraRow{k: 2, v: 20})

scala> d.collect.foreach(println)
CassandraRow{k: 1, v: 10}                                                       
CassandraRow{k: 2, v: 20}
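cassandraTable can also map rows onto a case class and trim the projection on the Cassandra side. A short sketch (the case class name Fun is my own, for illustration; cassandraTable[T] and select are standard connector APIs):

// Map each row onto a case class instead of a generic CassandraRow
case class Fun(k: Int, v: Int)
val typed = sc.cassandraTable[Fun]("test_from_spark", "fun")
typed.collect.foreach(println)

// select(...) fetches only the named columns from Cassandra
sc.cassandraTable("test_from_spark", "fun").select("v").collect.foreach(println)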

Errors encountered along the way

  • Error that com.google.common.util.concurrent.** was not on the classpath => fixed by adding guava-18.0.jar to --jars myself.

  • Hit the error below while verifying the Cassandra connection. Tweaking the Cassandra settings seems to have fixed it.

scala> c.withSessionDo ( session => session.execute("CREATE KEYSPACE test WITH replication={'class':'SimpleStrategy', 'replication_factor':1}"))
15/11/14 11:21:10 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead.
java.io.IOException: Failed to open native connection to Cassandra at {192.168.11.2}:9042
	at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
	at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
	at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
	at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
	at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
	at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
	at $iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
	at $iwC$$iwC$$iwC.<init>(<console>:51)
	at $iwC$$iwC.<init>(<console>:53)
	at $iwC.<init>(<console>:55)
	at <init>(<console>:57)
	at .<init>(<console>:61)
	at .<clinit>(<console>)
	at .<init>(<console>:7)
	at .<clinit>(<console>)
	at $print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
	at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
	at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
	at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
	at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
	at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
	at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
	at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
	at org.apache.spark.repl.Main$.main(Main.scala:31)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:680)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.11.2:9042 (com.datastax.driver.core.TransportException: [/192.168.11.2:9042] Cannot connect))
	at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:229)
	at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:84)
	at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1264)
	at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:338)
	at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
	... 56 more


scala> 
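The NoHostAvailableException above just means the driver never reached 192.168.11.2:9042, so the things to check are listen_address/rpc_address and seeds in cassandra.yaml, and that spark.cassandra.connection.host points at an address Cassandra actually listens on. After changing the config, a quick smoke test (querying system.local is a common connectivity check, not something from the original session):

// Throws the same NoHostAvailableException if the node is still unreachable
val check = CassandraConnector(sc.getConf)
check.withSessionDo(session => session.execute("SELECT release_version FROM system.local").one())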
