LoginSignup
2
2

More than 5 years have passed since last update.

Scala(phantom)でCassandra

Last updated at Posted at 2018-04-18

始めに

ScalaからCassandraを使ったことがなかったので試してみました。
最初はJavaのドライバーを使おうかとも思いましたが、phantomもGitHubのスター数に大差が無かったしScalaで書かれてるのでこちらを使ってみました。

※CassandraはDockerのオフィシャルリポジトリをシングルノードで起てています。Cassandra自体の解説はしません。

構成

Scala: 2.12
sbt: 1.1
phantom: 2.24
Cassandra: 3.11

成果物

sbt

sbtの構成です。CassandraのContactPointを設定ファイルから取るためlightbendのconfigも追加しています。

build.sbt
name := "scala-cassandra-example"

version := "0.1"

scalaVersion := "2.12.5"

val phantomVersion = "2.24.2"

libraryDependencies ++= Seq(
  "com.outworkers"  %% "phantom-dsl" % phantomVersion,
  "com.outworkers"  %% "phantom-connectors" % phantomVersion,
  "com.typesafe" % "config" % "1.3.3",
  "org.scalatest" %% "scalatest" % "3.0.5" % Test
)

テーブルモデル

phantomはO/Rマッピングが利用できます(というより直接CQLを叩こうと思ったら面倒くさい気がする)。
普通に作っても良かったんですが、せっかくCassandraだしPartition(Composite Partition Key)を意識した感じにしてみます。

model.scala
package com.example

import java.util.UUID

import scala.concurrent.Future

import com.outworkers.phantom.Table
import com.outworkers.phantom.builder.query.InsertQuery
import com.outworkers.phantom.dsl._
import com.outworkers.phantom.keys.PartitionKey

// データ
// こちらにはPartition情報を含めない
final case class Message(id: UUID, message: String, timestamp: Long)

// テーブル
abstract class Messages extends Table[Messages, Message] {

  // categoryとsubcategoryによるComposite Partition Key
  object category extends StringColumn with PartitionKey
  object subcategory extends StringColumn with PartitionKey

  object id extends UUIDColumn with PrimaryKey
  object message extends StringColumn
  object timestamp extends LongColumn

  // Partitionを指定して保存
  def store(partition: (String, String), msg: Message): InsertQuery.Default[Messages, Message] =
    insert
      .value(_.category, partition._1)
      .value(_.subcategory, partition._2)
      .value(_.id, msg.id)
      .value(_.message, msg.message)
      .value(_.timestamp, msg.timestamp)

  // Partitionを検索
  def findByPartition(partition: (String, String)): Future[List[Message]] =
    select.where(_.category eqs partition._1).and(_.subcategory eqs partition._2).fetch()

  // Partition+IDで検索
  def findById(partition: (String, String), id: UUID): Future[Option[Message]] =
    select
      .where(_.category eqs partition._1)
      .and(_.subcategory eqs partition._2)
      .and(_.id eqs id)
      .one()

}

データベース

一応Mockなり何なりできるようにプロバイダとしてデータベースを提供するようにしてます(参考)。

database.scala
package com.example

import com.outworkers.phantom.connectors.ContactPoint
import com.outworkers.phantom.database.{Database, DatabaseProvider}
import com.outworkers.phantom.dsl.{CassandraConnection, KeySpace, replication, _}
import com.typesafe.config.ConfigFactory

// コネクタ(Singleton)
object Connector {

  private val config = ConfigFactory.load()

  config.checkValid(ConfigFactory.defaultReference(), "cassandra")

  val connection: CassandraConnection =
    ContactPoint(config.getString("cassandra.host"), config.getInt("cassandra.port"))
      .noHeartbeat()
      .noHeartbeat()
      .keySpace(
        KeySpace(config.getString("cassandra.keyspace"))
          .ifNotExists()
          .`with`(
            replication eqs SimpleStrategy.replication_factor(1)
          )
      )
}

// データベース定義
class AppDatabase(override val connector: CassandraConnection)
    extends Database[AppDatabase](connector) {
  object messages extends Messages with Connector
}

// プロバイダとして提供
trait AppDatabaseProvider extends DatabaseProvider[AppDatabase]

サービス

一応サービス化しときます。Batch処理が出来るようなので入れてみました。
CassandraのBatch処理って件数に上限があった気がするんですが、どこで設定するのか不明です・・・。

MessageService.scala
package com.example

import scala.concurrent.Future

import com.outworkers.phantom.ResultSet
import com.outworkers.phantom.dsl._

trait MessageService extends AppDatabaseProvider {

  def store(partition: (String, String), msg: Message): Future[ResultSet] =
    db.messages.store(partition, msg).future()

  def findPartition(partition: (String, String)): Future[List[Message]] =
    db.messages.findByPartition(partition)

  def findById(partition: (String, String), id: UUID): Future[Option[Message]] =
    db.messages.findById(partition, id)

  def batchStore(partition: (String, String), messages: Message*): Future[ResultSet] =
    Batch.logged.add(messages.map(msg => db.messages.store(partition, msg))).future()

}

テスト

上記サービスをテストしてみます。コネクタをテスト用に定義してサービスに渡しています。
どうやらphantomは指定したモデルからテーブルを作成できるようです(messageService.database.create())。
ScalaFuturesのpatienceConfigがデフォルトだとtimeoutする可能性があるのでオーバーライドして延長しています。

MessageServiceSpec.scala
package com.example

import java.time.{LocalDateTime, ZoneId}
import java.util.UUID

import scala.language.reflectiveCalls

import com.datastax.driver.core.SocketOptions
import com.outworkers.phantom.connectors.{CassandraConnection, ContactPoint}
import com.outworkers.phantom.dsl._
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpecLike}

class MessageServiceSpec
    extends WordSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures
    with OptionValues {

  // デフォルトだとタイムアウトする可能性あり
  override implicit val patienceConfig: PatienceConfig =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(200, Millis))

  object TestConnector {
    private val config: Config = ConfigFactory.parseString("""cassandra {
        |  host: "x.x.x.x" // IP指定
        |  port: 9042
        |  keyspace: "scala_cassandra_example"
        |}
      """.stripMargin)

    val connection: CassandraConnection =
      ContactPoint(config.getString("cassandra.host"), config.getInt("cassandra.port"))
        .withClusterBuilder(
          _.withSocketOptions(
            new SocketOptions()
              .setConnectTimeoutMillis(20000)
              .setReadTimeoutMillis(20000)
          ))
        .noHeartbeat()
        .keySpace(
          KeySpace(config.getString("cassandra.keyspace"))
            .ifNotExists()
            .`with`(
              replication eqs SimpleStrategy.replication_factor(1)
            )
        )
  }

  object TestDatabase extends AppDatabase(TestConnector.connection)

  trait TestDatabaseProvider extends AppDatabaseProvider {
    override def database: AppDatabase = TestDatabase
  }

  val messageService = new MessageService with TestDatabaseProvider

  override def beforeAll(): Unit = {
    messageService.database.create()
    ()
  }

  override def afterAll(): Unit = {
    messageService.database.drop()
    ()
  }

  "message service" should {

    "store and find by id" in {
      val partition: (String, String) = ("A", "1")
      val message = Message(UUID.randomUUID(),
                            "Test",
                            LocalDateTime.now().atZone(ZoneId.systemDefault()).toEpochSecond)
      val q = for {
        _ <- messageService.store(partition, message)
        find <- messageService.findById(partition, message.id)
      } yield find

      whenReady(q) { find =>
        find shouldBe defined
        find.value shouldBe message
      }
    }

    "batch store and find by partition" in {
      val partition: (String, String) = ("B", "1")
      val messages = Seq
        .range(0, 1000)
        .map(
          i =>
            Message(UUID.randomUUID(),
                    "Test" + i,
                    LocalDateTime.now().atZone(ZoneId.systemDefault()).toEpochSecond))

      val q = for {
        _ <- messageService.batchStore(partition, messages: _*)
        res <- messageService.findPartition(partition)
      } yield res

      whenReady(q) { res =>
        res.size shouldBe 1000
      }
    }
  }
}

終わりに

phantom自体はたぶん高機能で色々出来そうなんですが、ドキュメント読むのがしんどい(&未完成?)のでこの辺りで妥協しましたとさ。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2