このたびはScalaMatsuriのセッションに投票していただき、ありがとうございました。
今回はそのScalaMatsuriのセッションで発表予定の内容の一つであるドワンゴ秘蔵のトランザクションモナドについて解説したいと思います。
このトランザクションモナドは基本的な機能だけなら30行ほどの短いコードで記述できてしまうのですが、なかなか説明が難しい代物でして、
ScalaMatsuriの自分の発表時間内に聴衆のみなさんに理解していただくのは難しいだろうということで、先に解説記事を書くことにしました。
このトランザクションモナドは作者の名前から通称Fujitaskと呼ばれているのですが、作者の方は周りから「天才」と言われてまして、彼は常人が思いつかないようなコードを書かれるんですね。
Fujitaskは短いながらも、モナドと、サブタイピング(変位指定)と、アドホックポリモーフィズムの三つの機能を使っており、Scalaでなければ実現できない機能を持っています。
特に変位指定の部分は、Fujitaskで一番難解なところであり、そして、まさに「発明」と呼ぶべき発想があります。
それでは、その天才が産み出したFujitaskを解説していきたいと思います。
Fujitaskの機能
Fujitaskの機能はおおまかに言って以下のような機能があります。
- トランザクションの抽象化ができる
- トランザクションの合成ができる
- そのトランザクションが読み込みなのか、読み込みと書き込みの両方ができるのかを 合成により 判断する
1と2は「トランザクションモナド」って言うんだから、それくらいはできるでしょという感じだと思いますが、3がこのFujitaskのポイントです。
たとえば、Fujitaskで読み込みしかできないReadトランザクションと、読み込みと書き込みができるReadWriteトランザクションを合成すると、ReadWriteのトランザクションになります。
これはMaster/Slave構成のストレージでは特に重要になります。
Master/Slave構成のストレージの場合、基本的にReadトランザクションの場合はSlaveに問い合わせに行き、ReadWriteトランザクションの場合はMasterに問い合わせに行くわけですが、
このような構成の場合、以下のような問題がよくあるのではないでしょうか。
- Masterに行くか、Slaveに行くかを個々のケースについていちいち記述しなければいけなくて面倒くさい
- 間違って本来はMasterに行くべき更新のクエリーをSlaveに発行しても、コンパイル時ではなく実行時エラーにより判明する
- Slaveに問い合わせに行くべき処理をMasterに問い合わせてしまっても、エラーにはならないので気づきにくい
Fujitaskにはこれらの問題を解決する機能があります。
したがって、トランザクション機能がないストレージ、たとえばMaster/Slave構成のRedisなどでもFujitaskは有効です。
サンプルコード
今回解説するサンプルコードは hexx/fujitask-simple にあります。
このコードには3つのsbtプロジェクトがあります。
-
fujitaskはFujitaskの基本部分です -
fujitask-scalikejdbcはScalikeJDBCによるFujitaskのトランザクションの実装です -
domainはfujitask-scalikejdbcを使ったサンプルコードです
上から順番に解説していきたいと思います。
(ちなみにFujitaskとScalikeJDBCを組み合わせたDB処理の実装はまだなかったので、今回の説明のために自分が実装しました。
今回の実装に何か問題があっても自分の不手際であり、ドワンゴのプロダクトコードの問題ではないことをご了承ください)
Fujitaskの基本コード
それではサンプルコードのfujitaskプロジェクトにあるFujitaskの基本機能を取り出したコードをご覧いただきましょう。
// fujitask/Task.scala
package fujitask
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
/**
* 『PofEAA』の「Unit of Work」パターンの実装
*
* トランザクションとはストレージに対するまとまった処理である
* トランザクションオブジェクトとはトランザクションを表現するオブジェクトで、
* 具体的にはデータベースライブラリのセッションオブジェクトなどが該当する
*
* @tparam Resource トランザクションオブジェクトの型
* @tparam A トランザクションを実行して得られる値の型
*/
trait Task[-Resource, +A] { lhs =>
/**
* トランザクションの内部で実行される個々の処理の実装
* このメソッドを実装することでTaskが作られる
*
* @param resource トランザクションオブジェクト
* @param ec ExecutionContext
* @return トランザクションの内部で実行される個々の処理で得られる値
*/
def execute(resource: Resource)(implicit ec: ExecutionContext): Future[A]
/**
* Taskモナドを合成する
* その際、変位指定によりResourceの型は両方のTaskのResourceの共通のサブクラスの型になる
*
* @param f モナド関数
* @tparam ExtendedResource トランザクションオブジェクトの型
* @tparam B 合成されたTaskを実行すると得られる値の型
* @return 合成されたTask
*/
def flatMap[ExtendedResource <: Resource, B](f: A => Task[ExtendedResource, B]): Task[ExtendedResource, B] =
new Task[ExtendedResource, B] {
def execute(resource: ExtendedResource)(implicit ec: ExecutionContext): Future[B] =
lhs.execute(resource).map(f).flatMap(_.execute(resource))
}
/**
* 関数をTaskの結果に適用する
*
* @param f 適用したい関数
* @tparam B 関数を適用して得られた値の型
* @return 関数が適用されたTask
*/
def map[B](f: A => B): Task[Resource, B] = flatMap(a => Task(f(a)))
/**
* TaskRunnerを使ってTaskを実行する
* implicitによりResourceに合ったTaskRunnerが選ばれる
*
* @param runner Taskを実行するためのTaskRunner
* @tparam ExtendedResource トランザクションオブジェクトの型
* @return 個々のTaskの処理の結果得られる値
*/
def run[ExtendedResource <: Resource]()(implicit runner: TaskRunner[ExtendedResource]): Future[A] = runner.run(this)
}
object Task {
/**
* Taskのデータコンストラクタ
*
* @param a Taskの値
* @tparam Resource トランザクションオブジェクトの型
* @tparam A Taskの値の型
* @return 実行するとaの値を返すTask
*/
def apply[Resource, A](a: => A): Task[Resource, A] =
new Task[Resource, A] {
def execute(resource: Resource)(implicit executor: ExecutionContext): Future[A] =
Future(a)
}
}
/**
* Taskを実行する
* トランザクションオブジェクトの型ごとにインスタンスを作成すること
*
* @tparam Resource トランザクションオブジェクトの型
*/
trait TaskRunner[Resource] {
/**
* Taskを実行する
*
* @param task 実行するTask
* @tparam A Task実行すると得られる値の型
* @return Task実行して得られた値
*/
def run[A](task: Task[Resource, A]): Future[A]
}
前述のとおり、Fujitask本体はこれだけで記述できてしまいます。コメントを除くと30行ほどになります。
しかし、正直、このコードを見るだけでは、どう使うのかまったく想像できないと思います。
勘のいい方でも気づくのは以下のようなことくらいじゃないでしょうか。
-
mapとflatMapがあるので、なんとなくモナドっぽい - 実行(
run)するとFutureになるらしい - 型変数に反変と共変の変位指定がある
- ということは、
TaskとはResourceを受け取りAを返す関数を表現しているのか? - ということは、Readerモナドに近い?
-
TaskRunnerって何だ?分かれている意味があるのか?
そんな疑問を具体的にFujitaskをScalikeJDBCを使ってDB処理を実装することで解消していきましょう。
トランザクションオブジェクトの型を定義する
Fujitaskを使うためにはまずトランザクションオブジェクトの型を定義する必要があります。
Task[-Resource, +A]の左の型変数Resourceに入るものです。
// fujitask/Transaction.scala
package fujitask
trait Transaction
trait ReadTransaction extends Transaction
trait ReadWriteTransaction extends ReadTransaction
-
ReadTransactionは読み込みだけができるトランザクションを表現しています(Master/Slave構成ではSlaveに問い合わせます) -
ReadWriteTransactionは読み込みと書き込みの両方ができるトランザクションを表現しています(Master/Slave構成ではMasterに問い合わせます)
重要な点は ReadWriteTransactionがReadTransactionを継承する ということです。
これが後々合成するときに利いてきます。
ScalikeJDBC版Fujitask
ScalikeJDBC版のトランザクションオブジェクトを定義する
ここからはfujitask-scalikejdbcプロジェクトのScalikeJDBC版のFujitaskを実装を説明してきます。
まずはScalikeJDBC版のトランザクションオブジェクトを定義しましょう。
// fujitask/scalikejdbc/Transaction.scala
package fujitask.scalikejdbc
import fujitask.ReadTransaction
import fujitask.ReadWriteTransaction
import scalikejdbc._
abstract class ScalikeJDBCTransaction(val session: DBSession)
class ScalikeJDBCReadTransaction(session: DBSession) extends ScalikeJDBCTransaction(session) with ReadTransaction
class ScalikeJDBCReadWriteTransaction(session: DBSession) extends ScalikeJDBCTransaction(session) with ReadWriteTransaction
これらのクラスは要するに抽象型のReadTransactionとReadWriteTransactionを継承し、ScalikeJDBCのDBSessionを包むものです。
このようにTask[-Resource, +A]のResourceには各ストレージライブラリのトランザクションオブジェクト(セッションオブジェクト)を入れます。
ScalikeJDBC版のTaskRunnerを定義する
次にScalikeJDBCを使って以下のメソッドとオブジェクトを作成します。
- トランザクションオブジェクトを取得する
askメソッド - トランザクションを実行するimplicitの
TaskRunnerオブジェクト
以下、実装コードです
// fujitask/scalikejdbc/package.scala
package fujitask
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import _root_.scalikejdbc._
package object scalikejdbc {
def ask: Task[Transaction, DBSession] =
new Task[Transaction, DBSession] {
def execute(transaction: Transaction)(implicit ec: ExecutionContext): Future[DBSession] =
Future.successful(transaction.asInstanceOf[ScalikeJDBCTransaction].session)
}
implicit def readRunner[R >: ReadTransaction] : TaskRunner[R] =
new TaskRunner[R] {
def run[A](task: Task[R, A]): Future[A] = {
val session = DB.readOnlySession()
val future = task.execute(new ScalikeJDBCReadTransaction(session))
future.onComplete(_ => session.close())
future
}
}
implicit def readWriteRunner[R >: ReadWriteTransaction]: TaskRunner[R] =
new TaskRunner[R] {
def run[A](task: Task[R, A]): Future[A] = {
DB.futureLocalTx(session => task.execute(new ScalikeJDBCReadWriteTransaction(session)))
}
}
}
askメソッドは作者の方に伺ったところ、一般的なReaderモナドのask関数と同等の機能だそうです。
やっぱりFujitaskはReaderモナドの機能を持っているわけですね。
つまりそれぞれのTaskはScalikeJDBCのDBSessionを取得できるということになります。
そして、ReadとReadWriteの二つのTaskRunnerが定義されています。
ScalikeJDBCをご存知の方ならわかると思いますが、それぞれのTaskRunnerにはScalikeJDBCのトランザクション処理が書かれています。
つまり、実際にトランザクション処理を実行し、結果を得るのがTaskRunnerということになります。
ここまで来るとFujitaskがどう動作するのかだんだん予想できてくるのではないでしょうか。
ScalikeJDBC版Fujitaskを使ってDB処理を実装してみる
ではdomainプロジェクトに移り、ScalikeJDBC版Fujitaskを使ってDB処理を書いてみましょう。
ここではUserとMessageのエンティティとそれぞれのCRUD処理を作成してみます。
エンティティ
UserはIDと名前を持ちます。
// domain/entity/User.scala
package domain.entity
case class User(id: Long, name: String)
MessageはIDとメッセージとメッセージを書き込んだユーザー名を持ちます。
// domain/entity/Message.scala
package domain.entity
case class Message(id: Long, message: String, userName: String)
DDLは以下のようになります。
create table if not exists `users` (
`id` bigint not null auto_increment,
`name` varchar(64) not null
)
create table if not exists `messages` (
`id` bigint not null auto_increment,
`message` varchar(256) not null,
`user_name` varchar(64) not null
)
リポジトリ(インターフェース)
次に先ほど定義したUserとMessageのCRUD処理をおこなうリポジトリのインターフェースを定義します。
個々の処理はTaskを返すようにします。このタスクを実行(run)すると処理がおこなわれ、実際の結果が返されます。
// domain/repository/MessageRepository.scala
package domain.repository
import fujitask.Task
import fujitask.ReadTransaction
import fujitask.ReadWriteTransaction
import domain.entity.User
trait UserRepository {
def create(name: String): Task[ReadWriteTransaction, User]
def read(id: Long): Task[ReadTransaction, Option[User]]
def readAll: Task[ReadTransaction, List[User]]
def update(user: User): Task[ReadWriteTransaction, Unit]
def delete(id: Long): Task[ReadWriteTransaction, Unit]
}
MessageRepositoryも似たような感じなので省略します。
ここで注目してほしいのは 個々のトランザクションの指定はそれぞれ自分の機能だけを考えればよい という点です。
たとえばreadメソッドは読み込みなのでReadTransactionを指定し、deleteメソッドはReadWriteTransactionを指定します。
そして、これらのTaskを組み合わせるときにはトランザクションの種類を指定する必要はありません。
最終的に全体がReadTransactionなのか(Slaveに問い合わせるのか)、ReadWriteトランザクションなのか(Masterに問い合わせるか)という 全体のトランザクションの判断はTaskの合成結果により決定される わけです。
もう一つ注目していただきたいのは、このインターフェースを記述した時点ではScalikeJDBCへの依存が発生していないという点です。
つまり トランザクションが抽象化されている と言えるでしょう。
リポジトリ(実装)
それでは次にScalikeJDBCを使って、このインターフェースに実装を与えます。
先ほどfujitask-scalikejdbcプロジェクトで作ったaskメソッドを使います。
// domain/repository/scalikejdbc
package domain.repository.scalikejdbc
import fujitask.Task
import fujitask.ReadTransaction
import fujitask.ReadWriteTransaction
import fujitask.scalikejdbc._
import domain.entity.User
import domain.repository.UserRepository
import scalikejdbc._
object UserRepositoryImpl extends UserRepository {
def create(name: String): Task[ReadWriteTransaction, User] =
ask.map { implicit session =>
val sql = sql"""insert into users (name) values ($name)"""
val id = sql.updateAndReturnGeneratedKey.apply()
User(id, name)
}
def read(id: Long): Task[ReadTransaction, Option[User]] =
ask.map { implicit session =>
val sql = sql"""select * from users where id = $id"""
sql.map(rs => User(rs.long("id"), rs.string("name"))).single.apply()
}
def readAll: Task[ReadTransaction, List[User]] =
ask.map { implicit session =>
val sql = sql"""select * from users"""
sql.map(rs => User(rs.long("id"), rs.string("name"))).list.apply()
}
def update(user: User): Task[ReadWriteTransaction, Unit] =
ask.map { implicit session =>
val sql = sql"""update users set name = ${user.name} where id = ${user.id}"""
sql.update.apply()
}
def delete(id: Long): Task[ReadWriteTransaction, Unit] =
ask.map { implicit session =>
val sql = sql"""delete users where id = $id"""
sql.update.apply()
}
}
ScalikeJDBCを知っているなら理解していただけると思いますが、この実装は素のScalikeJDBCの記述にかなり近いものになっていると思います。
fujitask-scalikejdbcプロジェクトのところで説明したようにaskでScalikeJDBCのDBSessionを取得したあとは、通常のScalikeJDBCの処理と同じように書くだけで個々のTaskを実装することができます。
MessageRepositoryImplも同じような感じなので省略します。
サービス
では、このリポジトリを使ってサービスを作ってみましょう。
UserService
UserServiceはリポジトリのTaskを素直にrunで実行するだけにしましょう。
ちゃんと実装するならUserRepositoryやTaskRunnerインスタンスはDIしたほうがいいと思いますが、コードを単純にするため今回はそのまま使います。
package domain.service
import domain.entity.User
import domain.repository.UserRepository
import domain.repository.scalikejdbc.UserRepositoryImpl
import fujitask.scalikejdbc._
import scala.concurrent.Future
object UserService {
val userRepository: UserRepository = UserRepositoryImpl
def create(name: String): Future[User] =
userRepository.create(name).run()
def read(id: Long): Future[Option[User]] =
userRepository.read(id).run()
def readAll: Future[List[User]] =
userRepository.readAll.run()
def update(user: User): Future[Unit] =
userRepository.update(user).run()
def delete(id: Long): Future[Unit] =
userRepository.delete(id).run()
}
Taskの実行にはfujitask-scalikejdbcプロジェクトで定義されたTaskRunnerインスタンスが使われます。
MessageService
MessageServiceのほうはCRUD処理に加えてTaskを合成する処理のcreateByUserIdメソッドも入れてみましょう。
package domain.service
import domain.entity.Message
import domain.repository.UserRepository
import domain.repository.MessageRepository
import domain.repository.scalikejdbc.UserRepositoryImpl
import domain.repository.scalikejdbc.MessageRepositoryImpl
import fujitask.scalikejdbc._
import scala.concurrent.Future
object MessageService {
val userRepository: UserRepository = UserRepositoryImpl
val messageRepository: MessageRepository = MessageRepositoryImpl
def create(message: String, userName: String): Future[Message] =
messageRepository.create(message, userName).run()
def read(id: Long): Future[Option[Message]] =
messageRepository.read(id).run()
def readAll: Future[List[Message]] =
messageRepository.readAll.run()
def update(message: Message): Future[Unit] =
messageRepository.update(message).run()
def delete(id: Long): Future[Unit] =
messageRepository.delete(id).run()
def createByUserId(message: String, userId: Long): Future[Message] = {
val task =
for {
userOpt <- userRepository.read(userId)
user = userOpt.getOrElse(throw new IllegalArgumentException("User Not Found"))
message <- messageRepository.create(message, user.name)
} yield message
task.run()
}
}
createByUserIdはまずuserRepository.readを使ってユーザー名を取得します。
ユーザーが存在しなかった場合は例外を投げます。
そしてmessageRepository.createに取得できたユーザー名とメッセージをあたえてMessageを作成します。
例によってTaskはモナドなのでScalaのfor式を使ってこのような合成処理を記述できます。
最後に合成してできたTaskをrunして結果を取得します。
このようにサービスの記述は非常に簡単ですね。
実行確認
それでは作成したサービスの動作を確認してみましょう。
わかりやすいようにTaskRunnerインスタンスにデバッグプリントを追加します。
readRunnerが使われた場合は"ReadRunner"という文字列が出力されます。
readWriteRunnerが使われた場合は"ReadWriteRunner"という文字列が出力されます。
またFutureの結果確認のために以下のメソッドも追加します。
def getValue[A](f: Future[A]): A = Await.result(f, Duration(300, "seconds"))
それでは動かしてみます。
まずユーザーを作成してみます。
scala> getValue(UserService.create("Randy"))
ReadWriteRunner
res1: domain.entity.User = User(1,Randy)
ReadWriteRunnerが使われて"Randy"という名前のUserが作成されました。
次にユーザーを取得してみます。
scala> getValue(UserService.read(1))
ReadRunner
res2: Option[domain.entity.User] = Some(User(1,Randy))
ReadRunnerが使われてUserオブジェクトを取得することができました。
Taskに応じてTaskRunnerが選択されていることがわかります。
次に上で合成したMessageService.createByUserIdメソッドを使ってみましょう。
scala> getValue(MessageService.createByUserId("つらい", 1))
ReadWriteRunner
res3: domain.entity.Message = Message(1,Randy,つらい)
Randyさんの「つらい」というメッセージを作成できました。
ここではUserRepository.readのTask[ReadTransaction, Option[User]]とMessageRepository.createのTask[ReadWriteTransaction, Message]が合成された結果、
Task[ReadWriteTransaction, Message]が作られ、実行時にはTaskRunnerとしてReadWriteRunnerが選ばれて実行されていることがわかります。
わざとトランザクションを失敗させてみる
最後にわざとらしくトランザクションを失敗させてみましょう。
3回連続ユーザーを作成するcreate3と、同じく3回連続ユーザーを作成するのですが各々runするcreate3CommitEachの二つのメソッドを作成します。
def create3(name1: String, name2: String, name3: String): Future[(User, User, User)] =
(for {
user1 <- userRepository.create(name1)
user2 <- userRepository.create(name2)
user3 <- userRepository.create(name3)
} yield (user1, user2, user3)).run()
import scala.concurrent.ExecutionContext.Implicits.global
def create3CommitEach(name1: String, name2: String, name3: String): Future[(User, User, User)] =
for {
user1 <- userRepository.create(name1).run()
user2 <- userRepository.create(name2).run()
user3 <- userRepository.create(name3).run()
} yield (user1, user2, user3)
このcreate3とcreate3CommitEachにnullを与えてみてそれぞれ動作の違いを見てみましょう。
create3
scala> getValue(UserService.create3("Randy1", "Randy2", null))
ReadWriteRunner
org.h2.jdbc.JdbcSQLException: 列 "NAME" にはnull値が許されていません
(略)
scala> getValue(UserService.readAll)
ReadRunner
res2: List[domain.entity.User] = List()
scala> getValue(UserService.create3CommitEach("Randy1", "Randy2", null))
ReadWriteRunner
ReadWriteRunner
ReadWriteRunner
org.h2.jdbc.JdbcSQLException: 列 "NAME" にはnull値が許されていません
(略)
scala> getValue(UserService.readAll)
ReadRunner
res4: List[domain.entity.User] = List(User(1,Randy1), User(2,Randy2))
普通にFujitaskを使ったcreate3の場合は例外が発生し、ユーザーが一つも作成されていないことがわかります。
毎回runをしたcreate3CommitEachのほうは例外が発生するまでの二つのユーザーが作成されてしまっています。
これでFujitaskによりトランザクションの合成がちゃんとおこなれていることが確認できました。
あらためてFujitaskのすごさを考える
まず便利!
これまでトランザクションの抽象化、トランザクションの合成、合成によるトランザクションの種類の判断というFujitaskの三つの能力を見てきました。
Fujitaskのこれらの機能によりMySQLなどのトランザクション機能を持つRDBMSや、Master/Slave構成のRedisなど、Webアプリケーションでよく使う幅広いストレージを統一的なインターフェースで扱えるようになります。
実装が簡単!使いやすい!
Fujitaskは本体のコードは数十行ほどでしかないですし、ScalikeJDBCを組み合わせた場合でもほとんど冗長な記述なく、DB処理を書くことができます。
Fujitaskの作者の方は、自由な発想で機能を作りつつも、ライブラリの使いやすさをとても重視しています。
Fujitaskはまさにその特徴が表れていると言えます。
関数型プログラミングとオブジェクト指向プログラミングの融合!Scalaの高い記述能力が活かされている
Fujitaskは全体としてReaderモナドに近いモナドであり、TaskRunnerも型クラス的なものであることから、関数型プログラミングの産物であると言えます。
しかし、トランザクションオブジェクトに継承関係を持たせ、Taskの型変数に変位指定を使い、その場合にflatMapのとき型変数が二つのトランザクションオブジェクトの共通のサブクラスの型になることを利用し、トランザクションがReadかReadWriteかを判断するというところは、オブジェクト指向の技術を使っていると言えます。
Fujitaskがシンプルながら他のライブラリにない発想があると言えるのは、このように関数型プログラミングとオブジェクト指向プログラミングの技術を高度に組み合わせているところにあります。
これは関数型プログラミング言語としてもオブジェクト指向プログラミング言語としても高い機能を合わせ持つScalaならではの発明と言えるでしょう。
そもそもFujitaskはマーチン・ファウラーの『PofEAA』の「Unit of Work」パターンの実装として考案されたらしいです(僕はその誕生に立ち合ってないので、詳しくはわかりませんが)。
それでこのトランザクションモナドを作るのですから、やはり作者は天才としか言いようがないでしょう。
まとめ
以上、ScalaMatsuriセッションで発表予定であるドワンゴ秘蔵のトランザクションモナドについて解説しました。
まだドワンゴには他にもいくつかこういうものがあるんですが、僕は現在ちょっと時間がなくてScalaMatsuriセッション資料提出期限の12月15日までにまとめきれるかわからないというのと、
ScalaMatsuriセッションの質疑応答合わせて40分という発表時間中に、それらをうまく説明することができるのかという、二つの難題を抱えているところです…。