LoginSignup
1
1

More than 5 years have passed since last update.

Sparkを使ったCassandraデータロードメモ

Last updated at Posted at 2018-04-27

Sparkを使ったCassandraデータロードの簡単なメモです。

Cassandraクラスタの構築手順は以下参照してください。
https://qiita.com/48hands/items/05c2ad0ea89fe13afd57

まずは、keyspaceとtableを作成。

$ ssh nosql1

[vagrant@nosql1 ~]$ cqlsh nosql1
Connected to develop-cluster at nosql1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.

cqlsh> CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };

cqlsh> CREATE TABLE test.kv(key text PRIMARY KEY, value int);

つづいて、SparkのコードをScalaで書いていく。中身は適当。

build.sbtを設定して

name := "CassandraLoad"

version := "0.1"

scalaVersion := "2.11.12"

resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  "datastax" % "spark-cassandra-connector" % "2.0.1-s_2.11",
  "org.apache.spark" %% "spark-sql" % "2.2.0"
)

LoadSample.scalaを作って

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.cassandra._

object LoadSample {

  def main(args: Array[String]): Unit = {
    import spark.implicits._

    val dfs = for (i <- 1 to 100) yield {
      val rdd: RDD[Int] = spark.sparkContext.parallelize(1 to 100000)
      rdd.map(line => (s"key-$i-$line", line)).toDF("key", "value")
    }

    dfs.foreach(df =>
      df.write.cassandraFormat("kv", "test").mode(SaveMode.Overwrite).save())
  }


  private val spark = SparkSession
    .builder
    .master("local[*]") // ローカルでエグゼキュータにクライアントPCのコア数と同数のスレッドを割り当て
    .appName("Load Sample")
    .config("spark.cassandra.connection.host",
      "192.168.33.41,192.168.33.42,192.168.33.43")
    .config("spark.cassandra.output.consistency.level", "ONE") // デフォルトだとLOCAL_QUORUMなので変更した。
    .getOrCreate()
}

実行すると、データが100,000 * 100 = 10,000,000件入っているはず。

$ ssh nosql1

[vagrant@nosql1 ~]$ cqlsh nosql1 -k test
Connected to develop-cluster at nosql1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.

cqlsh:test> select * from kv limit 100;

 key         | value
-------------+---------
 key-4-28995 |   28995
 key-4-78904 |   78904
 key-4-61524 |   61524
 key-4-13764 |   13764
 ...

入っていることがなんとなく確認できるはず。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1