このたびはScalaMatsuriのセッションに投票していただき、ありがとうございました。
今回はそのScalaMatsuriのセッションで発表予定の内容の一つであるドワンゴ秘蔵のトランザクションモナドについて解説したいと思います。
このトランザクションモナドは基本的な機能だけなら30行ほどの短いコードで記述できてしまうのですが、なかなか説明が難しい代物でして、
ScalaMatsuriの自分の発表時間内に聴衆のみなさんに理解していただくのは難しいだろうということで、先に解説記事を書くことにしました。
このトランザクションモナドは作者の名前から通称Fujitaskと呼ばれているのですが、作者の方は周りから「天才」と言われてまして、彼は常人が思いつかないようなコードを書かれるんですね。
Fujitaskは短いながらも、モナドと、サブタイピング(変位指定)と、アドホックポリモーフィズムの三つの機能を使っており、Scalaでなければ実現できない機能を持っています。
特に変位指定の部分は、Fujitaskで一番難解なところであり、そして、まさに「発明」と呼ぶべき発想があります。
それでは、その天才が産み出したFujitaskを解説していきたいと思います。
Fujitaskの機能
Fujitaskの機能はおおまかに言って以下のような機能があります。
- トランザクションの抽象化ができる
- トランザクションの合成ができる
- そのトランザクションが読み込みなのか、読み込みと書き込みの両方ができるのかを 合成により 判断する
1と2は「トランザクションモナド」って言うんだから、それくらいはできるでしょという感じだと思いますが、3がこのFujitaskのポイントです。
たとえば、Fujitaskで読み込みしかできないReadトランザクションと、読み込みと書き込みができるReadWriteトランザクションを合成すると、ReadWriteのトランザクションになります。
これはMaster/Slave構成のストレージでは特に重要になります。
Master/Slave構成のストレージの場合、基本的にReadトランザクションの場合はSlaveに問い合わせに行き、ReadWriteトランザクションの場合はMasterに問い合わせに行くわけですが、
このような構成の場合、以下のような問題がよくあるのではないでしょうか。
- Masterに行くか、Slaveに行くかを個々のケースについていちいち記述しなければいけなくて面倒くさい
- 間違って本来はMasterに行くべき更新のクエリーをSlaveに発行しても、コンパイル時ではなく実行時エラーにより判明する
- Slaveに問い合わせに行くべき処理をMasterに問い合わせてしまっても、エラーにはならないので気づきにくい
Fujitaskにはこれらの問題を解決する機能があります。
したがって、トランザクション機能がないストレージ、たとえばMaster/Slave構成のRedisなどでもFujitaskは有効です。
サンプルコード
今回解説するサンプルコードは hexx/fujitask-simple にあります。
このコードには3つのsbtプロジェクトがあります。
-
fujitask
はFujitaskの基本部分です -
fujitask-scalikejdbc
はScalikeJDBCによるFujitaskのトランザクションの実装です -
domain
はfujitask-scalikejdbc
を使ったサンプルコードです
上から順番に解説していきたいと思います。
(ちなみにFujitaskとScalikeJDBCを組み合わせたDB処理の実装はまだなかったので、今回の説明のために自分が実装しました。
今回の実装に何か問題があっても自分の不手際であり、ドワンゴのプロダクトコードの問題ではないことをご了承ください)
Fujitaskの基本コード
それではサンプルコードのfujitask
プロジェクトにあるFujitaskの基本機能を取り出したコードをご覧いただきましょう。
// fujitask/Task.scala
package fujitask
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
/**
* 『PofEAA』の「Unit of Work」パターンの実装
*
* トランザクションとはストレージに対するまとまった処理である
* トランザクションオブジェクトとはトランザクションを表現するオブジェクトで、
* 具体的にはデータベースライブラリのセッションオブジェクトなどが該当する
*
* @tparam Resource トランザクションオブジェクトの型
* @tparam A トランザクションを実行して得られる値の型
*/
trait Task[-Resource, +A] { lhs =>
/**
* トランザクションの内部で実行される個々の処理の実装
* このメソッドを実装することでTaskが作られる
*
* @param resource トランザクションオブジェクト
* @param ec ExecutionContext
* @return トランザクションの内部で実行される個々の処理で得られる値
*/
def execute(resource: Resource)(implicit ec: ExecutionContext): Future[A]
/**
* Taskモナドを合成する
* その際、変位指定によりResourceの型は両方のTaskのResourceの共通のサブクラスの型になる
*
* @param f モナド関数
* @tparam ExtendedResource トランザクションオブジェクトの型
* @tparam B 合成されたTaskを実行すると得られる値の型
* @return 合成されたTask
*/
def flatMap[ExtendedResource <: Resource, B](f: A => Task[ExtendedResource, B]): Task[ExtendedResource, B] =
new Task[ExtendedResource, B] {
def execute(resource: ExtendedResource)(implicit ec: ExecutionContext): Future[B] =
lhs.execute(resource).map(f).flatMap(_.execute(resource))
}
/**
* 関数をTaskの結果に適用する
*
* @param f 適用したい関数
* @tparam B 関数を適用して得られた値の型
* @return 関数が適用されたTask
*/
def map[B](f: A => B): Task[Resource, B] = flatMap(a => Task(f(a)))
/**
* TaskRunnerを使ってTaskを実行する
* implicitによりResourceに合ったTaskRunnerが選ばれる
*
* @param runner Taskを実行するためのTaskRunner
* @tparam ExtendedResource トランザクションオブジェクトの型
* @return 個々のTaskの処理の結果得られる値
*/
def run[ExtendedResource <: Resource]()(implicit runner: TaskRunner[ExtendedResource]): Future[A] = runner.run(this)
}
object Task {
/**
* Taskのデータコンストラクタ
*
* @param a Taskの値
* @tparam Resource トランザクションオブジェクトの型
* @tparam A Taskの値の型
* @return 実行するとaの値を返すTask
*/
def apply[Resource, A](a: => A): Task[Resource, A] =
new Task[Resource, A] {
def execute(resource: Resource)(implicit executor: ExecutionContext): Future[A] =
Future(a)
}
}
/**
* Taskを実行する
* トランザクションオブジェクトの型ごとにインスタンスを作成すること
*
* @tparam Resource トランザクションオブジェクトの型
*/
trait TaskRunner[Resource] {
/**
* Taskを実行する
*
* @param task 実行するTask
* @tparam A Task実行すると得られる値の型
* @return Task実行して得られた値
*/
def run[A](task: Task[Resource, A]): Future[A]
}
前述のとおり、Fujitask本体はこれだけで記述できてしまいます。コメントを除くと30行ほどになります。
しかし、正直、このコードを見るだけでは、どう使うのかまったく想像できないと思います。
勘のいい方でも気づくのは以下のようなことくらいじゃないでしょうか。
-
map
とflatMap
があるので、なんとなくモナドっぽい - 実行(
run
)するとFutureになるらしい - 型変数に反変と共変の変位指定がある
- ということは、
Task
とはResource
を受け取りA
を返す関数を表現しているのか? - ということは、Readerモナドに近い?
-
TaskRunner
って何だ?分かれている意味があるのか?
そんな疑問を具体的にFujitaskをScalikeJDBCを使ってDB処理を実装することで解消していきましょう。
トランザクションオブジェクトの型を定義する
Fujitaskを使うためにはまずトランザクションオブジェクトの型を定義する必要があります。
Task[-Resource, +A]
の左の型変数Resource
に入るものです。
// fujitask/Transaction.scala
package fujitask
trait Transaction
trait ReadTransaction extends Transaction
trait ReadWriteTransaction extends ReadTransaction
-
ReadTransaction
は読み込みだけができるトランザクションを表現しています(Master/Slave構成ではSlaveに問い合わせます) -
ReadWriteTransaction
は読み込みと書き込みの両方ができるトランザクションを表現しています(Master/Slave構成ではMasterに問い合わせます)
重要な点は ReadWriteTransaction
がReadTransaction
を継承する ということです。
これが後々合成するときに利いてきます。
ScalikeJDBC版Fujitask
ScalikeJDBC版のトランザクションオブジェクトを定義する
ここからはfujitask-scalikejdbc
プロジェクトのScalikeJDBC版のFujitaskを実装を説明してきます。
まずはScalikeJDBC版のトランザクションオブジェクトを定義しましょう。
// fujitask/scalikejdbc/Transaction.scala
package fujitask.scalikejdbc
import fujitask.ReadTransaction
import fujitask.ReadWriteTransaction
import scalikejdbc._
abstract class ScalikeJDBCTransaction(val session: DBSession)
class ScalikeJDBCReadTransaction(session: DBSession) extends ScalikeJDBCTransaction(session) with ReadTransaction
class ScalikeJDBCReadWriteTransaction(session: DBSession) extends ScalikeJDBCTransaction(session) with ReadWriteTransaction
これらのクラスは要するに抽象型のReadTransaction
とReadWriteTransaction
を継承し、ScalikeJDBCのDBSession
を包むものです。
このようにTask[-Resource, +A]
のResource
には各ストレージライブラリのトランザクションオブジェクト(セッションオブジェクト)を入れます。
ScalikeJDBC版のTaskRunner
を定義する
次にScalikeJDBCを使って以下のメソッドとオブジェクトを作成します。
- トランザクションオブジェクトを取得する
ask
メソッド - トランザクションを実行するimplicitの
TaskRunner
オブジェクト
以下、実装コードです
// fujitask/scalikejdbc/package.scala
package fujitask
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import _root_.scalikejdbc._
package object scalikejdbc {
def ask: Task[Transaction, DBSession] =
new Task[Transaction, DBSession] {
def execute(transaction: Transaction)(implicit ec: ExecutionContext): Future[DBSession] =
Future.successful(transaction.asInstanceOf[ScalikeJDBCTransaction].session)
}
implicit def readRunner[R >: ReadTransaction] : TaskRunner[R] =
new TaskRunner[R] {
def run[A](task: Task[R, A]): Future[A] = {
val session = DB.readOnlySession()
val future = task.execute(new ScalikeJDBCReadTransaction(session))
future.onComplete(_ => session.close())
future
}
}
implicit def readWriteRunner[R >: ReadWriteTransaction]: TaskRunner[R] =
new TaskRunner[R] {
def run[A](task: Task[R, A]): Future[A] = {
DB.futureLocalTx(session => task.execute(new ScalikeJDBCReadWriteTransaction(session)))
}
}
}
ask
メソッドは作者の方に伺ったところ、一般的なReaderモナドのask
関数と同等の機能だそうです。
やっぱりFujitaskはReaderモナドの機能を持っているわけですね。
つまりそれぞれのTask
はScalikeJDBCのDBSession
を取得できるということになります。
そして、ReadとReadWriteの二つのTaskRunner
が定義されています。
ScalikeJDBCをご存知の方ならわかると思いますが、それぞれのTaskRunner
にはScalikeJDBCのトランザクション処理が書かれています。
つまり、実際にトランザクション処理を実行し、結果を得るのがTaskRunner
ということになります。
ここまで来るとFujitaskがどう動作するのかだんだん予想できてくるのではないでしょうか。
ScalikeJDBC版Fujitaskを使ってDB処理を実装してみる
ではdomain
プロジェクトに移り、ScalikeJDBC版Fujitaskを使ってDB処理を書いてみましょう。
ここではUserとMessageのエンティティとそれぞれのCRUD処理を作成してみます。
エンティティ
User
はIDと名前を持ちます。
// domain/entity/User.scala
package domain.entity
case class User(id: Long, name: String)
Message
はIDとメッセージとメッセージを書き込んだユーザー名を持ちます。
// domain/entity/Message.scala
package domain.entity
case class Message(id: Long, message: String, userName: String)
DDLは以下のようになります。
create table if not exists `users` (
`id` bigint not null auto_increment,
`name` varchar(64) not null
)
create table if not exists `messages` (
`id` bigint not null auto_increment,
`message` varchar(256) not null,
`user_name` varchar(64) not null
)
リポジトリ(インターフェース)
次に先ほど定義したUser
とMessage
のCRUD処理をおこなうリポジトリのインターフェースを定義します。
個々の処理はTask
を返すようにします。このタスクを実行(run
)すると処理がおこなわれ、実際の結果が返されます。
// domain/repository/MessageRepository.scala
package domain.repository
import fujitask.Task
import fujitask.ReadTransaction
import fujitask.ReadWriteTransaction
import domain.entity.User
trait UserRepository {
def create(name: String): Task[ReadWriteTransaction, User]
def read(id: Long): Task[ReadTransaction, Option[User]]
def readAll: Task[ReadTransaction, List[User]]
def update(user: User): Task[ReadWriteTransaction, Unit]
def delete(id: Long): Task[ReadWriteTransaction, Unit]
}
MessageRepository
も似たような感じなので省略します。
ここで注目してほしいのは 個々のトランザクションの指定はそれぞれ自分の機能だけを考えればよい という点です。
たとえばread
メソッドは読み込みなのでReadTransaction
を指定し、delete
メソッドはReadWriteTransaction
を指定します。
そして、これらのTask
を組み合わせるときにはトランザクションの種類を指定する必要はありません。
最終的に全体がReadTransaction
なのか(Slaveに問い合わせるのか)、ReadWrite
トランザクションなのか(Masterに問い合わせるか)という 全体のトランザクションの判断はTask
の合成結果により決定される わけです。
もう一つ注目していただきたいのは、このインターフェースを記述した時点ではScalikeJDBCへの依存が発生していないという点です。
つまり トランザクションが抽象化されている と言えるでしょう。
リポジトリ(実装)
それでは次にScalikeJDBCを使って、このインターフェースに実装を与えます。
先ほどfujitask-scalikejdbc
プロジェクトで作ったask
メソッドを使います。
// domain/repository/scalikejdbc
package domain.repository.scalikejdbc
import fujitask.Task
import fujitask.ReadTransaction
import fujitask.ReadWriteTransaction
import fujitask.scalikejdbc._
import domain.entity.User
import domain.repository.UserRepository
import scalikejdbc._
object UserRepositoryImpl extends UserRepository {
def create(name: String): Task[ReadWriteTransaction, User] =
ask.map { implicit session =>
val sql = sql"""insert into users (name) values ($name)"""
val id = sql.updateAndReturnGeneratedKey.apply()
User(id, name)
}
def read(id: Long): Task[ReadTransaction, Option[User]] =
ask.map { implicit session =>
val sql = sql"""select * from users where id = $id"""
sql.map(rs => User(rs.long("id"), rs.string("name"))).single.apply()
}
def readAll: Task[ReadTransaction, List[User]] =
ask.map { implicit session =>
val sql = sql"""select * from users"""
sql.map(rs => User(rs.long("id"), rs.string("name"))).list.apply()
}
def update(user: User): Task[ReadWriteTransaction, Unit] =
ask.map { implicit session =>
val sql = sql"""update users set name = ${user.name} where id = ${user.id}"""
sql.update.apply()
}
def delete(id: Long): Task[ReadWriteTransaction, Unit] =
ask.map { implicit session =>
val sql = sql"""delete users where id = $id"""
sql.update.apply()
}
}
ScalikeJDBCを知っているなら理解していただけると思いますが、この実装は素のScalikeJDBCの記述にかなり近いものになっていると思います。
fujitask-scalikejdbc
プロジェクトのところで説明したようにask
でScalikeJDBCのDBSession
を取得したあとは、通常のScalikeJDBCの処理と同じように書くだけで個々のTaskを実装することができます。
MessageRepositoryImpl
も同じような感じなので省略します。
サービス
では、このリポジトリを使ってサービスを作ってみましょう。
UserService
UserService
はリポジトリのTask
を素直にrun
で実行するだけにしましょう。
ちゃんと実装するならUserRepository
やTaskRunner
インスタンスはDIしたほうがいいと思いますが、コードを単純にするため今回はそのまま使います。
package domain.service
import domain.entity.User
import domain.repository.UserRepository
import domain.repository.scalikejdbc.UserRepositoryImpl
import fujitask.scalikejdbc._
import scala.concurrent.Future
object UserService {
val userRepository: UserRepository = UserRepositoryImpl
def create(name: String): Future[User] =
userRepository.create(name).run()
def read(id: Long): Future[Option[User]] =
userRepository.read(id).run()
def readAll: Future[List[User]] =
userRepository.readAll.run()
def update(user: User): Future[Unit] =
userRepository.update(user).run()
def delete(id: Long): Future[Unit] =
userRepository.delete(id).run()
}
Taskの実行にはfujitask-scalikejdbc
プロジェクトで定義されたTaskRunner
インスタンスが使われます。
MessageService
MessageService
のほうはCRUD処理に加えてTask
を合成する処理のcreateByUserId
メソッドも入れてみましょう。
package domain.service
import domain.entity.Message
import domain.repository.UserRepository
import domain.repository.MessageRepository
import domain.repository.scalikejdbc.UserRepositoryImpl
import domain.repository.scalikejdbc.MessageRepositoryImpl
import fujitask.scalikejdbc._
import scala.concurrent.Future
object MessageService {
val userRepository: UserRepository = UserRepositoryImpl
val messageRepository: MessageRepository = MessageRepositoryImpl
def create(message: String, userName: String): Future[Message] =
messageRepository.create(message, userName).run()
def read(id: Long): Future[Option[Message]] =
messageRepository.read(id).run()
def readAll: Future[List[Message]] =
messageRepository.readAll.run()
def update(message: Message): Future[Unit] =
messageRepository.update(message).run()
def delete(id: Long): Future[Unit] =
messageRepository.delete(id).run()
def createByUserId(message: String, userId: Long): Future[Message] = {
val task =
for {
userOpt <- userRepository.read(userId)
user = userOpt.getOrElse(throw new IllegalArgumentException("User Not Found"))
message <- messageRepository.create(message, user.name)
} yield message
task.run()
}
}
createByUserId
はまずuserRepository.read
を使ってユーザー名を取得します。
ユーザーが存在しなかった場合は例外を投げます。
そしてmessageRepository.create
に取得できたユーザー名とメッセージをあたえてMessage
を作成します。
例によってTask
はモナドなのでScalaのfor式を使ってこのような合成処理を記述できます。
最後に合成してできたTaskをrun
して結果を取得します。
このようにサービスの記述は非常に簡単ですね。
実行確認
それでは作成したサービスの動作を確認してみましょう。
わかりやすいようにTaskRunner
インスタンスにデバッグプリントを追加します。
readRunner
が使われた場合は"ReadRunner"という文字列が出力されます。
readWriteRunner
が使われた場合は"ReadWriteRunner"という文字列が出力されます。
またFutureの結果確認のために以下のメソッドも追加します。
def getValue[A](f: Future[A]): A = Await.result(f, Duration(300, "seconds"))
それでは動かしてみます。
まずユーザーを作成してみます。
scala> getValue(UserService.create("Randy"))
ReadWriteRunner
res1: domain.entity.User = User(1,Randy)
ReadWriteRunner
が使われて"Randy"という名前のUser
が作成されました。
次にユーザーを取得してみます。
scala> getValue(UserService.read(1))
ReadRunner
res2: Option[domain.entity.User] = Some(User(1,Randy))
ReadRunner
が使われてUser
オブジェクトを取得することができました。
Task
に応じてTaskRunner
が選択されていることがわかります。
次に上で合成したMessageService.createByUserId
メソッドを使ってみましょう。
scala> getValue(MessageService.createByUserId("つらい", 1))
ReadWriteRunner
res3: domain.entity.Message = Message(1,Randy,つらい)
Randyさんの「つらい」というメッセージを作成できました。
ここではUserRepository.read
のTask[ReadTransaction, Option[User]]
とMessageRepository.create
のTask[ReadWriteTransaction, Message]
が合成された結果、
Task[ReadWriteTransaction, Message]
が作られ、実行時にはTaskRunnerとしてReadWriteRunnerが選ばれて実行されていることがわかります。
わざとトランザクションを失敗させてみる
最後にわざとらしくトランザクションを失敗させてみましょう。
3回連続ユーザーを作成するcreate3
と、同じく3回連続ユーザーを作成するのですが各々run
するcreate3CommitEach
の二つのメソッドを作成します。
def create3(name1: String, name2: String, name3: String): Future[(User, User, User)] =
(for {
user1 <- userRepository.create(name1)
user2 <- userRepository.create(name2)
user3 <- userRepository.create(name3)
} yield (user1, user2, user3)).run()
import scala.concurrent.ExecutionContext.Implicits.global
def create3CommitEach(name1: String, name2: String, name3: String): Future[(User, User, User)] =
for {
user1 <- userRepository.create(name1).run()
user2 <- userRepository.create(name2).run()
user3 <- userRepository.create(name3).run()
} yield (user1, user2, user3)
このcreate3
とcreate3CommitEach
にnull
を与えてみてそれぞれ動作の違いを見てみましょう。
create3
scala> getValue(UserService.create3("Randy1", "Randy2", null))
ReadWriteRunner
org.h2.jdbc.JdbcSQLException: 列 "NAME" にはnull値が許されていません
(略)
scala> getValue(UserService.readAll)
ReadRunner
res2: List[domain.entity.User] = List()
scala> getValue(UserService.create3CommitEach("Randy1", "Randy2", null))
ReadWriteRunner
ReadWriteRunner
ReadWriteRunner
org.h2.jdbc.JdbcSQLException: 列 "NAME" にはnull値が許されていません
(略)
scala> getValue(UserService.readAll)
ReadRunner
res4: List[domain.entity.User] = List(User(1,Randy1), User(2,Randy2))
普通にFujitaskを使ったcreate3
の場合は例外が発生し、ユーザーが一つも作成されていないことがわかります。
毎回run
をしたcreate3CommitEach
のほうは例外が発生するまでの二つのユーザーが作成されてしまっています。
これでFujitaskによりトランザクションの合成がちゃんとおこなれていることが確認できました。
あらためてFujitaskのすごさを考える
まず便利!
これまでトランザクションの抽象化、トランザクションの合成、合成によるトランザクションの種類の判断というFujitaskの三つの能力を見てきました。
Fujitaskのこれらの機能によりMySQLなどのトランザクション機能を持つRDBMSや、Master/Slave構成のRedisなど、Webアプリケーションでよく使う幅広いストレージを統一的なインターフェースで扱えるようになります。
実装が簡単!使いやすい!
Fujitaskは本体のコードは数十行ほどでしかないですし、ScalikeJDBCを組み合わせた場合でもほとんど冗長な記述なく、DB処理を書くことができます。
Fujitaskの作者の方は、自由な発想で機能を作りつつも、ライブラリの使いやすさをとても重視しています。
Fujitaskはまさにその特徴が表れていると言えます。
関数型プログラミングとオブジェクト指向プログラミングの融合!Scalaの高い記述能力が活かされている
Fujitaskは全体としてReaderモナドに近いモナドであり、TaskRunner
も型クラス的なものであることから、関数型プログラミングの産物であると言えます。
しかし、トランザクションオブジェクトに継承関係を持たせ、Task
の型変数に変位指定を使い、その場合にflatMapのとき型変数が二つのトランザクションオブジェクトの共通のサブクラスの型になることを利用し、トランザクションがReadかReadWriteかを判断するというところは、オブジェクト指向の技術を使っていると言えます。
Fujitaskがシンプルながら他のライブラリにない発想があると言えるのは、このように関数型プログラミングとオブジェクト指向プログラミングの技術を高度に組み合わせているところにあります。
これは関数型プログラミング言語としてもオブジェクト指向プログラミング言語としても高い機能を合わせ持つScalaならではの発明と言えるでしょう。
そもそもFujitaskはマーチン・ファウラーの『PofEAA』の「Unit of Work」パターンの実装として考案されたらしいです(僕はその誕生に立ち合ってないので、詳しくはわかりませんが)。
それでこのトランザクションモナドを作るのですから、やはり作者は天才としか言いようがないでしょう。
まとめ
以上、ScalaMatsuriセッションで発表予定であるドワンゴ秘蔵のトランザクションモナドについて解説しました。
まだドワンゴには他にもいくつかこういうものがあるんですが、僕は現在ちょっと時間がなくてScalaMatsuriセッション資料提出期限の12月15日までにまとめきれるかわからないというのと、
ScalaMatsuriセッションの質疑応答合わせて40分という発表時間中に、それらをうまく説明することができるのかという、二つの難題を抱えているところです…。