Edited at

PlayFramework2.6とSlickでDBにアクセスしてみた

More than 1 year has passed since last update.


はじめに

前回、Scala+PlayFramework2.6の通信周りをいじって見たので、今度はDB周りをいじってみることにします。

DBアクセスには、slick(play-slick)を使用し、DBにはMariaDBを使用しました。

なお、筆者はScalaもPlayFrameworkも初心者なので、おかしな点があれば指摘いただけると幸町です。


実装


前提

今回使用した各種バージョンは以下の通りです。

名前
バージョン

Scala
2.12.4

PlayFramework
2.6.7

MariaDB
10.2.12

play-slick
3.0.3

HikariCP
2.7.4


MariaDB

MariaDBはmaster-slave構成になっているものとします。今回は、ローカルにmasterとslaveの二つのインスタンスを立ち上げて確認しました。

種類
IP
ポート

Master
127.0.0.1
3306

Slave
127.0.0.1
3307

データベース"db"に、サンプルとして使う二つのテーブルを定義しました。何らかのゲームのユーザIDテーブルと、ユーザ名と一言テーブルといった体です。


user_id.sql

CREATE TABLE `user_id` (

`user_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`created` timestamp NOT NULL DEFAULT current_timestamp(),
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1


basic.sql

CREATE TABLE `basic` (

`user_id` int(10) unsigned NOT NULL,
`user_name` varchar(255) NOT NULL DEFAULT '',
`one_liner` varchar(255) NOT NULL DEFAULT '',
`created` timestamp NOT NULL DEFAULT current_timestamp(),
`modified` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp(),
PRIMARY KEY (`user_id`),
CONSTRAINT `basic_id_fk` FOREIGN KEY (`user_id`) REFERENCES `user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4


build.sbt

build.sbtに、必要なライブラリを追記します。


build.sbt

libraryDependencies += "org.mariadb.jdbc" % "mariadb-java-client" % "2.2.1"

libraryDependencies += "com.zaxxer" % "HikariCP" % "2.7.4"
libraryDependencies += "com.typesafe.play" %% "play-slick" % "3.0.3"


application.conf

Slick用のDB設定をapplication.confに追記します。jdbcドライバはmariadbから提供されているものを使用しています。

今回はmaster-slave構成にアクセスするので、defaultではなく、masterとslaveという二つの設定を記述しました。

masterとslaveの違いは、poolName(名前)、autoCommit(自動コミット)、readOnly(Master-Slave切替に使う)だけです。

また、connectionPoolに、今回作成したデータソースのクラス名(object名)を指定しています。


application.conf

slick.dbs {

master {
profile="slick.jdbc.MySQLProfile$"
db {
connectionPool = "utils.ReplicationJdbcDataSource$"
numThreads = 2
queueSize = 1000
driver = "org.mariadb.jdbc.Driver"
user = "scott"
password = "tiger"
url = "jdbc:mariadb:replication://127.0.0.1:3306,127.0.0.1:3306/db"
poolName = "master"
autoCommit = false
}
}

slave {
profile="slick.jdbc.MySQLProfile$"
db {
connectionPool = "utils.ReplicationJdbcDataSource$"
numThreads = 2
queueSize = 1000
driver = "org.mariadb.jdbc.Driver"
user = "scott"
password = "tiger"
url = "jdbc:mariadb:replication://127.0.0.1:3306,127.0.0.1:3306/db"
poolName = "slave"
readOnly = true
}
}
}


master、slaveのどちらのurl設定にも、mariadb-jdbcドライバ独自の記述方法を使用しています(replication:)。これは、readOnlyがfalseなら先頭サーバを参照し、trueなら二番目以降のサーバを参照するというものです。

なお、master側の設定を"jdbc:mariadb://127.0.0.1:3306/db"、slave側の設定を"jdbc:mariadb://127.0.0.1:3307/db"と書いても動作します。


データソース

デフォルトのHikariCPでは、autoCommitの設定の切替えをうまく行うことができなかったため、継承させたデータソースを作成しました。

ソースを見てわかるとおり、configurationにautoCommitがあった場合、その設定を反映させるだけの実装です。

application.confのslick.dbs.[master|slave].db.connectionPoolに記載された"utils.ReplicationJdbcDataSource$"が、object utils.ReplicationJdbcDataSourceに対応します。


utils/ReplicationJdbcDataSource.scala

package utils

import java.sql.Driver

import com.typesafe.config.Config
import com.zaxxer.hikari.HikariDataSource
import slick.jdbc.JdbcDataSourceFactory
import slick.jdbc.hikaricp.HikariCPJdbcDataSource
import slick.util.ConfigExtensionMethods._

class ReplicationJdbcDataSource(ds: com.zaxxer.hikari.HikariDataSource, hconf: com.zaxxer.hikari.HikariConfig)
extends HikariCPJdbcDataSource(ds, hconf)
{
}

object ReplicationJdbcDataSource extends JdbcDataSourceFactory
{
override def forConfig(c: Config, driver: Driver, name: String, classLoader: ClassLoader): HikariCPJdbcDataSource = {
val hcpds = HikariCPJdbcDataSource.forConfig(c, driver, name, classLoader)

c.getBooleanOpt("autoCommit").foreach(hcpds.hconf.setAutoCommit)
val ds = new HikariDataSource(hcpds.hconf)

new ReplicationJdbcDataSource(ds, hcpds.hconf)
}
}


MariaDBはautoCommit=trueがデフォルトですが、個人的にはautoCommitは苦手なのでmaster側の設定をautoCommit=falseにしています。

slave側のautoCommitをfalseにすると、selectの後に毎回rollbackが走ってしまうのでslave側はそのままにしています。

なお、dbの設定がautoCommit=falseで、データソース側の設定もautoCommit=falseだったとしても、JDBCドライバさんが、丁寧に

commit

set autoCommit=1
set autoCommit=0
update..

のようにautoCommitを呼び出してくれます。"set autocommit"自体を呼び出さないようにする方法が見つからなかったので、ひとまずこのままです。


DBアクセス

調べてみるとPlayFramework+SlickでのDBアクセス方法はいろいろあるようですが、trait HasDatabaseConfig[JDBCProfile]を引っ張ってくるか、injectionされたDatabaseConfigProviderを使って取得するかがよくあるパターンのようです。

とはいえ、どちらにせよ、DatabaseConfigProviderからDatabaseConfigを取得し、そこから、Databaseインスタンスを取得するという方法のようです。

● よくあるパターン。

@Singleton
class HogehogeController @Inject()(cc: ControllerComponents, implicit val ec: ExecutionContext)
extends AbstractController(cc)
with HasDatabaseConfig[JDBCProfile]
{
db.run(DBへのアクセス....)
}

または

@Singleton
class HogehogeController @Inject(cc: ControllerComponents, dp: DatabaseConfigProvider[JDBCProfile], implicit val ec: ExecutionContext)
extends AbstractController(cc)
{
dp.get.db.run(DBへのアクセス....)
}

今回は、データベースアクセス用のシングルトンを作って、それをPlayFrameworkのControllerからアクセスする形で作って見ました。

これは、複数のControllerで同じ処理を書きたくないという理由と、少しだけひねったやり方を試してみるかという理由です。

また、今回はデータベースとして"default"ではなく、"master"と"slave"という二つの設定を使用しましたので、それぞれにアクセスするためにNamedDatabase annotationを使用しています。


services/DatabaseConnector.scala


package services

import javax.inject.{Inject, Singleton}

import play.api.db.slick.DatabaseConfigProvider
import play.db.NamedDatabase
import slick.basic.DatabasePublisher
import slick.dbio.{DBIOAction, Effect, NoStream, Streaming}

import scala.concurrent.{ExecutionContext, Future}

trait DatabaseConnector {
def query[R](action: DBIOAction[R, _ <: NoStream, _]): Future[R]
def stream[R](action: DBIOAction[_, Streaming[R], _]): DatabasePublisher[R]
def update[R, E <: Effect](action: DBIOAction[R, _ <: NoStream, E]): Future[R]
}

@Singleton
class SlickDatabaseConnector @Inject() (@NamedDatabase("master") masterDbConfigProvider: DatabaseConfigProvider,
@NamedDatabase("slave") slaveDbConfigProvider: DatabaseConfigProvider)
(implicit ec: ExecutionContext)
extends DatabaseConnector
{
override def query[R](action: DBIOAction[R, _ <: NoStream, _]): Future[R] = {
val db = slaveDbConfigProvider.get.db
db.run[R](action)
}

override def stream[R](action: DBIOAction[_, Streaming[R], _]): DatabasePublisher[R] = {
val db = slaveDbConfigProvider.get.db
db.stream(action)
}

override def update[R, E <: Effect](action: DBIOAction[R, _ <: NoStream, E]): Future[R] = {
val db = masterDbConfigProvider.get.db
db.run[R](action)
}
}


いろいろと試行錯誤したのですが、masterとslave用にメソッドを分ける形にしました。

最初は、actionのtype parameterのEffectを見て、Effect.WriteやEffect.Transactionalがあればmasterへ、無ければslaveのような処理も考えていたのですが、Actionの作り方次第で単純に切り分けるのが難しいケースがあることから、結局諦めました。

また、autoCommitがfalseであることから、上記updateメソッドへのactionのEffectがEffect.Transactionalでは無い場合は、自動的にtransactionallyを呼び出す処理も考えましたが、こちらも同様の理由から断念しました。なお、下記ソースの場合、Effect.Readのアクションでmasterデータベースにアクセスしようとするとコンパイル時にエラーになるという素敵仕様です。


services/DatabaseConnector.scala

//(試作廃棄版:上記ソースの差分のみ)

sealed trait DatabaseConnectorEffect[E <: Effect]

object DatabaseConnectorEffect {
implicit case object none extends DatabaseConnectorEffect[Effect]
implicit case object transactional extends DatabaseConnectorEffect[Effect.Transactional]
implicit case object write extends DatabaseConnectorEffect[Effect.Write]
implicit case object writeTransactional extends DatabaseConnectorEffect[Effect.Write with Effect.Transactional]
implicit case object readWrite extends DatabaseConnectorEffect[Effect.Read with Effect.Write]
implicit case object readWriteTransactional extends DatabaseConnectorEffect[Effect.Read with Effect.Write with Effect.Transactional]
}

override def update[R, E <: Effect](action: DBIOAction[R, _ <: NoStream, E])(implicit effect: DatabaseConnectorEffect[E]): Future[R] = {
val targetAction = effect match {
case DatabaseConnectorEffect.transactional
| DatabaseConnectorEffect.writeTransactional
| DatabaseConnectorEffect.readWriteTransactional => action
case _ => action.transactionally
}

val db = masterDbConfigProvider.get.db
db.run[R](targetAction)
}



Module

上記で作成したDatabaseConnectorのconcrete class(SlickDatabaseConnector)のシングルトンを登録します。


Module.scala

import services.{DatabaseConnector, SlickDatabaseConnector}

class Module extends AbstractModule {
override def configure(): Unit = {
// DB接続
bind(classOf[DatabaseConnector]).to(classOf[SlickDatabaseConnector])
}
}



モデル

テーブルuser_idとbasicに対応するDB用のモデルクラスを作成します。

これもいろいろとやり方があるようですが、テーブルの1レコードに対応するcase class、テーブル自体のclass、Actionを生成したりデータベース処理を実行するメソッドを提供するobjectの三つから構成することにしました。

user_idテーブルは、ユーザが新規登録したときにIDを振ることを目的としたテーブルなので、IDを振り出すアクションを生成するメソッド、それを呼び出すメソッド、現在の最大ユーザIDを取得するメソッドの三つを用意しました。

IDを振り出すアクションでは、あえて、SQLを直接記述しています。


model/UserId.scala

package model

import java.sql.Timestamp

import services.DatabaseConnector
import slick.lifted.{ProvenShape, Rep, TableQuery, Tag}
import slick.jdbc.MySQLProfile.api._

import scala.concurrent.{ExecutionContext, Future}

case class UserId(userId: Int, created: Timestamp)

class UserIdTable(tag: Tag) extends Table[UserId](tag, "user_id") {
def userId: Rep[Int] = column[Int]("user_id", O.PrimaryKey, O.AutoInc)
def created: Rep[Timestamp] = column[Timestamp]("created")

override def * : ProvenShape[UserId] = (userId, created) <> (UserId.tupled, UserId.unapply)
}

object UserIdQuery extends TableQuery(new UserIdTable(_)) {
def createUserAction: DBIOAction[Vector[Int], Streaming[Int], _] = {
sqlu"insert into user_id values()".andThen(sql"select last_insert_id()".as[Int])
}

def createUser()(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Option[Int]] =
dc.update(createUserAction.transactionally).map { _.headOption }

def currentUserId(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Option[Int]] =
dc.query(this.map(_.userId).max.result)
}


続いて、basicテーブルに関するモデルです。

idによるモデル取得、upsert(insert or update)、updateの三つのメソッドを作成しました(upsertとupdateは特に分ける必要は無かったが、勉強およびテストのために記述)。

また、selectした結果(カラム列)からモデル(Basic)を生成するための暗黙的な変換器(GetResult[Basic])も定義しています。

basicテーブルのcreatedフィールドは生成時、updatedフィールドは生成時/更新時に自動的に更新されるようにしています。しかし、TableQuery#insertOrUpdateを使用するとcreatedフィールドまで更新されてしまうので、自前でsqlを記述しています。


model/Basic.scala

package model

import java.sql.Timestamp

import services.DatabaseConnector
import slick.jdbc.GetResult
import slick.lifted.{ProvenShape, Rep, TableQuery, Tag}
import slick.jdbc.MySQLProfile.api._

import scala.concurrent.{ExecutionContext, Future}

case class Basic(userId: Int, userName: String, oneLiner: String, created: Timestamp, modified: Timestamp)

class BasicTable(tag: Tag) extends Table[Basic](tag, "basic") {
def userId: Rep[Int] = column[Int]("user_id", O.PrimaryKey, O.Unique)
def userName: Rep[String] = column[String]("user_name")
def oneLiner: Rep[String] = column[String]("one_liner")
def created: Rep[Timestamp] = column[Timestamp]("created")
def modified: Rep[Timestamp] = column[Timestamp]("modified")

override def * : ProvenShape[Basic] =
(userId, userName, oneLiner, created, modified) <> (Basic.tupled, Basic.unapply)
}

object BasicQuery extends TableQuery(new BasicTable(_)) {
implicit val getBasicResult:GetResult[Basic] = GetResult(rs => Basic(rs.<<, rs.<<, rs.<<, rs.<<, rs.<<))

def getById(userId: Int)(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Option[Basic]] =
dc.query(BasicQuery.filter(_.userId === userId).result).map { _.headOption }

//直接書く場合。この場合、GetResult[Basic]が暗黙的に参照される
//dc.query(sql"select user_id,user_name,created,modified from basic where user_id=$userId".as[Basic]).map { _.headOption }

def upsertNameAction(userId: Int, userName: Option[String], oneLiner: Option[String]): Option[DBIOAction[Int, NoStream, _]] =
// createdまで更新されてまう
// this.insertOrUpdate(Basic(userId, userName, oneLiner, null, null))
(userName, oneLiner) match {
case (Some(u), Some(o)) =>
Some(sqlu"""INSERT INTO basic(user_id,user_name,one_liner) VALUES($userId, $u, $o)
|ON DUPLICATE KEY UPDATE user_name=values(user_name), one_liner=values(one_liner)"""
)
case (Some(u), None) =>
Some(sqlu"""INSERT INTO basic(user_id,user_name) VALUES($userId, $u)
|ON DUPLICATE KEY UPDATE user_name=values(user_name)"""
)
case (None, Some(o)) =>
Some(sqlu"""INSERT INTO basic(user_id,one_liner) VALUES($userId, $o)
|ON DUPLICATE KEY UPDATE one_liner=values(one_liner)"""
)
case (None, None) => None
}

def upsertName(userId: Int, userName: Option[String], oneLiner: Option[String])
(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Int] =
upsertNameAction(userId, userName, oneLiner) match {
case Some(action) => dc.update(action.transactionally)
case None => Future(0)
}

def updateNameAction(userId: Int, userName: Option[String], oneLiner: Option[String]): Option[DBIOAction[Int, NoStream, _]] = {
(userName, oneLiner) match {
case (Some(u), Some(o)) => Some(filter(_.userId === userId).map(row => (row.userName, row.oneLiner)).update((u, o)))
case (Some(u), None) => Some(filter(_.userId === userId).map(row => row.userName).update(u))
case (None, Some(o)) => Some(filter(_.userId === userId).map(row => row.oneLiner).update(o))
case (None, None) => None
}
}

def updateName(userId: Int, userName: Option[String], oneLiner: Option[String])
(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Int] =
updateNameAction(userId, userName, oneLiner) match {
case Some(action) => dc.update(action.transactionally)
case None => Future(0)
}
}



Controller

PlayFrameworkのControllerからは、次のようにアクセスできます。


controllers/HogeController.scala

package controllers

import javax.inject._

import model.{Basic, BasicQuery, UserIdQuery}
import play.api.mvc._
import services.DatabaseConnector
import slick.basic.DatabasePublisher
import slick.jdbc.MySQLProfile.api._

import scala.concurrent.{ExecutionContext, Future}

@Singleton
class HogeController @Inject()(cc: ControllerComponents,
implicit val dc:DatabaseConnector,
implicit val ec:ExecutionContext
) extends AbstractController(cc)
{
def hoge = Action.async() {
// basicテーブルから10件取得する
dc.query(BasicQuery.take(10).result) // 結果:Future[Vector[Basic]]

// basicテーブルからStreamingで取得する
dc.stream(BasicQuery.to[Vector].result) // 結果: DatabasePublisher[Basic]

// 指定されたIDのbasicを取得する
BasicQuery.getById(userId) // 結果: Future[Option[Basic]]

// upsert
BasicQuery.upsertName(userId, Some("名前"), Some("一言"))

// 複数のupdateをtransactionで囲って実行
val a1 = BasicQuery.updateNameAction(userId1, Some("名前1"), Some("一言1"))
val a2 = BasicQuery.updateNameAction(userId2, Some("名前2"), None)
val a3 = BasicQuery.updateNameAction(userId3, None, None) // 何も起きない
dc.update(DBIO.seq(Vector(a1, a2, a3).flatten:_*).transactionally)
}
}



リファクタリング(追記)

やはり、メソッドでmaster/slave切り替えるのはちょっとあれな気がしてきたので、Master/Slaveを指定してDatabaseオブジェクトを取得できるようにしてみました。


services/DatabaseConnector.scala(新)

package services

import javax.inject.{Inject, Singleton}

import play.api.db.slick.DatabaseConfigProvider
import play.db.NamedDatabase
import slick.basic.BasicProfile

import scala.concurrent.ExecutionContext

/**
* Master/Slaveのサーバ種別のための基底trait
*/
sealed trait DatabaseType

/**
* Master/Slaveのサーバ種別
* DatabaseConnector.createInstanceを呼び出す場合、import DatabaseTypes._ を呼び出しておく必要がある
*/
object DatabaseTypes {
class Master extends DatabaseType
class Slave extends DatabaseType

implicit case object DbMaster extends Master
implicit case object DbSlave extends Slave
}

/**
* Injectの際に参照されるtrait
*/
trait DatabaseConnector {
def db[T <: DatabaseType]()(implicit dbType: T, ec: ExecutionContext): BasicProfile#Backend#Database
}

/**
* Injection本体
* @param masterDbConfigProvider
* @param slaveDbConfigProvider
*/
@Singleton
class SlickDatabaseConnector @Inject()(@NamedDatabase("master") masterDbConfigProvider: DatabaseConfigProvider,
@NamedDatabase("slave") slaveDbConfigProvider: DatabaseConfigProvider)
extends DatabaseConnector
{
import DatabaseTypes._

override def db[T <: DatabaseType]()(implicit dbType: T, ec: ExecutionContext): BasicProfile#Backend#Database = {
dbType match {
case _: Master => masterDbConfigProvider.get.db
case _: Slave => slaveDbConfigProvider.get.db
}
}
}


これに伴い、呼び出し方法は以下のように変更しました (一部のみ)。


model/Basic.scala(旧)

object BasicQuery extends TableQuery(new BasicTable(_)) {

implicit val getBasicResult:GetResult[Basic] = GetResult(rs => Basic(rs.<<, rs.<<, rs.<<, rs.<<, rs.<<))

def getById(userId: Int)(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Option[Basic]] =
dc.query(BasicQuery.filter(_.userId === userId).result).map { _.headOption }



model/Basic.scala(新)

object BasicQuery extends TableQuery(new BasicTable(_)) {

// importを追加
import services.DatabaseTypes._

implicit val getBasicResult:GetResult[Basic] = GetResult(rs => Basic(rs.<<, rs.<<, rs.<<, rs.<<, rs.<<))

def getById(userId: Int)(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Option[Basic]] =
// dcをdc.db[Slave]に変更。Databaseオブジェクトのrunメソッドを直接呼び出すように変更。
dc.db[Slave].run(BasicQuery.filter(_.userId === userId).result).map { _.headOption }