Sparkを使ったCassandraデータロードの簡単なメモです。
Cassandraクラスタの構築手順は以下参照してください。
https://qiita.com/48hands/items/05c2ad0ea89fe13afd57
まずは、keyspaceとtableを作成。
$ ssh nosql1
[vagrant@nosql1 ~]$ cqlsh nosql1
Connected to develop-cluster at nosql1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };
cqlsh> CREATE TABLE test.kv(key text PRIMARY KEY, value int);
つづいて、SparkのコードをScalaで書いていく。中身は適当。
build.sbt
を設定して
name := "CassandraLoad"
version := "0.1"
scalaVersion := "2.11.12"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"
libraryDependencies ++= Seq(
"datastax" % "spark-cassandra-connector" % "2.0.1-s_2.11",
"org.apache.spark" %% "spark-sql" % "2.2.0"
)
LoadSample.scala
を作って
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.cassandra._
object LoadSample {
def main(args: Array[String]): Unit = {
import spark.implicits._
val dfs = for (i <- 1 to 100) yield {
val rdd: RDD[Int] = spark.sparkContext.parallelize(1 to 100000)
rdd.map(line => (s"key-$i-$line", line)).toDF("key", "value")
}
dfs.foreach(df =>
df.write.cassandraFormat("kv", "test").mode(SaveMode.Overwrite).save())
}
private val spark = SparkSession
.builder
.master("local[*]") // ローカルでエグゼキュータにクライアントPCのコア数と同数のスレッドを割り当て
.appName("Load Sample")
.config("spark.cassandra.connection.host",
"192.168.33.41,192.168.33.42,192.168.33.43")
.config("spark.cassandra.output.consistency.level", "ONE") // デフォルトだとLOCAL_QUORUMなので変更した。
.getOrCreate()
}
実行すると、データが100,000 * 100 = 10,000,000
件入っているはず。
$ ssh nosql1
[vagrant@nosql1 ~]$ cqlsh nosql1 -k test
Connected to develop-cluster at nosql1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh:test> select * from kv limit 100;
key | value
-------------+---------
key-4-28995 | 28995
key-4-78904 | 78904
key-4-61524 | 61524
key-4-13764 | 13764
...
入っていることがなんとなく確認できるはず。