はじめに
前回、Scala+PlayFramework2.6の通信周りをいじって見たので、今度はDB周りをいじってみることにします。
DBアクセスには、slick(play-slick)を使用し、DBにはMariaDBを使用しました。
なお、筆者はScalaもPlayFrameworkも初心者なので、おかしな点があれば指摘いただけると幸町です。
実装
前提
今回使用した各種バージョンは以下の通りです。
名前 | バージョン |
---|---|
Scala | 2.12.4 |
PlayFramework | 2.6.7 |
MariaDB | 10.2.12 |
play-slick | 3.0.3 |
HikariCP | 2.7.4 |
MariaDB
MariaDBはmaster-slave構成になっているものとします。今回は、ローカルにmasterとslaveの二つのインスタンスを立ち上げて確認しました。
種類 | IP | ポート |
---|---|---|
Master | 127.0.0.1 | 3306 |
Slave | 127.0.0.1 | 3307 |
データベース"db"に、サンプルとして使う二つのテーブルを定義しました。何らかのゲームのユーザIDテーブルと、ユーザ名と一言テーブルといった体です。
CREATE TABLE `user_id` (
`user_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`created` timestamp NOT NULL DEFAULT current_timestamp(),
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
CREATE TABLE `basic` (
`user_id` int(10) unsigned NOT NULL,
`user_name` varchar(255) NOT NULL DEFAULT '',
`one_liner` varchar(255) NOT NULL DEFAULT '',
`created` timestamp NOT NULL DEFAULT current_timestamp(),
`modified` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp(),
PRIMARY KEY (`user_id`),
CONSTRAINT `basic_id_fk` FOREIGN KEY (`user_id`) REFERENCES `user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
build.sbt
build.sbtに、必要なライブラリを追記します。
libraryDependencies += "org.mariadb.jdbc" % "mariadb-java-client" % "2.2.1"
libraryDependencies += "com.zaxxer" % "HikariCP" % "2.7.4"
libraryDependencies += "com.typesafe.play" %% "play-slick" % "3.0.3"
application.conf
Slick用のDB設定をapplication.confに追記します。jdbcドライバはmariadbから提供されているものを使用しています。
今回はmaster-slave構成にアクセスするので、defaultではなく、masterとslaveという二つの設定を記述しました。
masterとslaveの違いは、poolName(名前)、autoCommit(自動コミット)、readOnly(Master-Slave切替に使う)だけです。
また、connectionPoolに、今回作成したデータソースのクラス名(object名)を指定しています。
slick.dbs {
master {
profile="slick.jdbc.MySQLProfile$"
db {
connectionPool = "utils.ReplicationJdbcDataSource$"
numThreads = 2
queueSize = 1000
driver = "org.mariadb.jdbc.Driver"
user = "scott"
password = "tiger"
url = "jdbc:mariadb:replication://127.0.0.1:3306,127.0.0.1:3306/db"
poolName = "master"
autoCommit = false
}
}
slave {
profile="slick.jdbc.MySQLProfile$"
db {
connectionPool = "utils.ReplicationJdbcDataSource$"
numThreads = 2
queueSize = 1000
driver = "org.mariadb.jdbc.Driver"
user = "scott"
password = "tiger"
url = "jdbc:mariadb:replication://127.0.0.1:3306,127.0.0.1:3306/db"
poolName = "slave"
readOnly = true
}
}
}
master、slaveのどちらのurl設定にも、mariadb-jdbcドライバ独自の記述方法を使用しています(replication:)。これは、readOnlyがfalseなら先頭サーバを参照し、trueなら二番目以降のサーバを参照するというものです。
なお、master側の設定を"jdbc:mariadb://127.0.0.1:3306/db"、slave側の設定を"jdbc:mariadb://127.0.0.1:3307/db"と書いても動作します。
データソース
デフォルトのHikariCPでは、autoCommitの設定の切替えをうまく行うことができなかったため、継承させたデータソースを作成しました。
ソースを見てわかるとおり、configurationにautoCommitがあった場合、その設定を反映させるだけの実装です。
application.confのslick.dbs.[master|slave].db.connectionPoolに記載された"utils.ReplicationJdbcDataSource$"が、object utils.ReplicationJdbcDataSourceに対応します。
package utils
import java.sql.Driver
import com.typesafe.config.Config
import com.zaxxer.hikari.HikariDataSource
import slick.jdbc.JdbcDataSourceFactory
import slick.jdbc.hikaricp.HikariCPJdbcDataSource
import slick.util.ConfigExtensionMethods._
class ReplicationJdbcDataSource(ds: com.zaxxer.hikari.HikariDataSource, hconf: com.zaxxer.hikari.HikariConfig)
extends HikariCPJdbcDataSource(ds, hconf)
{
}
object ReplicationJdbcDataSource extends JdbcDataSourceFactory
{
override def forConfig(c: Config, driver: Driver, name: String, classLoader: ClassLoader): HikariCPJdbcDataSource = {
val hcpds = HikariCPJdbcDataSource.forConfig(c, driver, name, classLoader)
c.getBooleanOpt("autoCommit").foreach(hcpds.hconf.setAutoCommit)
val ds = new HikariDataSource(hcpds.hconf)
new ReplicationJdbcDataSource(ds, hcpds.hconf)
}
}
MariaDBはautoCommit=trueがデフォルトですが、個人的にはautoCommitは苦手なのでmaster側の設定をautoCommit=falseにしています。
slave側のautoCommitをfalseにすると、selectの後に毎回rollbackが走ってしまうのでslave側はそのままにしています。
なお、dbの設定がautoCommit=falseで、データソース側の設定もautoCommit=falseだったとしても、JDBCドライバさんが、丁寧に
commit
set autoCommit=1
set autoCommit=0
update..
のようにautoCommitを呼び出してくれます。"set autocommit"自体を呼び出さないようにする方法が見つからなかったので、ひとまずこのままです。
DBアクセス
調べてみるとPlayFramework+SlickでのDBアクセス方法はいろいろあるようですが、trait HasDatabaseConfig[JDBCProfile]を引っ張ってくるか、injectionされたDatabaseConfigProviderを使って取得するかがよくあるパターンのようです。
とはいえ、どちらにせよ、DatabaseConfigProviderからDatabaseConfigを取得し、そこから、Databaseインスタンスを取得するという方法のようです。
● よくあるパターン。
@Singleton
class HogehogeController @Inject()(cc: ControllerComponents, implicit val ec: ExecutionContext)
extends AbstractController(cc)
with HasDatabaseConfig[JDBCProfile]
{
db.run(DBへのアクセス....)
}
または
@Singleton
class HogehogeController @Inject(cc: ControllerComponents, dp: DatabaseConfigProvider[JDBCProfile], implicit val ec: ExecutionContext)
extends AbstractController(cc)
{
dp.get.db.run(DBへのアクセス....)
}
今回は、データベースアクセス用のシングルトンを作って、それをPlayFrameworkのControllerからアクセスする形で作って見ました。
これは、複数のControllerで同じ処理を書きたくないという理由と、少しだけひねったやり方を試してみるかという理由です。
また、今回はデータベースとして"default"ではなく、"master"と"slave"という二つの設定を使用しましたので、それぞれにアクセスするためにNamedDatabase annotationを使用しています。
package services
import javax.inject.{Inject, Singleton}
import play.api.db.slick.DatabaseConfigProvider
import play.db.NamedDatabase
import slick.basic.DatabasePublisher
import slick.dbio.{DBIOAction, Effect, NoStream, Streaming}
import scala.concurrent.{ExecutionContext, Future}
trait DatabaseConnector {
def query[R](action: DBIOAction[R, _ <: NoStream, _]): Future[R]
def stream[R](action: DBIOAction[_, Streaming[R], _]): DatabasePublisher[R]
def update[R, E <: Effect](action: DBIOAction[R, _ <: NoStream, E]): Future[R]
}
@Singleton
class SlickDatabaseConnector @Inject() (@NamedDatabase("master") masterDbConfigProvider: DatabaseConfigProvider,
@NamedDatabase("slave") slaveDbConfigProvider: DatabaseConfigProvider)
(implicit ec: ExecutionContext)
extends DatabaseConnector
{
override def query[R](action: DBIOAction[R, _ <: NoStream, _]): Future[R] = {
val db = slaveDbConfigProvider.get.db
db.run[R](action)
}
override def stream[R](action: DBIOAction[_, Streaming[R], _]): DatabasePublisher[R] = {
val db = slaveDbConfigProvider.get.db
db.stream(action)
}
override def update[R, E <: Effect](action: DBIOAction[R, _ <: NoStream, E]): Future[R] = {
val db = masterDbConfigProvider.get.db
db.run[R](action)
}
}
いろいろと試行錯誤したのですが、masterとslave用にメソッドを分ける形にしました。
最初は、actionのtype parameterのEffectを見て、Effect.WriteやEffect.Transactionalがあればmasterへ、無ければslaveのような処理も考えていたのですが、Actionの作り方次第で単純に切り分けるのが難しいケースがあることから、結局諦めました。
また、autoCommitがfalseであることから、上記updateメソッドへのactionのEffectがEffect.Transactionalでは無い場合は、自動的にtransactionallyを呼び出す処理も考えましたが、こちらも同様の理由から断念しました。なお、下記ソースの場合、Effect.Readのアクションでmasterデータベースにアクセスしようとするとコンパイル時にエラーになるという素敵仕様です。
//(試作廃棄版:上記ソースの差分のみ)
sealed trait DatabaseConnectorEffect[E <: Effect]
object DatabaseConnectorEffect {
implicit case object none extends DatabaseConnectorEffect[Effect]
implicit case object transactional extends DatabaseConnectorEffect[Effect.Transactional]
implicit case object write extends DatabaseConnectorEffect[Effect.Write]
implicit case object writeTransactional extends DatabaseConnectorEffect[Effect.Write with Effect.Transactional]
implicit case object readWrite extends DatabaseConnectorEffect[Effect.Read with Effect.Write]
implicit case object readWriteTransactional extends DatabaseConnectorEffect[Effect.Read with Effect.Write with Effect.Transactional]
}
override def update[R, E <: Effect](action: DBIOAction[R, _ <: NoStream, E])(implicit effect: DatabaseConnectorEffect[E]): Future[R] = {
val targetAction = effect match {
case DatabaseConnectorEffect.transactional
| DatabaseConnectorEffect.writeTransactional
| DatabaseConnectorEffect.readWriteTransactional => action
case _ => action.transactionally
}
val db = masterDbConfigProvider.get.db
db.run[R](targetAction)
}
Module
上記で作成したDatabaseConnectorのconcrete class(SlickDatabaseConnector)のシングルトンを登録します。
import services.{DatabaseConnector, SlickDatabaseConnector}
class Module extends AbstractModule {
override def configure(): Unit = {
// DB接続
bind(classOf[DatabaseConnector]).to(classOf[SlickDatabaseConnector])
}
}
モデル
テーブルuser_idとbasicに対応するDB用のモデルクラスを作成します。
これもいろいろとやり方があるようですが、テーブルの1レコードに対応するcase class、テーブル自体のclass、Actionを生成したりデータベース処理を実行するメソッドを提供するobjectの三つから構成することにしました。
user_idテーブルは、ユーザが新規登録したときにIDを振ることを目的としたテーブルなので、IDを振り出すアクションを生成するメソッド、それを呼び出すメソッド、現在の最大ユーザIDを取得するメソッドの三つを用意しました。
IDを振り出すアクションでは、あえて、SQLを直接記述しています。
package model
import java.sql.Timestamp
import services.DatabaseConnector
import slick.lifted.{ProvenShape, Rep, TableQuery, Tag}
import slick.jdbc.MySQLProfile.api._
import scala.concurrent.{ExecutionContext, Future}
case class UserId(userId: Int, created: Timestamp)
class UserIdTable(tag: Tag) extends Table[UserId](tag, "user_id") {
def userId: Rep[Int] = column[Int]("user_id", O.PrimaryKey, O.AutoInc)
def created: Rep[Timestamp] = column[Timestamp]("created")
override def * : ProvenShape[UserId] = (userId, created) <> (UserId.tupled, UserId.unapply)
}
object UserIdQuery extends TableQuery(new UserIdTable(_)) {
def createUserAction: DBIOAction[Vector[Int], Streaming[Int], _] = {
sqlu"insert into user_id values()".andThen(sql"select last_insert_id()".as[Int])
}
def createUser()(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Option[Int]] =
dc.update(createUserAction.transactionally).map { _.headOption }
def currentUserId(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Option[Int]] =
dc.query(this.map(_.userId).max.result)
}
続いて、basicテーブルに関するモデルです。
idによるモデル取得、upsert(insert or update)、updateの三つのメソッドを作成しました(upsertとupdateは特に分ける必要は無かったが、勉強およびテストのために記述)。
また、selectした結果(カラム列)からモデル(Basic)を生成するための暗黙的な変換器(GetResult[Basic])も定義しています。
basicテーブルのcreatedフィールドは生成時、updatedフィールドは生成時/更新時に自動的に更新されるようにしています。しかし、TableQuery#insertOrUpdateを使用するとcreatedフィールドまで更新されてしまうので、自前でsqlを記述しています。
package model
import java.sql.Timestamp
import services.DatabaseConnector
import slick.jdbc.GetResult
import slick.lifted.{ProvenShape, Rep, TableQuery, Tag}
import slick.jdbc.MySQLProfile.api._
import scala.concurrent.{ExecutionContext, Future}
case class Basic(userId: Int, userName: String, oneLiner: String, created: Timestamp, modified: Timestamp)
class BasicTable(tag: Tag) extends Table[Basic](tag, "basic") {
def userId: Rep[Int] = column[Int]("user_id", O.PrimaryKey, O.Unique)
def userName: Rep[String] = column[String]("user_name")
def oneLiner: Rep[String] = column[String]("one_liner")
def created: Rep[Timestamp] = column[Timestamp]("created")
def modified: Rep[Timestamp] = column[Timestamp]("modified")
override def * : ProvenShape[Basic] =
(userId, userName, oneLiner, created, modified) <> (Basic.tupled, Basic.unapply)
}
object BasicQuery extends TableQuery(new BasicTable(_)) {
implicit val getBasicResult:GetResult[Basic] = GetResult(rs => Basic(rs.<<, rs.<<, rs.<<, rs.<<, rs.<<))
def getById(userId: Int)(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Option[Basic]] =
dc.query(BasicQuery.filter(_.userId === userId).result).map { _.headOption }
//直接書く場合。この場合、GetResult[Basic]が暗黙的に参照される
//dc.query(sql"select user_id,user_name,created,modified from basic where user_id=$userId".as[Basic]).map { _.headOption }
def upsertNameAction(userId: Int, userName: Option[String], oneLiner: Option[String]): Option[DBIOAction[Int, NoStream, _]] =
// createdまで更新されてまう
// this.insertOrUpdate(Basic(userId, userName, oneLiner, null, null))
(userName, oneLiner) match {
case (Some(u), Some(o)) =>
Some(sqlu"""INSERT INTO basic(user_id,user_name,one_liner) VALUES($userId, $u, $o)
|ON DUPLICATE KEY UPDATE user_name=values(user_name), one_liner=values(one_liner)""")
case (Some(u), None) =>
Some(sqlu"""INSERT INTO basic(user_id,user_name) VALUES($userId, $u)
|ON DUPLICATE KEY UPDATE user_name=values(user_name)""")
case (None, Some(o)) =>
Some(sqlu"""INSERT INTO basic(user_id,one_liner) VALUES($userId, $o)
|ON DUPLICATE KEY UPDATE one_liner=values(one_liner)""")
case (None, None) => None
}
def upsertName(userId: Int, userName: Option[String], oneLiner: Option[String])
(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Int] =
upsertNameAction(userId, userName, oneLiner) match {
case Some(action) => dc.update(action.transactionally)
case None => Future(0)
}
def updateNameAction(userId: Int, userName: Option[String], oneLiner: Option[String]): Option[DBIOAction[Int, NoStream, _]] = {
(userName, oneLiner) match {
case (Some(u), Some(o)) => Some(filter(_.userId === userId).map(row => (row.userName, row.oneLiner)).update((u, o)))
case (Some(u), None) => Some(filter(_.userId === userId).map(row => row.userName).update(u))
case (None, Some(o)) => Some(filter(_.userId === userId).map(row => row.oneLiner).update(o))
case (None, None) => None
}
}
def updateName(userId: Int, userName: Option[String], oneLiner: Option[String])
(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Int] =
updateNameAction(userId, userName, oneLiner) match {
case Some(action) => dc.update(action.transactionally)
case None => Future(0)
}
}
Controller
PlayFrameworkのControllerからは、次のようにアクセスできます。
package controllers
import javax.inject._
import model.{Basic, BasicQuery, UserIdQuery}
import play.api.mvc._
import services.DatabaseConnector
import slick.basic.DatabasePublisher
import slick.jdbc.MySQLProfile.api._
import scala.concurrent.{ExecutionContext, Future}
@Singleton
class HogeController @Inject()(cc: ControllerComponents,
implicit val dc:DatabaseConnector,
implicit val ec:ExecutionContext
) extends AbstractController(cc)
{
def hoge = Action.async() {
// basicテーブルから10件取得する
dc.query(BasicQuery.take(10).result) // 結果:Future[Vector[Basic]]
// basicテーブルからStreamingで取得する
dc.stream(BasicQuery.to[Vector].result) // 結果: DatabasePublisher[Basic]
// 指定されたIDのbasicを取得する
BasicQuery.getById(userId) // 結果: Future[Option[Basic]]
// upsert
BasicQuery.upsertName(userId, Some("名前"), Some("一言"))
// 複数のupdateをtransactionで囲って実行
val a1 = BasicQuery.updateNameAction(userId1, Some("名前1"), Some("一言1"))
val a2 = BasicQuery.updateNameAction(userId2, Some("名前2"), None)
val a3 = BasicQuery.updateNameAction(userId3, None, None) // 何も起きない
dc.update(DBIO.seq(Vector(a1, a2, a3).flatten:_*).transactionally)
}
}
リファクタリング(追記)
やはり、メソッドでmaster/slave切り替えるのはちょっとあれな気がしてきたので、Master/Slaveを指定してDatabaseオブジェクトを取得できるようにしてみました。
package services
import javax.inject.{Inject, Singleton}
import play.api.db.slick.DatabaseConfigProvider
import play.db.NamedDatabase
import slick.basic.BasicProfile
import scala.concurrent.ExecutionContext
/**
* Master/Slaveのサーバ種別のための基底trait
*/
sealed trait DatabaseType
/**
* Master/Slaveのサーバ種別
* DatabaseConnector.createInstanceを呼び出す場合、import DatabaseTypes._ を呼び出しておく必要がある
*/
object DatabaseTypes {
class Master extends DatabaseType
class Slave extends DatabaseType
implicit case object DbMaster extends Master
implicit case object DbSlave extends Slave
}
/**
* Injectの際に参照されるtrait
*/
trait DatabaseConnector {
def db[T <: DatabaseType]()(implicit dbType: T, ec: ExecutionContext): BasicProfile#Backend#Database
}
/**
* Injection本体
* @param masterDbConfigProvider
* @param slaveDbConfigProvider
*/
@Singleton
class SlickDatabaseConnector @Inject()(@NamedDatabase("master") masterDbConfigProvider: DatabaseConfigProvider,
@NamedDatabase("slave") slaveDbConfigProvider: DatabaseConfigProvider)
extends DatabaseConnector
{
import DatabaseTypes._
override def db[T <: DatabaseType]()(implicit dbType: T, ec: ExecutionContext): BasicProfile#Backend#Database = {
dbType match {
case _: Master => masterDbConfigProvider.get.db
case _: Slave => slaveDbConfigProvider.get.db
}
}
}
これに伴い、呼び出し方法は以下のように変更しました (一部のみ)。
object BasicQuery extends TableQuery(new BasicTable(_)) {
implicit val getBasicResult:GetResult[Basic] = GetResult(rs => Basic(rs.<<, rs.<<, rs.<<, rs.<<, rs.<<))
def getById(userId: Int)(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Option[Basic]] =
dc.query(BasicQuery.filter(_.userId === userId).result).map { _.headOption }
object BasicQuery extends TableQuery(new BasicTable(_)) {
// importを追加
import services.DatabaseTypes._
implicit val getBasicResult:GetResult[Basic] = GetResult(rs => Basic(rs.<<, rs.<<, rs.<<, rs.<<, rs.<<))
def getById(userId: Int)(implicit dc: DatabaseConnector, ec: ExecutionContext): Future[Option[Basic]] =
// dcをdc.db[Slave]に変更。Databaseオブジェクトのrunメソッドを直接呼び出すように変更。
dc.db[Slave].run(BasicQuery.filter(_.userId === userId).result).map { _.headOption }